Cookbook

This part of the docs contains recipes for various things you might want to do using Dramatiq. Each section will be light on prose and code heavy, so if you have any questions about one of the recipes, open an issue on GitHub.

Callbacks

Dramatiq has built-in support for sending actors messages when other actors succeed or fail. The on_failure callback is called every time an actor fails, even if the message is going to be retried.

import dramatiq


@dramatiq.actor
def identity(x):
    return x


@dramatiq.actor
def print_result(message_data, result):
    print(f"The result of message {message_data['message_id']} was {result}.")

@dramatiq.actor
def print_error(message_data, exception_data):
    print(f"Message {message_data['message_id']} failed:")
    print(f"  * type: {exception_data['type']}")
    print(f"  * message: {exception_data['message']!r}")


if __name__ == "__main__":
    identity.send_with_options(
        args=(42,),
        on_failure=print_error,
        on_success=print_result,
    )

Composition

Dramatiq has built-in support for a couple high-level composition constructs. You can use these to combine generalized tasks that don’t know about one another into complex workflows.

In order to take advantage of group and pipeline result management or to wait for a group or pipeline to finish, you need to enable result storage and your actors need to store results. Check out the Results section for more information on result storage.

Groups

Groups run actors in parallel and let you gather their results or wait for all of them to finish. Assuming you have a computationally intensive actor called frobnicate, you can group multiple messages together as follows:

g = group([
    frobnicate.message(1, 2),
    frobnicate.message(2, 3),
    frobnicate.message(3, 4),
]).run()

This will enqueue 3 separate messages and, assuming there are enough resources available, execute them in parallel. You can then wait for the whole group to finish:

g.wait(timeout=10_000)  # 10s expressed in millis

Or you can iterate over the results:

for res in g.get_results(block=True, timeout=5_000):
    ...

Results are returned in the same order that the messages were added to the group.

Pipelines

Actors can be chained together using the pipeline function. For example, if you have an actor that gets the text contents of a website and one that counts the number of “words” in a piece of text:

@dramatiq.actor
def get_uri_contents(uri):
    return requests.get(uri).text


@dramatiq.actor
def count_words(uri, text):
    count = len(text.split(" "))
    print(f"There are {count} words at {uri}.")

You can chain them together like so:

uri = "http://example.com"
pipe = pipeline([
    get_uri_contents.message(uri),
    count_words.message(uri),
]).run()

Or you can use pipe notation to achieve the same thing:

pipe = get_uri_contents.message(uri) | count_words.message(uri)

In both cases, the result of running get_uri_contents(uri) is passed as the last positional argument to count_words. If you would like to avoid passing the result of an actor to the next one in line, set the pipe_ignore option to True when you create the “receiving” message:

(
    bust_caches.message() |
    prepare_codes.message_with_options(pipe_ignore=True) |
    launch_missiles.message()
)

Here, the result of bust_caches() will not be passed to prepare_codes(), but the result of prepare_codes() will be passed to launch_missiles(codes). To get the end result of a pipeline – that is, the result of the last actor in the pipeline – you can call get_result:

pipe.get_result(block=True, timeout=5_000)

To get the intermediate results of each step in the pipeline, you can call get_results:

for res in pipe.get_results(block=True):
    ...

Error Reporting

Reporting errors with Rollbar

Rollbar provides an easy-to-use Python client:

$ pip install rollbar

Save the following middleware to a module inside your project:

import dramatiq
import rollbar


class RollbarMiddleware(dramatiq.Middleware):
    def after_process_message(self, broker, message, *, result=None, exception=None):
        if exception is not None:
            rollbar.report_exc_info()

Finally, instantiate and add it to your broker:

rollbar.init(YOUR_ROLLBAR_KEY)
broker.add_middleware(path.to.RollbarMiddleware())

Reporting errors with Sentry

Use sentry-dramatiq.

Frameworks

API Star

The apistar_dramatiq library lets you use API Star dependency injection with your Dramatiq actors.

Django

Check out the django_dramatiq project if you want to use Dramatiq with Django. The django_dramatiq_example repo is an example app build with Django and Dramatiq.

Flask

The Flask-Dramatiq extension integrates Dramatiq with Flask. It includes support for configuration and the application factory pattern. Flask-Melodramatiq is very similar in scope, but tries to stay as close as possible to the “native” Dramatiq API.

Operations

Auto-discovering “tasks” modules with bash

Dramatiq doesn’t attempt to auto-discover tasks modules. Assuming you follow a convention where all your tasks modules are named tasks.py then you can discover them using bash:

#!/usr/bin/env bash

set -e

tasks_packages=$(find . -type d -name tasks | sed s':/:.:g' | sed s'/^..//' | xargs)
tasks_modules=$(find . -type f -name tasks.py | sed s':/:.:g' | sed s'/^..//' | sed s'/.py$//g' | xargs)
all_modules="$tasks_packages $tasks_modules"

echo "Discovered tasks modules:"
for module in $all_modules; do
  echo "  * ${module}"
done
echo

dramatiq-gevent $all_modules --watch . --watch-use-polling

Retrying connection errors on startup

Dramatiq does not retry connection errors that occur on worker startup. It does, however, return a specific exit code (3) when that happens. Using that, you can build a wrapper script around it if you need to retry with backoff when connection errors happen during startup (eg. in Docker):

#!/usr/bin/env bash

delay=1
while true; do
  dramatiq $@
  if [ $? -eq 3 ]; then
    echo "Connection error encountered on startup. Retrying in $delay second(s)..."
    sleep $delay
    delay=$((delay * 2))
  else
    exit $?
  fi
done

Binding Worker Groups to Queues

By default, Dramatiq workers consume all declared queues, but it’s common to want to bind worker groups to specific queues in order to have better control over throughput. For example, given the following actors:

@dramatiq.actor
def very_slow():
    ...


@dramatiq.actor(queue_name="ui-blocking")
def very_important():
    ...

You may want to run one group of workers that only processes messages on the default queue and another that only processes messages off of the ui-blocking queue. To do that, you have to pass each group the appropriate queue on the command line:

# Only consume the "default" queue
$ dramatiq an_app --queues default

# Only consume the "ui-blocking" queue
$ dramatiq an_app --queues ui-blocking

Messages sent to very_slow will always be delivered to those workers that consume the default queue and messages sent to very_important will always be delivered to the ones that consume the ui-blocking queue.

Rate Limiting

Rate limiting work

You can use Dramatiq’s RateLimiters to constrain actor concurrency.

import dramatiq
import time

from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend

backend = RedisBackend()
DISTRIBUTED_MUTEX = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)


@dramatiq.actor
def one_at_a_time():
    with DISTRIBUTED_MUTEX.acquire():
        time.sleep(1)
        print("Done.")

Whenever two one_at_a_time actors run at the same time, one of them will be retried with exponential backoff. This works by raising an exception and relying on the built-in Retries middleware to do the work of re-enqueueing the task.

If you want rate limiters not to raise an exception when they can’t be acquired, you should pass raise_on_failure=False to acquire:

with DISTRIBUTED_MUTEX.acquire(raise_on_failure=False) as acquired:
    if not acquired:
        print("Lock could not be acquired.")
    else:
        print("Lock was acquired.")

Results

Storing message results

You can use Dramatiq’s result backends to store and retrieve message return values. To enable result storage, you need to instantiate and add the Results middleware to your broker.

import dramatiq

from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results

result_backend = RedisBackend()
broker = RabbitmqBroker()
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)


@dramatiq.actor(store_results=True)
def add(x, y):
    return x + y


if __name__ == "__main__":
    message = add.send(1, 2)
    print(message.get_result(block=True))

Getting a result raises ResultMissing when a result hasn’t been stored yet or if it has already expired (results expire after 10 minutes by default). When the block parameter is True, ResultTimeout is raised instead.

When a message fails, getting a result raises ResultFailure. When a message is skipped via SkipMessage, None is stored as the result.

Results expire, otherwise the result backend would eventually run out of space. The timeout for the results expiration is set on result_ttl argument of Results, given in milliseconds, and the default is 10 minutes.

# Results are valid only for one hour
Results(backend=result_backend, result_ttl=3600*1000)

The result expiration can be also set per an actor:

# Add result expires in 30 seconds
@dramatiq.actor(store_results=True, result_ttl=30*1000)
def add(x, y):
    return x + y

Scheduling

Scheduling messages

APScheduler is the recommended scheduler to use with Dramatiq:

import dramatiq
import sys

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime


@dramatiq.actor
def print_current_date():
    print(datetime.now())


if __name__ == "__main__":
    scheduler = BlockingScheduler()
    scheduler.add_job(
        print_current_date.send,
        CronTrigger.from_crontab("* * * * *"),
    )
    try:
        scheduler.start()
    except KeyboardInterrupt:
        scheduler.shutdown()

Aborting

Aborting Messages

The dramatiq-abort package provides a middleware that can be used to abort running actors by message id. Here’s how you might set it up:

import dramatiq
import dramatiq_abort.backends
from dramatiq_abort import Abortable, abort

abortable = Abortable(
    backend=dramatiq_abort.backends.RedisBackend()
)
dramatiq.get_broker().add_middleware(abortable)

@dramatiq.actor
def a_long_running_task():
    ...

message = a_long_running_task.send()
abort(message.message_id)