Dagster provides Composite Solids, which are a unit of abstraction for composing a solid from other solids.
| Name | Description |
|---|---|
| @composite_solid | The decorator used to define a composite solid. |
Solids are linked together by defining the dependencies between their inputs and outputs. Defining dependencies is usually done at the Pipeline level.
Composite solids also let you define solid dependencies to form an entirely new solid. This is useful for:
Refactoring a DAG of solids using composites is very similar to refactoring code with functions. Defining a composite solid is similar to defining a pipeline, but composite solids can also provide mapping information to control how data and configuration enter and exit their inner graph of solids.
To define a composite solid, use the @composite_solid decorator.
We use function calls within the decorated function body to indicate the dependency structure between the solids making up the composite solid.
In this example, the multiply_by_three solid depends on the add_one solid's output. Because this data dependency exists, the multiply_by_three solid executes after add_one runs successfully and emits the required output.
from dagster import InputDefinition, composite_solid, solid

@solid(config_schema={"hi": str}, input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1

@solid(input_defs=[InputDefinition("number", int)])
def multiply_by_three(context, number):
    return number * 3

@composite_solid(input_defs=[InputDefinition("number", int)])
def add_one_times_three_solid(number):
    # Nesting the calls declares that multiply_by_three consumes add_one's output.
    return multiply_by_three(add_one(number))
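To make the comparison with function refactoring concrete, here is a plain-Python sketch of the same data flow. It does not use Dagster at all: the functions below are ordinary functions that happen to share the solids' names, and the input value 2 is chosen purely for illustration.

```python
# Plain-Python analogy only: these are ordinary functions, not Dagster solids.

def add_one(number: int) -> int:
    return number + 1


def multiply_by_three(number: int) -> int:
    return number * 3


def add_one_times_three(number: int) -> int:
    # The nested call mirrors the composite body: the inner result feeds the outer step.
    return multiply_by_three(add_one(number))


result = add_one_times_three(2)
print(result)  # (2 + 1) * 3 = 9
```

One difference worth keeping in mind: inside a @composite_solid body, the calls build the dependency graph rather than computing values directly. The actual computation happens later, when the pipeline executes.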
By default, the config schemas for the solids in a composite solid are hoisted up to the composite solid itself, under the solids key.
In this example, you have two solids that both take config and are wrapped by a composite solid.
from dagster import InputDefinition, composite_solid, solid

@solid(config_schema={"n": int}, input_defs=[InputDefinition("number", int)])
def add_n(context, number):
    # Read this solid's own config value "n".
    return number + context.solid_config["n"]

@solid(config_schema={"m": int}, input_defs=[InputDefinition("number", int)])
def multiply_by_m(context, number):
    # Read this solid's own config value "m".
    return number * context.solid_config["m"]

@composite_solid(input_defs=[InputDefinition("number", int)])
def add_n_times_m_solid(number):
    return multiply_by_m(add_n(number))
To run a pipeline with this composite solid, you need to specify the config for both add_n and multiply_by_m through the composite solid:
solids:
  add_n_times_m_solid:
    inputs:
      number: 0
    solids:
      add_n:
        config:
          n: 3
      multiply_by_m:
        config:
          m: 2
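The same configuration can be written as a Python dictionary, which makes the nesting easier to inspect. The sketch below only builds the dictionary and walks it with plain Python. The variable names (run_config, inner, expected) are illustrative, and the idea of passing such a dictionary as run_config to execute_pipeline is an assumption about your Dagster version rather than something shown on this page.

```python
# Python-dict form of the YAML config above. In legacy Dagster versions a dict
# of this shape is assumed to be accepted as execute_pipeline(..., run_config=run_config).
run_config = {
    "solids": {
        "add_n_times_m_solid": {
            "inputs": {"number": 0},
            # Config for the wrapped solids is nested under the composite's "solids" key.
            "solids": {
                "add_n": {"config": {"n": 3}},
                "multiply_by_m": {"config": {"m": 2}},
            },
        }
    }
}

composite = run_config["solids"]["add_n_times_m_solid"]
inner = composite["solids"]
number = composite["inputs"]["number"]
n = inner["add_n"]["config"]["n"]
m = inner["multiply_by_m"]["config"]["m"]

# What the composite computes for these values: (number + n) * m
expected = (number + n) * m
print(expected)  # (0 + 3) * 2 = 6
```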
Composite solids can also define a config schema. When a composite solid defines a config schema, it must also provide a config mapping function (config_mapping_fn in the example below, passed through the config_fn argument) that maps the composite solid's config to the wrapped solids' config.
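from dagster import InputDefinition, composite_solid

# add_n and multiply_by_m are the solids defined in the previous example.

def config_mapping_fn(config):
    # Fan the single composite-level value "x" out to both wrapped solids.
    x = config["x"]
    return {"add_n": {"config": {"n": x}}, "multiply_by_m": {"config": {"m": x}}}

@composite_solid(
    config_fn=config_mapping_fn,
    config_schema={"x": int},
    input_defs=[InputDefinition("number", int)],
)
def add_x_multiply_by_x(number):
    return multiply_by_m(add_n(number))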
In this example, the composite solid has only one field in its config schema: x. The config mapping function takes the config provided to the composite solid and maps it to the wrapped solids.
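Because the config mapping function is an ordinary Python function, it can be called directly to see what it produces. The sketch below repeats the function from the example and calls it with a sample value, x = 5, chosen for illustration. The composite_run_config dictionary is an assumed shape for the run config of a composite solid that defines its own schema, so check it against your Dagster version.

```python
def config_mapping_fn(config):
    # Same mapping as in the example: one value "x" feeds both wrapped solids.
    x = config["x"]
    return {"add_n": {"config": {"n": x}}, "multiply_by_m": {"config": {"m": x}}}


mapped = config_mapping_fn({"x": 5})
print(mapped)
# {'add_n': {'config': {'n': 5}}, 'multiply_by_m': {'config': {'m': 5}}}

# Assumed run config shape: the composite takes "config" directly, and the
# wrapped solids no longer need their own entries.
composite_run_config = {
    "solids": {
        "add_x_multiply_by_x": {
            "inputs": {"number": 0},
            "config": {"x": 5},
        }
    }
}
```

Callers then only need to know about x. The keys of the returned dictionary are the names of the wrapped solids, which matches the structure that would otherwise be written under the solids key.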