Warning: This document is for an old version of Dramatiq. The latest version is v1.4.0.


This part of the docs contains recipes for various things you might want to do using Dramatiq. Each section will be light on prose and code heavy, so if you have any questions about one of the recipes, open an issue on GitHub.


Dramatiq has built-in support for sending actors messages when other actors succeed or fail. The on_failure callback is called every time an actor fails, even if the message is going to be retried.

import dramatiq

def identity(x):
  return x

def print_result(message_data, result):
  print(f"The result of message {message_data['message_id']} was {result}.")

def print_error(message_data, exception_data):
  print(f"Message {message_data['message_id']} failed:")
  print(f"  * type: {exception_data['type']}")
  print(f"  * message: {exception_data['message']!r}")

if __name__ == "__main__":


Dramatiq has built-in support for a couple high-level composition constructs. You can use these to combine generalized tasks that don’t know about one another into complex workflows.

In order to take advantage of group and pipeline result management, you need to enable result storage and your actors need to store results. Check out the Results section for more information on result storage.


Groups run actors in parallel and let you gather their results or wait for all of them to finish. Assuming you have a computationally intensive actor called frobnicate, you can group multiple messages together as follows:

g = group([
  frobnicate.message(1, 2),
  frobnicate.message(2, 3),
  frobnicate.message(3, 4),

This will enqueue 3 separate messages and, assuming there are enough resources available, execute them in parallel. You can then wait for the whole group to finish:

g.wait(timeout=10_000)  # 10s expressed in millis

Or you can iterate over the results:

for res in g.get_results(block=True, timeout=5_000):

Results are returned in the same order that the messages were added to the group.


Actors can be chained together using the pipeline function. For example, if you have an actor that gets the text contents of a website and one that counts the number of “words” in a piece of text:

def get_uri_contents(uri):
  return requests.get(uri).text

def count_words(uri, text):
  count = len(text.split(" "))
  print(f"There are {count} words at {uri}.")

You can chain them together like so:

uri = "http://example.com"
pipe = pipeline([

Or you can use pipe notation to achieve the same thing:

pipe = get_uri_contents.message(uri) | count_words.message(uri)

In both cases, the result of running get_uri_contents(uri) is passed as the last positional argument to count_words. If you would like to avoid passing the result of an actor to the next one in line, set the pipe_ignore option to True when you create the “receiving” message:

  bust_caches.message() |
  prepare_codes.message_with_options(pipe_ignore=True) |

Here, the result of bust_caches() will not be passed to prepare_codes(), but the result of prepare_codes() will be passed to launch_missiles(codes). To get the end result of a pipeline – that is, the result of the last actor in the pipeline – you can call get_result:

pipe.get_result(block=True, timeout=5_000)

To get the intermediate results of each step in the pipeline, you can call get_results:

for res in pipe.get_results(block=True):

Error Reporting

Reporting errors with Rollbar

Rollbar provides an easy-to-use Python client. Add it to your project with pipenv:

$ pipenv install rollbar

Save the following middleware to a module inside your project:

import dramatiq
import rollbar

class RollbarMiddleware(dramatiq.Middleware):
  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is not None:

Finally, instantiate and add it to your broker:


Reporting errors with Sentry

Install Sentry’s raven client with pipenv:

$ pipenv install raven

Save the following middleware to a module inside your project:

import dramatiq

class SentryMiddleware(dramatiq.Middleware):
  def __init__(self, raven_client):
    self.raven_client = raven_client

  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is not None:

Finally, instantiate and add it to your broker:

from raven import Client

raven_client = Client(YOUR_DSN)


API Star

The apistar_dramatiq library lets you use API Star dependency injection with your Dramatiq actors.


Check out the django_dramatiq project if you want to use Dramatiq with Django. The django_dramatiq_example repo is an example app build with Django and Dramatiq.


The flask_dramatiq_example repo is an example app built with Flask and Dramatiq.


Auto-discovering “tasks” modules with bash

Dramatiq doesn’t attempt to auto-discover tasks modules. Assuming you follow a convention where all your tasks modules are named tasks.py then you can discover them using bash:

#!/usr/bin/env bash

set -e

tasks_packages=$(find . -type d -name tasks | sed s':/:.:g' | sed s'/^..//' | xargs)
tasks_modules=$(find . -type f -name tasks.py | sed s':/:.:g' | sed s'/^..//' | sed s'/.py$//g' | xargs)
all_modules="$tasks_packages $tasks_modules"

echo "Discovered tasks modules:"
for module in $all_modules; do
    echo "  * ${module}"

pipenv run dramatiq-gevent $all_modules --watch . --watch-use-polling

Retrying connection errors on startup

Dramatiq does not retry connection errors that occur on worker startup. It does, however, return a specific exit code (3) when that happens. Using that, you can build a wrapper script around it if you need to retry with backoff when connection errors happen during startup (eg. in Docker):

#!/usr/bin/env bash

while true; do
  dramatiq $@
  if [ $? -eq 3 ]; then
    echo "Connection error encountered on startup. Retrying in $delay second(s)..."
    sleep $delay
    delay=$((delay * 2))
    exit $?

Rate Limiting

Rate limiting work

You can use Dramatiq’s RateLimiters to constrain actor concurrency.

import dramatiq
import time

from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend

backend = RedisBackend()
DISTRIBUTED_MUTEX = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)

def one_at_a_time():
  with DISTRIBUTED_MUTEX.acquire():

Whenever two one_at_a_time actors run at the same time, one of them will be retried with exponential backoff. This works by raising an exception and relying on the built-in Retries middleware to do the work of re-enqueueing the task.

If you want rate limiters not to raise an exception when they can’t be acquired, you should pass raise_on_failure=False to acquire:

with DISTRIBUTED_MUTEX.acquire(raise_on_failure=False) as acquired:
  if not acquired:
    print("Lock could not be acquired.")
    print("Lock was acquired.")


Storing message results

You can use Dramatiq’s result backends to store and retrieve message return values. To enable result storage, you need to instantiate and add the Results middleware to your broker.

import dramatiq

from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results

result_backend = RedisBackend()
broker = RabbitmqBroker()

def add(x, y):
  return x + y

if __name__ == "__main__":
  message = add.send(1, 2)

Getting a result raises ResultMissing when a result hasn’t been stored yet or if it has already expired (results expire after 10 minutes by default). When the block parameter is True, ResultTimeout is raised instead.


Scheduling messages

APScheduler is the recommended scheduler to use with Dramatiq:

import dramatiq
import sys

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime

def print_current_date():

if __name__ == "__main__":
  scheduler = BlockingScheduler()
    CronTrigger.from_crontab("* * * * *"),
  except KeyboardInterrupt: