Celery is a longstanding open-source Python distributed task queue system, with support for a variety of queues (brokers) and result persistence strategies (backends).
The dagster-celery executor uses Celery to satisfy three typical requirements when running pipelines in production:
- parallel execution capacity that scales horizontally across multiple compute nodes;
- separate queues for isolating execution and controlling access to shared or limited external resources at the step level;
- priority-based execution at the step level.
The dagster-celery executor compiles a pipeline definition and its associated configuration into a concrete execution plan, and then submits each execution step to the broker as a separate Celery task. The dagster-celery workers then pick up tasks from the queues to which they are subscribed, according to the priorities assigned to each task, and execute the steps to which the tasks correspond.
Let's construct a very parallel toy pipeline and then execute it using the Celery executor.
from dagster import ModeDefinition, default_executors, fs_io_manager, pipeline, solid
from dagster_celery import celery_executor

celery_mode_defs = [
    ModeDefinition(
        resource_defs={"io_manager": fs_io_manager},
        executor_defs=default_executors + [celery_executor],
    )
]

@solid
def not_much(_):
    return

@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
    for i in range(50):
        not_much.alias("not_much_" + str(i))()
Save this pipeline as celery_pipeline.py.
To run the Celery executor, you'll need a running broker like RabbitMQ. With Docker, this is as simple as:
docker run -p 5672:5672 rabbitmq:3.8.2
You will also need to run a Celery worker to execute tasks. From the same directory in which you saved the pipeline file (we'll explain why in a moment), run:
dagster-celery worker start -A dagster_celery.app
Now you can execute this pipeline with Celery. Again, from the same directory in which you saved the pipeline file, run:
dagit -f celery_pipeline.py
Now you can execute the parallel pipeline from Dagit with the following config:
execution:
  celery:
In the quick start, we cheated in a couple of ways:
- by running a single Celery worker on the same node as Dagit, so that the default local run and event log storage could be shared between them;
- by running that worker from the same directory in which we saved celery_pipeline.py, so that our Dagster code would be available both to Dagit and to the worker: i.e., the worker and Dagit could both find the pipeline definition in the same file (-f celery_pipeline.py).
In production, more configuration is required.
First, ensure that appropriate persistent run and event log storage, e.g., PostgresRunStorage and PostgresEventLogStorage, are configured on your Dagster instance, so that Dagit and the workers can communicate information about the run and events. You can do this by adding a block such as the following to your dagster.yaml (by default, Dagster will look for this file at $DAGSTER_HOME/dagster.yaml when the DAGSTER_HOME environment variable is set):
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }
The same instance config must be present in Dagit's environment and in the workers' environments. If you haven't already, please read about the Dagster instance.
Second, because data is passed between solids that may run in different worker processes, potentially on different nodes, you must use storage that is accessible from all nodes running Celery workers for the pipeline runs you execute with the Celery executor -- for example, an S3 or GCS bucket, or an NFS mount. An appropriate IO manager backed by that storage -- such as s3_pickle_io_manager or gcs_pickle_io_manager -- must be configured on the ModeDefinition attached to the pipeline.
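For example, a mode definition along these lines swaps the local filesystem IO manager for an S3-backed one. This is a minimal sketch, assuming the dagster-aws library is installed; the bucket name in the run config below is illustrative.

from dagster import ModeDefinition, default_executors
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_celery import celery_executor

# A mode whose IO manager persists step outputs to S3, so that steps executed
# by Celery workers on different nodes can read each other's outputs.
celery_s3_mode_defs = [
    ModeDefinition(
        resource_defs={
            "io_manager": s3_pickle_io_manager,
            "s3": s3_resource,
        },
        executor_defs=default_executors + [celery_executor],
    )
]

The corresponding run config supplies the bucket:

resources:
  io_manager:
    config:
      s3_bucket: my-dagster-scratch-bucket  # illustrative bucket name
execution:
  celery: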
Third, if you are using custom config for your pipeline runs -- for
instance, using a different Celery broker url or backend -- you must
ensure that your workers start up with this config. Make sure your
engine config is present in a YAML file accessible to the workers and
start them with the -y
parameter as follows:
dagster-celery worker start -y /path/to/celery_config.yaml
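For instance, a celery_config.yaml pointing the executor at a local RabbitMQ broker and the default RPC results backend might look like the following; the URLs are illustrative and should match your own deployment:

execution:
  celery:
    config:
      broker: "pyamqp://guest@localhost:5672//"  # URL of the Celery broker
      backend: "rpc://"                          # URL of the Celery results backend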
Finally, you will need to make sure that the Dagster code you want the
workers to execute is present in their environment, and that it is in
sync with the code present on the node running Dagit. The easiest way to
do this is typically to package your code into a Python module, and for
the workspace.yaml
you use to run Dagit to load from that module.
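For example, if your pipelines live in an installed Python module (here called my_dagster_project, a hypothetical name), the workspace.yaml could be as simple as:

load_from:
  - python_module: my_dagster_project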
In the quick start, we accomplished this by starting Dagit with the -f
parameter, which told it in which file to find our pipeline,
and then made sure that we started our Celery
worker from the same point in the filesystem -- so that the pipeline
was available at the same place.
In the quick start above, we started our workers using the
dagster-celery
CLI, rather than by invoking Celery directly. This CLI
is intended as a convenience wrapper for the common case that shields
users from the full complexity of Celery configuration. Of course, it's
still possible to start Celery workers directly -- but please let us
know if you find yourself going down this path.
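If you do start workers with Celery's own CLI, the rough equivalent of the quick-start worker command is shown below. This is only a sketch: it assumes Celery 5's argument ordering and the default queue name (dagster) to which dagster-celery workers subscribe.

celery -A dagster_celery.app worker -l info -Q dagster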
Run dagster-celery worker start
to start new workers,
dagster-celery worker list
to view running workers, and
dagster-celery worker terminate
to terminate workers. For all of these
commands, it's essential that you have your broker running.
If you are running your celery cluster with custom config (e.g., the
broker URL or backend), you should also pass a path to a YAML file
containing that config to the -y
parameter of all these invocations.
Otherwise, workers will not start up with the config you are expecting,
and the CLI may not be able to find running workers to list or terminate
them.
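For example, if your workers were started against a custom broker or backend, pass the same config file when listing or terminating them:

dagster-celery worker start -y /path/to/celery_config.yaml
dagster-celery worker list -y /path/to/celery_config.yaml
dagster-celery worker terminate -y /path/to/celery_config.yaml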
Although this system is deliberately designed to make the full range of Celery config available as needed, keep in mind that Celery exposes many knobs, and many combinations of them are not compatible with each other. That said, workloads differ widely: if, for instance, you are running many pipelines with very long-running or very short-running tasks, and you are already comfortable tuning Celery, you may find that adjusting some of these settings works better for your case.
There are several available tools for monitoring and debugging your queues and workers. First, of course, is Dagit, which will display event logs and the stdout/stderr from your pipeline executions. You can also observe the logs generated by your broker and by the worker processes.
To debug broker- and queue-level issues, you should use the monitoring tools provided by the broker you're running. RabbitMQ includes a monitoring API and has first-class support for Prometheus and Grafana integration in production.
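For local debugging, one option (an illustrative sketch, not the only approach) is to run the RabbitMQ image that bundles the management plugin and inspect queue depths from the command line:

# Run RabbitMQ with the management UI enabled (reachable at http://localhost:15672)
docker run -p 5672:5672 -p 15672:15672 rabbitmq:3.8.2-management

# List queue names and depths inside the running container
docker exec -it <container_id> rabbitmqctl list_queues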
Celery also includes the excellent Flower tool for monitoring your Celery workers and queues. This can be very useful in understanding how workers interact with the queue.
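Flower is installed separately from Celery and can be pointed at the same app module the workers use; a typical invocation (assuming Celery 5's CLI ordering and default broker settings) looks like:

pip install flower
celery -A dagster_celery.app flower --port=5555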
At the moment, dagster-celery is tested using the RabbitMQ broker and the default RPC backend. At least one team is running dagster-celery using SQS as the broker, and we intend to expand the scope of broker/backend matrix testing, probably starting with the Postgres backend.