API Reference

Functions

dramatiq.get_broker()[source]

Get the global broker instance. If no global broker is set, this initializes a RabbitmqBroker and returns that.

Returns:The default Broker.
Return type:Broker
dramatiq.set_broker(broker)[source]

Configure the global broker instance.

Parameters:broker (Broker) – The broker instance to use by default.

Actors & Messages

dramatiq.actor(fn=None, *, actor_name=None, queue_name=’default’, priority=0, broker=None, **options)[source]

Declare an actor.

Examples

>>> import dramatiq
>>> @dramatiq.actor
... def add(x, y):
...   print(x + y)
...
>>> add
Actor(<function add at 0x106c6d488>, queue_name='default', actor_name='add')
>>> add(1, 2)
3
>>> add.send(1, 2)
Message(
  queue_name='default',
  actor_name='add',
  args=(1, 2), kwargs={}, options={},
  message_id='e0d27b45-7900-41da-bb97-553b8a081206',
  message_timestamp=1497862448685)
Parameters:
  • fn (callable) – The function to wrap.
  • actor_name (str) – The name of the actor.
  • queue_name (str) – The name of the queue to use.
  • priority (int) – The actor’s global priority. If two tasks have been pulled on a worker concurrently and one has a higher priority than the other then it will be processed first. Lower numbers represent higher priorities.
  • broker (Broker) – The broker to use with this actor.
  • **options (dict) – Arbitrary options that vary with the set of middleware that you use. See get_broker().actor_options.
Returns:

The decorated function.

Return type:

Actor

class dramatiq.Actor(fn, *, broker, actor_name, queue_name, priority, options)[source]

Thin wrapper around callables that stores metadata about how they should be executed asynchronously.

send(*args, **kwargs)[source]

Asynchronously send a message to this actor.

Note

All arguments must be JSON-encodable.

Parameters:
  • *args (tuple) – Positional arguments to send to the actor.
  • **kwargs (dict) – Keyword arguments to send to the actor.
Returns:

The enqueued message.

Return type:

Message

send_with_options(*, args=None, kwargs=None, delay=None, **options)[source]

Asynchronously send a message to this actor, along with an arbitrary set of processing options for the broker and middleware.

Parameters:
  • args (tuple) – Positional arguments that are passed to the actor.
  • kwargs (dict) – Keyword arguments that are passed to the actor.
  • delay (int) – The minimum amount of time, in milliseconds, the message should be delayed by.
  • **options (dict) – Arbitrary options that are passed to the broker and any registered middleware.
Returns:

The enqueued message.

Return type:

Message

class dramatiq.Message[source]

Encapsulates metadata about messages being sent to individual actors.

Parameters:
  • queue_name (str) – The name of the queue the message belogns to.
  • actor_name (str) – The name of the actor that will receive the message.
  • args (tuple) – Positional arguments that are passed to the actor.
  • kwargs (dict) – Keyword arguments that are passed to the actor.
  • options (dict) – Arbitrary options passed to the broker and middleware.
  • message_id (str) – A globally-unique id assigned to the actor.
  • message_timestamp (int) – The UNIX timestamp in milliseconds representing when the message was first enqueued.
classmethod decode(data)[source]

Convert a JSON bytestring to a message.

encode()[source]

Convert this message to a JSON bytestring.

new_id()[source]

Return a copy of this message with a new unique id assigned to it.

Brokers

class dramatiq.Broker(middleware=None)[source]

Base class for broker implementations.

Parameters:middleware (list[Middleware]) – The set of middleware that apply to this broker. If you supply this parameter, you are expected to declare all middleware. Most of the time, you’ll want to use add_middleware() instead.
actor_options

set[str] – The names of all the options actors may overwrite when they are declared.

add_middleware(middleware, *, before=None, after=None)[source]

Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.

You can specify another middleware (by class) as a reference point for where the new middleware should be added.

Parameters:
  • middleware (Middleware) – The middleware.
  • before (type) – Add this middleware before a specific one.
  • after (type) – Add this middleware after a specific one.
Raises:

ValueError – When either before or after refer to a middleware that hasn’t been registered yet.

close()[source]

Close this broker and perform any necessary cleanup actions.

consume(queue_name, prefetch=1, timeout=30000)[source]

Get an iterator that consumes messages off of the queue.

Raises:

QueueNotFound – If the given queue was never declared.

Parameters:
  • queue_name (str) – The name of the queue to consume messages off of.
  • prefetch (int) – The number of messages to prefetch per consumer.
  • timeout (int) – The amount of time in milliseconds to idle for.
Returns:

A message iterator.

Return type:

Consumer

declare_actor(actor)[source]

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters:actor (Actor) – The actor being declared.
declare_queue(queue_name)[source]

Declare a queue on this broker. This method must be idempotent.

Parameters:queue_name (str) – The name of the queue being declared.
enqueue(message, *, delay=None)[source]

Enqueue a message on this broker.

Parameters:
  • message (Message) – The message to enqueue.
  • delay (int) – The number of milliseconds to delay the message for.
get_actor(actor_name)[source]

Look up an actor by its name.

Parameters:actor_name (str) – The name to look up.
Raises:ActorNotFound – If the actor was never declared.
Returns:The actor.
Return type:Actor
get_declared_actors()[source]

Get all declared actors.

Returns:The names of all the actors declared so far on this Broker.
Return type:set[str]
get_declared_delay_queues()[source]

Get all declared delay queues.

Returns:The names of all the delay queues declared so far on this Broker.
Return type:set[str]
get_declared_queues()[source]

Get all declared queues.

Returns:The names of all the queues declared so far on this Broker.
Return type:set[str]
class dramatiq.Consumer[source]

Consumers iterate over messages on a queue.

Consumers and their MessageProxies are not thread-safe.

__iter__()[source]

Returns this instance as a Message iterator.

__next__()[source]

Retrieve the next message off of the queue. This method blocks until a message becomes available.

Returns:A transparent proxy around a Message that can be used to acknowledge or reject it once it’s done being processed.
Return type:MessageProxy
ack(message)[source]

Acknowledge that a message has been processed, removing it from the broker.

Parameters:message (MessageProxy) – The message to acknowledge.
close()[source]

Close this consumer and perform any necessary cleanup actions.

nack(message)[source]

Move a message to the dead-letter queue.

Parameters:message (MessageProxy) – The message to reject.
class dramatiq.MessageProxy(message)[source]

Base class for messages returned by Broker.consume().

fail()[source]

Mark this message for rejection.

class dramatiq.brokers.rabbitmq.RabbitmqBroker(parameters=None, middleware=None)[source]

A broker that can be used with RabbitMQ.

Parameters:
  • parameters (pika.ConnectionParameters) – The connection parameters to use to determine which Rabbit server to connect to.
  • middleware (list[Middleware]) – The set of middleware that apply to this broker.
add_middleware(middleware, *, before=None, after=None)

Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.

You can specify another middleware (by class) as a reference point for where the new middleware should be added.

Parameters:
  • middleware (Middleware) – The middleware.
  • before (type) – Add this middleware before a specific one.
  • after (type) – Add this middleware after a specific one.
Raises:

ValueError – When either before or after refer to a middleware that hasn’t been registered yet.

channel

The pika.BlockingChannel for the current thread. This property may change without notice.

close()[source]

Close all open RabbitMQ connections.

connection

The pika.BlockingConnection for the current thread. This property may change without notice.

consume(queue_name, prefetch=1, timeout=5000)[source]

Create a new consumer for a queue.

Parameters:
  • queue_name (str) – The queue to consume.
  • prefetch (int) – The number of messages to prefetch.
  • timeout (int) – The idle timeout in milliseconds.
Returns:

A consumer that retrieves messages from RabbitMQ.

Return type:

Consumer

declare_actor(actor)

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters:actor (Actor) – The actor being declared.
declare_queue(queue_name)[source]

Declare a queue. Has no effect if a queue with the given name already exists.

Parameters:queue_name (str) – The name of the new queue.
Raises:ConnectionClosed – If the underlying channel or connection has been closed.
enqueue(message, *, delay=None)[source]

Enqueue a message.

Parameters:
  • message (Message) – The message to enqueue.
  • delay (int) – The minimum amount of time, in milliseconds, to delay the message by.
Raises:

ConnectionClosed – If the underlying channel or connection has been closed.

get_actor(actor_name)

Look up an actor by its name.

Parameters:actor_name (str) – The name to look up.
Raises:ActorNotFound – If the actor was never declared.
Returns:The actor.
Return type:Actor
get_declared_actors()

Get all declared actors.

Returns:The names of all the actors declared so far on this Broker.
Return type:set[str]
get_declared_delay_queues()

Get all declared delay queues.

Returns:The names of all the delay queues declared so far on this Broker.
Return type:set[str]
get_declared_queues()[source]

Get all declared queues.

Returns:The names of all the queues declared so far on this Broker.
Return type:set[str]
get_queue_message_counts(queue_name)[source]

Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.

Parameters:queue_name (str) – The queue whose message counts to get.
Returns:A triple representing the number of messages in the queue, its delayed queue and its dead letter queue.
Return type:tuple
join(queue_name)[source]

Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.

Note

This method doesn’t wait for unacked messages so it may not be completely reliable. Use the stub broker in your unit tests and only use this for simple integration tests.

Parameters:queue_name (str) – The queue to wait on.
class dramatiq.brokers.redis.RedisBroker(*, middleware=None, namespace=’dramatiq’, **parameters)[source]

A broker than can be used with Redis.

Parameters:
  • middleware (list[Middleware]) –
  • namespace (str) – The str with which to prefix all Redis keys.
  • **parameters (dict) – Connection parameters are passed directly to redis.StrictRedis.
add_middleware(middleware, *, before=None, after=None)

Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.

You can specify another middleware (by class) as a reference point for where the new middleware should be added.

Parameters:
  • middleware (Middleware) – The middleware.
  • before (type) – Add this middleware before a specific one.
  • after (type) – Add this middleware after a specific one.
Raises:

ValueError – When either before or after refer to a middleware that hasn’t been registered yet.

close()[source]

Close this broker.

consume(queue_name, prefetch=1, timeout=5000)[source]

Create a new consumer for a queue.

Parameters:
  • queue_name (str) – The queue to consume.
  • prefetch (int) – The number of messages to prefetch.
  • timeout (int) – The idle timeout in milliseconds.
Returns:

A consumer that retrieves messages from Redis.

Return type:

Consumer

declare_actor(actor)

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters:actor (Actor) – The actor being declared.
declare_queue(queue_name)[source]

Declare a queue. Has no effect if a queue with the given name has already been declared.

Parameters:queue_name (str) – The name of the new queue.
enqueue(message, *, delay=None)[source]

Enqueue a message.

Parameters:
  • message (Message) – The message to enqueue.
  • delay (int) – The minimum amount of time, in milliseconds, to delay the message by. Must be less than 7 days.
Raises:

ValueError – If delay is longer than 7 days.

get_actor(actor_name)

Look up an actor by its name.

Parameters:actor_name (str) – The name to look up.
Raises:ActorNotFound – If the actor was never declared.
Returns:The actor.
Return type:Actor
get_declared_actors()

Get all declared actors.

Returns:The names of all the actors declared so far on this Broker.
Return type:set[str]
get_declared_delay_queues()

Get all declared delay queues.

Returns:The names of all the delay queues declared so far on this Broker.
Return type:set[str]
get_declared_queues()[source]

Get all declared queues.

Returns:The names of all the queues declared so far on this Broker.
Return type:set[str]
join(queue_name)[source]

Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.

Parameters:queue_name (str) – The queue to wait on.
class dramatiq.brokers.stub.StubBroker(middleware=None)[source]

A broker that can be used within unit tests.

dead_letters

list[Message] – Contains the dead-lettered messages for all defined queues.

add_middleware(middleware, *, before=None, after=None)

Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.

You can specify another middleware (by class) as a reference point for where the new middleware should be added.

Parameters:
  • middleware (Middleware) – The middleware.
  • before (type) – Add this middleware before a specific one.
  • after (type) – Add this middleware after a specific one.
Raises:

ValueError – When either before or after refer to a middleware that hasn’t been registered yet.

close()

Close this broker and perform any necessary cleanup actions.

consume(queue_name, prefetch=1, timeout=100)[source]

Create a new consumer for a queue.

Parameters:
  • queue_name (str) – The queue to consume.
  • prefetch (int) – The number of messages to prefetch.
  • timeout (int) – The idle timeout in milliseconds.
Raises:

QueueNotFound – If the queue hasn’t been declared.

Returns:

A consumer that retrieves messages from Redis.

Return type:

Consumer

declare_actor(actor)

Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.

Parameters:actor (Actor) – The actor being declared.
declare_queue(queue_name)[source]

Declare a queue. Has no effect if a queue with the given name has already been declared.

Parameters:queue_name (str) – The name of the new queue.
enqueue(message, *, delay=None)[source]

Enqueue a message.

Parameters:
  • message (Message) – The message to enqueue.
  • delay (int) – The minimum amount of time, in milliseconds, to delay the message by.
Raises:

QueueNotFound – If the queue the message is being enqueued on doesn’t exist.

get_actor(actor_name)

Look up an actor by its name.

Parameters:actor_name (str) – The name to look up.
Raises:ActorNotFound – If the actor was never declared.
Returns:The actor.
Return type:Actor
get_declared_actors()

Get all declared actors.

Returns:The names of all the actors declared so far on this Broker.
Return type:set[str]
get_declared_delay_queues()

Get all declared delay queues.

Returns:The names of all the delay queues declared so far on this Broker.
Return type:set[str]
get_declared_queues()

Get all declared queues.

Returns:The names of all the queues declared so far on this Broker.
Return type:set[str]
join(queue_name)[source]

Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.

Parameters:queue_name (str) – The queue to wait on.
Raises:QueueNotFound – If the given queue was never declared.

Middleware

class dramatiq.Middleware[source]

Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.

actor_options

The set of options that may be configured on each actor.

before_ack(broker, message)[source]

Called before a message is acknowledged.

after_ack(broker, message)[source]

Called after a message has been acknowledged.

before_nack(broker, message)[source]

Called before a message is rejected.

after_nack(broker, message)[source]

Called after a message has been rejected.

before_declare_actor(broker, actor)[source]

Called before an actor is declared.

after_declare_actor(broker, actor)[source]

Called after an actor has been declared.

before_declare_queue(broker, queue_name)[source]

Called before a queue is declared.

after_declare_queue(broker, queue_name)[source]

Called after a queue has been declared.

after_declare_delay_queue(broker, queue_name)[source]

Called after a delay queue has been declared.

before_enqueue(broker, message, delay)[source]

Called before a message is enqueued.

after_enqueue(broker, message, delay)[source]

Called after a message has been enqueued.

before_delay_message(broker, message)[source]

Called before a message has been delayed in worker memory.

before_process_message(broker, message)[source]

Called before a message is processed.

after_process_message(broker, message, *, result=None, exception=None)[source]

Called after a message has been processed.

after_process_boot(broker)[source]

Called immediately after subprocess start up.

before_worker_boot(broker, worker)[source]

Called before the worker processes starts up.

after_worker_boot(broker, worker)[source]

Called after the worker process has started up.

before_worker_shutdown(broker, worker)[source]

Called before the worker process shuts down.

after_worker_shutdown(broker, worker)[source]

Called after the worker process shuts down.

class dramatiq.middleware.AgeLimit(*, max_age=None)[source]

Middleware that drops messages that have been in the queue for too long.

Parameters:max_age (int) – The default message age limit in millseconds. Defaults to None, meaning that messages can exist indefinitely.
class dramatiq.middleware.Prometheus(*, http_host=’localhost’, http_port=9191)[source]

A middleware that exports stats via Prometheus.

Parameters:
  • http_host (str) – The host to bind the Prometheus exposition server on. This parameter can also be configured via the dramatiq_prom_host environment variable.
  • http_port (int) – The port on which the server should listen. This parameter can also be configured via the dramatiq_prom_port environment variable.
class dramatiq.middleware.Retries(*, max_retries=20, min_backoff=None, max_backoff=None)[source]

Middleware that automatically retries failed tasks with exponential backoff.

Parameters:
  • max_retires (int) – The maximum number of times tasks can be retried.
  • min_backoff (int) – The minimum amount of backoff milliseconds to apply to retried tasks. Defaults to 15 seconds.
  • max_backoff (int) – The maximum amount of backoff milliseconds to apply to retried tasks. Defaults to 7 days.
class dramatiq.middleware.TimeLimit(*, time_limit=600000, interval=1000)[source]

Middleware that cancels actors that run for too long. Currently, this is only available on CPython.

Note

This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.

Parameters:
  • time_limit (int) – The maximum number of milliseconds actors may run for.
  • interval (int) – The interval (in milliseconds) with which to check for actors that have exceeded the limit.
class dramatiq.middleware.TimeLimitExceeded[source]

Raised asynchronously inside worker threads when actors exceed their time limits.

This is intentionally not a subclass of DramatiqError to avoid it being caught unintentionally.

Rate Limiters

Rate limiters can be used to determine whether or not an operation can be run at the current time across many processes and machines by using a shared storage backend.

Backends

Rate limiter backends are used to store metadata about rate limits.

class dramatiq.rate_limits.RateLimiterBackend[source]

ABC for rate limiter backends.

class dramatiq.rate_limits.backends.MemcachedBackend(*, pool_size=8, **parameters)[source]

A rate limiter backend for Memcached.

Examples

>>> from dramatiq.rate_limits.backends import MemcachedBackend
>>> backend = MemcachedBackend(servers=["127.0.0.1"], binary=True)
Parameters:
  • pool_size (int) – The size of the connection pool to use.
  • **parameters (dict) – Connection parameters are passed directly to pylibmc.Client.
class dramatiq.rate_limits.backends.RedisBackend(**parameters)[source]

A rate limiter backend for Redis.

Parameters:**parameters (dict) – Connection parameters are passed directly to redis.StrictRedis.

Limiters

class dramatiq.rate_limits.RateLimiter(backend, key)[source]

ABC for rate limiters.

Examples

>>> from dramatiq.rate_limits.backends import RedisBackend
>>> backend = RedisBackend()
>>> limiter = ConcurrentRateLimiter(backend, "distributed-mutex", 1)
>>> with limiter.acquire(raise_on_failure=False) as acquired:
...   if not acquired:
...     print("Mutex not acquired.")
...     return
...
...   print("Mutex acquired.")
Parameters:
  • backend (RateLimiterBackend) – The rate limiting backend to use.
  • key (str) – The key to rate limit on.
acquire(*, raise_on_failure=True)[source]

Attempt to acquire a slot under this rate limiter.

Parameters:raise_on_failure (bool) – Whether or not failures should raise an exception. If this is false, the context manager will instead return a boolean value representing whether or not the rate limit slot was acquired.
Returns:Whether or not the slot could be acquired.
Return type:bool
class dramatiq.rate_limits.BucketRateLimiter(backend, key, *, limit=1, bucket=1000)[source]

A rate limiter that ensures that only up to limit operations may happen over some time interval.

Examples

Up to 10 operations every second:

>>> BucketRateLimiter(backend, "some-key", limit=10, bucket=1_000)

Up to 1 operation every minute:

>>> BucketRateLimiter(backend, "some-key", limit=1, bucket=60_000)

Warning

Bucket rate limits are cheap to maintain but are susceptible to burst “attacks”. Given a bucket rate limit of 100 per minute, an attacker could make a burst of 100 calls in the last second of a minute and then another 100 calls in the first second of the subsequent minute.

For a rate limiter that doesn’t have this problem (but is more expensive to maintain), see WindowRateLimiter.

Parameters:
  • backend (RateLimiterBackend) – The backend to use.
  • key (str) – The key to rate limit on.
  • limit (int) – The maximum number of operations per bucket per key.
  • bucket (int) – The bucket interval in milliseconds.
class dramatiq.rate_limits.ConcurrentRateLimiter(backend, key, *, limit=1, ttl=900000)[source]

A rate limiter that ensures that only limit concurrent operations may happen at the same time.

Note

You can use a concurrent rate limiter of size 1 to get a distributed mutex.

Parameters:
  • backend (RateLimiterBackend) – The backend to use.
  • key (str) – The key to rate limit on.
  • limit (int) – The maximum number of concurrent operations per key.
  • ttl (int) – The time in milliseconds that keys may live for.
class dramatiq.rate_limits.WindowRateLimiter(backend, key, *, limit=1, window=1)[source]

A rate limiter that ensures that only limit operations may happen over some sliding window.

Note

Windows are in seconds rather that milliseconds. This is different from most durations and intervals used in Dramatiq, because keeping metadata at the millisecond level is far too expensive for most use cases.

Parameters:
  • backend (RateLimiterBackend) – The backend to use.
  • key (str) – The key to rate limit on.
  • limit (int) – The maximum number of operations per window per key.
  • window (int) – The window size in seconds. The wider the window, the more expensive it is to maintain.

Workers

class dramatiq.Worker(broker, *, worker_timeout=1000, worker_threads=8)[source]

Workers consume messages off of all declared queues and distribute those messages to individual worker threads for processing. Workers don’t block the current thread so it’s up to the caller to keep it alive.

Don’t run more than one Worker per process.

Parameters:
  • broker (Broker) –
  • worker_timeout (int) – The number of milliseconds workers should wake up after if the queue is idle.
  • worker_threads (int) – The number of worker threads to spawn.
join()[source]

Wait for this worker to complete its work in progress. This method is useful when testing code.

start()[source]

Initialize the worker boot sequence and start up all the worker threads.

stop(timeout=600000)[source]

Gracefully stop the Worker and all of its consumers and workers.

Parameters:timeout (int) – The number of milliseconds to wait for everything to shut down.

Errors

class dramatiq.DramatiqError(message)[source]

Base class for all dramatiq errors.

class dramatiq.BrokerError(message)[source]

Base class for broker-related errors.

class dramatiq.ActorNotFound(message)[source]

Raised when a message is sent to an actor that hasn’t been declared.

class dramatiq.QueueNotFound(message)[source]

Raised when a message is sent to an queue that hasn’t been declared.

class dramatiq.ConnectionError(message)[source]

Base class for broker connection-related errors.

class dramatiq.ConnectionClosed(message)[source]

Raised when a broker connection is suddenly closed.

class dramatiq.ConnectionFailed(message)[source]

Raised when a broker connection could not be opened.

class dramatiq.RateLimitExceeded(message)[source]

Raised when a rate limit has been exceeded.