Cookbook

This part of the docs contains recipes for common things you might want to do with Dramatiq. Each section is light on prose and heavy on code, so if you have questions about any of the recipes, open an issue on GitHub.

Usage with Django

Check out the django_dramatiq project if you want to use Dramatiq with Django. Check out the django_dramatiq_example repo to see an example app built with Django and Dramatiq.

Usage with Flask

Check out the flask_dramatiq_example repo to see an example app built with Flask and Dramatiq.

Rate limiting work

You can use dramatiq’s RateLimiters to constrain actor concurrency.

import dramatiq
import time

from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend

backend = RedisBackend()
DISTRIBUTED_MUTEX = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)

@dramatiq.actor
def one_at_a_time():
  with DISTRIBUTED_MUTEX.acquire():
    time.sleep(1)
    print("Done.")

Whenever two one_at_a_time actors run at the same time, one of them will be retried with exponential backoff. This works by raising an exception and relying on the built-in Retries middleware to do the work of re-enqueueing the task.

If you want rate limiters not to raise an exception when they can’t be acquired, you should pass raise_on_failure=False to acquire:

with DISTRIBUTED_MUTEX.acquire(raise_on_failure=False) as acquired:
  if not acquired:
    print("Lock could not be acquired.")
  else:
    print("Lock was acquired.")
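
The two behaviors above (raising so that the task gets retried, and yielding False when raise_on_failure=False) can be pictured with a small standalone sketch. It does not use dramatiq at all: ToyLimiter, LimitExceeded and run_with_retries are hypothetical names invented for this illustration, the limiter only works inside one process, and the retry loop merely stands in for the re-enqueueing that the Retries middleware performs.

```python
import threading
import time
from contextlib import contextmanager


class LimitExceeded(Exception):
    """Raised when the toy limiter has no free slot (hypothetical name)."""


class ToyLimiter:
    """In-process stand-in for a concurrency limiter; not dramatiq's code."""

    def __init__(self, limit=1):
        self._slots = threading.Semaphore(limit)

    @contextmanager
    def acquire(self, raise_on_failure=True):
        # Try to take a slot without blocking.
        acquired = self._slots.acquire(blocking=False)
        if not acquired and raise_on_failure:
            raise LimitExceeded("no free slot")
        try:
            # The caller sees True or False, as in the example above.
            yield acquired
        finally:
            if acquired:
                self._slots.release()


def run_with_retries(task, max_retries=5, min_backoff=1, sleep=time.sleep):
    """Call task(); on LimitExceeded wait min_backoff * 2**attempt and retry."""
    for attempt in range(max_retries + 1):
        try:
            return task()
        except LimitExceeded:
            if attempt == max_retries:
                raise  # retries exhausted, surface the failure
            sleep(min_backoff * 2 ** attempt)
```

Passing a different sleep callable makes the backoff schedule easy to inspect without actually waiting.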

Reporting errors with Rollbar

Rollbar provides an easy-to-use Python client. Add it to your project with pipenv:

$ pipenv install rollbar

Save the following middleware to a module inside your project:

myapp.rollbar_middleware
import dramatiq
import rollbar

class RollbarMiddleware(dramatiq.Middleware):
  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is not None:
      rollbar.report_exc_info()

Finally, instantiate and add it to your broker:

myapp.main
import rollbar
from myapp.rollbar_middleware import RollbarMiddleware
rollbar.init(YOUR_ROLLBAR_KEY)
broker.add_middleware(RollbarMiddleware())

Reporting errors with Sentry

Install Sentry’s raven client with pipenv:

$ pipenv install raven

Save the following middleware to a module inside your project:

myapp.sentry_middleware
import dramatiq

class SentryMiddleware(dramatiq.Middleware):
  def __init__(self, raven_client):
    self.raven_client = raven_client

  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is not None:
      self.raven_client.captureException()

Finally, instantiate and add it to your broker:

myapp.main
from raven import Client
from myapp.sentry_middleware import SentryMiddleware

raven_client = Client(YOUR_DSN)
broker.add_middleware(SentryMiddleware(raven_client))

Retrying connection errors on startup

Dramatiq does not retry connection errors that occur on worker startup. It does, however, return a specific exit code (3) when that happens. If you need to retry with backoff when connection errors happen during startup (e.g. in Docker), you can use that exit code to build a wrapper script around dramatiq:

#!/usr/bin/env bash

delay=1
while true; do
  dramatiq "$@"
  # Save the exit code right away: the test below overwrites $?.
  status=$?
  if [ "$status" -eq 3 ]; then
    echo "Connection error encountered on startup. Retrying in $delay second(s)..."
    sleep "$delay"
    delay=$((delay * 2))
  else
    exit "$status"
  fi
done
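
If a Python entry point is more convenient than a shell script, the same loop might look roughly like the sketch below. The function name run_until_started and its run, sleep and max_delay parameters are assumptions made for this example; in particular, the cap on the delay is an addition that the shell script does not have.

```python
import subprocess
import time

# Exit code that, per the text above, signals a connection error on startup.
RET_CONNECT = 3


def run_until_started(argv, run=subprocess.call, sleep=time.sleep, max_delay=60):
    """Re-run argv while it exits with code 3, doubling the delay each time.

    run and sleep are injectable so the loop can be exercised without
    spawning a real process; max_delay caps the backoff (an assumption,
    not part of the shell script).
    """
    delay = 1
    while True:
        status = run(argv)
        if status != RET_CONNECT:
            # Any other exit code, including 0, is passed through unchanged.
            return status
        print(f"Connection error encountered on startup. Retrying in {delay} second(s)...")
        sleep(delay)
        delay = min(delay * 2, max_delay)
```

A wrapper module could then end with something like sys.exit(run_until_started(["dramatiq", *sys.argv[1:]])) so that the worker's final exit code is propagated to the caller.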