Dagstd
Dagstd is a Python package containing a set of helper modules for use with the Dagster data orchestration tool.
Dagster is a great tool, but in Dagster the inputs to an op can only be the outputs of other ops. When all you need is to pass a simple integer or string into an op, you end up writing boilerplate ops that do nothing more than return a formatted string or a single integer. Dagstd was created to remove that boilerplate.
Features
Simple ops for common numbers
Constant value ops
Helper ops for mathematical and string operations
Ops for retrieving environment variables
Sphinx autodoc support for Dagster ops
Usage
Here’s an example of a pure-Dagster graph that downloads a daily zip file and
extracts a known file name. Note: the download_large_file op has been omitted for brevity.
import zipfile
from datetime import datetime

from dagster import op, job

@op
def get_todays_date() -> str:
    # strftime() requires a format string; use an ISO-style date
    return datetime.today().strftime('%Y-%m-%d')

@op
def five() -> int:
    return 5

@op
def get_download_file_url(date: str) -> str:
    return f'https://example.com/{date}.csv'

@op
def get_nth_file_name(n: int) -> str:
    return f'file_{n:02}.txt'

@op
def extract_file_from_zip(context, zip_path: str, file_name: str) -> str:
    with zipfile.ZipFile(zip_path) as zip_file:
        # Write the archive member out to /tmp
        with open(f'/tmp/{file_name}', 'wb') as f:
            f.write(zip_file.read(file_name))
    context.log.info(f'Extracted {file_name} from {zip_path}')
    return f'/tmp/{file_name}'

@job
def process_data():
    date = get_todays_date()
    url = get_download_file_url(date)
    # download_large_file is omitted for brevity
    zip_path = download_large_file(url)
    file_name = get_nth_file_name(five())
    file_path = extract_file_from_zip(zip_path, file_name)
And here’s the same graph, but with Dagstd ops.
import zipfile
from datetime import datetime

from dagster import op, job
from dagstd.constants import Constant, Five
from dagstd.operations import fmt

@op
def get_todays_date_string() -> str:
    return datetime.today().strftime("%Y-%m-%d")

@op
def extract_file_from_zip(context, zip_path: str, file_name: str) -> str:
    with zipfile.ZipFile(zip_path) as zip_file:
        # Write the archive member out to /tmp
        with open(f'/tmp/{file_name}', 'wb') as f:
            f.write(zip_file.read(file_name))
    context.log.info(f'Extracted {file_name} from {zip_path}')
    return f'/tmp/{file_name}'

@job
def process_data():
    date = get_todays_date_string()
    url = fmt(Constant('https://example.com/{}.csv'), [date])
    # download_large_file is omitted for brevity
    zip_path = download_large_file(url)
    file_name = fmt(Constant('file_{}.txt'), [Five()])
    file_path = extract_file_from_zip(zip_path, file_name)
This was just a small example, but it serves to show how much boilerplate can be avoided when using Dagstd.
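To make the two fmt calls above concrete, here is a plain-Python sketch of the strings they would be expected to produce. It assumes that fmt fills the `{}` placeholders the same way `str.format` does, which the template syntax suggests but which is not confirmed here. The fixed date is made up for illustration.

```python
from datetime import datetime

# Assumption: fmt(template, [args]) behaves like template.format(*args).
# The date below is a made-up example value, not real data.
date = datetime(2024, 1, 31).strftime('%Y-%m-%d')

url = 'https://example.com/{}.csv'.format(date)
file_name = 'file_{}.txt'.format(5)

# The pure-Dagster version zero-pads the number (f'file_{n:02}.txt').
# Under the same assumption, a format spec in the template would keep that padding.
padded_file_name = 'file_{:02}.txt'.format(5)

print(url)
print(file_name)
print(padded_file_name)
```

If the assumption holds, a template of `'file_{:02}.txt'` would be needed to reproduce the zero-padded file name from the pure-Dagster example exactly.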
Sphinx Autodoc Plugin
Dagstd includes a Sphinx autodoc plugin that can be used to generate
documentation for Dagster ops. To use the autodoc plugin, add the following
to your conf.py file:
extensions = [
    ...
    'dagstd.sphinx.parser',
]
By default, this will prefix all op documentation with (op). To change
this, add the following to your conf.py file:
dagstd_op_prefix = 'My Op'
Documentation
Documentation can be found at https://dagstd.readthedocs.io/en/latest/readme.html.
Installation
Install Dagstd with pip:
pip install dagstd
Dependencies
Contribute
I’m always looking for more ideas to add to Dagstd. If you have an idea, please open an issue or pull request, or message me on GitHub.
Issue Tracker: https://github.com/isaacharrisholt/dagstd/issues
Source Code: https://github.com/isaacharrisholt/dagstd
Support
If you are having issues, please let me know.
License
The project is licensed under the GNU GPLv3 license.
dagstd package
Subpackages
dagstd.constants package
Submodules
dagstd.constants.constant module
constant.py contains a Constant function that acts as an op that returns whatever value is passed to it.
- dagstd.constants.constant.Constant(value: Any, suffix: Optional[str] = None, make_unique: bool = True) -> Any
Acts as an op that returns whatever value is passed to it on creation.
- Parameters
value (Any) – The value to return.
suffix (str, default=None) – A suffix to append to the op name.
make_unique (bool, default=True) – If True, the op name will be suffixed with a UUID. Is overridden if the suffix argument is provided.
The name of the op is the value passed to it plus a unique ID to allow
having multiple constants with the same value. Alternatively, you can
pass a suffix to the Constant function to be used instead of the unique ID.
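The naming rule described above can be sketched in plain Python. This is not Dagstd's implementation: the helper name `constant_op_name`, the underscore separator, and the character clean-up are all assumptions made for illustration. The clean-up is there because Dagster only accepts letters, digits, and underscores in op names.

```python
import re
import uuid
from typing import Any, Optional


def constant_op_name(value: Any, suffix: Optional[str] = None, make_unique: bool = True) -> str:
    """Hypothetical helper: derive an op name from a constant's value."""
    # Assumption: characters that are not valid in a name become underscores.
    base = re.sub(r'[^A-Za-z0-9_]', '_', str(value))
    if suffix is not None:
        # An explicit suffix takes precedence over make_unique.
        return f'{base}_{suffix}'
    if make_unique:
        # uuid4().hex is 32 hex digits, so it is safe to use in a name.
        return f'{base}_{uuid.uuid4().hex}'
    return base


print(constant_op_name('Isaac', suffix='name'))
print(constant_op_name(20, make_unique=False))
```

Two calls with the same value and the default arguments give different names, which is what lets several constants with the same value coexist in one job.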
dagstd.constants.numbers module
numbers.py contains ops that return common numbers. These ops are capitalised to make them seem more like classes, which helps with readability.
Module contents
dagstd.operations package
Submodules
dagstd.operations.maths module
maths.py contains ops that perform common mathematical operations.
dagstd.operations.strings module
strings.py contains helper ops for working with strings.
Module contents
dagstd.sphinx package
Submodules
dagstd.sphinx.parser module
- class dagstd.sphinx.parser.OpDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)
Bases: PyFunction
Sphinx op directive.
- class dagstd.sphinx.parser.OpDocumenter(directive: DocumenterBridge, name: str, indent: str = '')
Bases: FunctionDocumenter
Document op definitions.
- classmethod can_document_member(member, membername, isattr, parent)
Called to see if a member can be documented by this Documenter.
- document_members(all_members=False)
Generate reST for member documentation.
If all_members is True, document all members, else those given by self.options.members.
- format_args(**kwargs: Any)
Format the argument signature of self.object.
Should return None if the object does not have a signature.
- member_order = 11
order if autodoc_member_order is set to ‘groupwise’
- objtype = 'op'
name by which the directive is called (auto…) and the default generated directive name
Module contents
Submodules
dagstd.env module
env.py contains ops for working with environment variables.
- (op) dagstd.env.or_default(key: str, default: str) -> str
Returns the value of the environment variable with the given key, or the given default if the variable is not set.
- (op) dagstd.env.or_empty(key: str) -> str
Returns the value of the environment variable with the given key, or an empty string if the variable is not set.
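The documented behaviour of these two ops matches a lookup with a fallback. A minimal plain-Python sketch follows. The function names `env_or_default` and `env_or_empty` and the variable names are hypothetical stand-ins, and this is not how Dagstd itself is written.

```python
import os


def env_or_default(key: str, default: str) -> str:
    """Hypothetical stand-in: the variable's value, or the default if it is unset."""
    return os.environ.get(key, default)


def env_or_empty(key: str) -> str:
    """Hypothetical stand-in: the variable's value, or '' if it is unset."""
    return os.environ.get(key, '')


# Made-up variable names, used only for this demonstration.
os.environ['DAGSTD_SKETCH_SET'] = 'hello'
os.environ.pop('DAGSTD_SKETCH_UNSET', None)

print(env_or_default('DAGSTD_SKETCH_SET', 'fallback'))
print(env_or_default('DAGSTD_SKETCH_UNSET', 'fallback'))
print(repr(env_or_empty('DAGSTD_SKETCH_UNSET')))
```

In this sketch, a variable that is set to an empty string counts as set, so the default is not used for it.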
dagstd.utils module
utils.py contains useful helper ops that are not specific to any part of the Dagstd library.
- (op) dagstd.utils.no_op(value: Any) -> Any
Does nothing. Allows unlinked dependencies for graphs.
Examples
In this example, even though graph_2 has no data dependencies on graph_1, it will still wait for graph_1 to complete before starting due to the use of the no_op operation.
This is useful when you have graphs that need to activate in sequence, but the earlier graphs don’t pass data. For example, you might run a data sync with Airbyte, and then run a dbt project to transform that data in your warehouse.
from typing import Any

from dagster import graph, job, In
from dagstd.constants import Constant, One, Two
from dagstd.operations import add, fmt
from dagstd.utils import no_op

@graph
def graph_1():
    name = Constant('Isaac')
    age = Constant(20)
    return fmt(Constant('{} is {} years old'), name, age)

@graph(ins={'arg_1': In(int), 'arg_2': In(int), 'deps': In(Any)})
def graph_2(arg_1, arg_2, deps):
    # Consuming deps makes graph_2 wait for whatever produced it
    no_op(deps)
    return add([arg_1, arg_2])

@job
def my_job():
    string = graph_1()
    num = graph_2(One(), Two(), deps=string)