API Reference¶
Functions¶
- dramatiq.get_broker() Broker [source]¶
Get the global broker instance.
If no global broker is set, a RabbitMQ broker will be returned. If the RabbitMQ dependencies are not installed, a Redis broker will be returned.
- Returns:
The default Broker.
- Return type:
Actors & Messages¶
- dramatiq.actor(fn: Callable[[P], Awaitable[R] | R], **kwargs) Actor[P, R] [source]¶
- dramatiq.actor(fn: None = None, **kwargs) Callable[[Callable[[P], Awaitable[R] | R]], Actor[P, R]]
Declare an actor.
Examples
>>> import dramatiq
>>> @dramatiq.actor ... def add(x, y): ... print(x + y) ... >>> add Actor(<function add at 0x106c6d488>, queue_name='default', actor_name='add')
>>> add(1, 2) 3
>>> add.send(1, 2) Message( queue_name='default', actor_name='add', args=(1, 2), kwargs={}, options={}, message_id='e0d27b45-7900-41da-bb97-553b8a081206', message_timestamp=1497862448685)
- Parameters:
fn (callable) – The function to wrap.
actor_class (type) – Type created by the decorator. Defaults to
Actor
but can be any callable as long as it returns an actor and takes the same arguments as theActor
class.actor_name (str) – The name of the actor.
queue_name (str) – The name of the queue to use.
priority (int) – The actor’s global priority. If two tasks have been pulled on a worker concurrently and one has a higher priority than the other then it will be processed first. Lower numbers represent higher priorities.
broker (Broker) – The broker to use with this actor.
**options – Arbitrary options that vary with the set of middleware that you use. See
get_broker().actor_options
.
- Returns:
The decorated function.
- Return type:
- class dramatiq.Actor(fn: Callable[[P], Awaitable[R] | R], *, broker: Broker, actor_name: str, queue_name: str, priority: int, options: Dict[str, Any])[source]¶
Thin wrapper around callables that stores metadata about how they should be executed asynchronously. Actors are callable.
- logger¶
The actor’s logger.
- Type:
Logger
- fn¶
The underlying callable.
- Type:
callable
- message(*args: P.args, **kwargs: P.kwargs) Message[R] [source]¶
Build a message. This method is useful if you want to compose actors. See the actor composition documentation for details.
- Parameters:
*args (tuple) – Positional arguments to send to the actor.
**kwargs – Keyword arguments to send to the actor.
Examples
>>> (add.message(1, 2) | add.message(3)) pipeline([add(1, 2), add(3)])
- Returns:
A message that can be enqueued on a broker.
- Return type:
- message_with_options(*, args: tuple = (), kwargs: Dict[str, Any] | None = None, **options) Message[R] [source]¶
Build a message with an arbitrary set of processing options. This method is useful if you want to compose actors. See the actor composition documentation for details.
- Parameters:
- Returns:
A message that can be enqueued on a broker.
- Return type:
- send(*args: P.args, **kwargs: P.kwargs) Message[R] [source]¶
Asynchronously send a message to this actor.
- Parameters:
*args – Positional arguments to send to the actor.
**kwargs – Keyword arguments to send to the actor.
- Returns:
The enqueued message.
- Return type:
- send_with_options(*, args: tuple = (), kwargs: Dict[str, Any] | None = None, delay: timedelta | int | None = None, **options) Message[R] [source]¶
Asynchronously send a message to this actor, along with an arbitrary set of processing options for the broker and middleware.
- Parameters:
args (tuple) – Positional arguments that are passed to the actor.
kwargs (dict) – Keyword arguments that are passed to the actor.
delay (int) – The minimum amount of time, in milliseconds, the message should be delayed by. Also accepts a timedelta.
**options – Arbitrary options that are passed to the broker and any registered middleware.
- Returns:
The enqueued message.
- Return type:
- class dramatiq.Message(queue_name: str, actor_name: str, args: tuple, kwargs: ~typing.Dict[str, ~typing.Any], options: ~typing.Dict[str, ~typing.Any], message_id: str = <factory>, message_timestamp: int = <factory>)[source]¶
Encapsulates metadata about messages being sent to individual actors.
- Parameters:
queue_name (str) – The name of the queue the message belongs to.
actor_name (str) – The name of the actor that will receive the message.
args (tuple) – Positional arguments that are passed to the actor.
kwargs (dict) – Keyword arguments that are passed to the actor.
options (dict) – Arbitrary options passed to the broker and middleware.
message_id (str) – A globally-unique id assigned to the actor.
message_timestamp (int) – The UNIX timestamp in milliseconds representing when the message was first enqueued.
- classmethod decode(data: bytes) Message [source]¶
Convert a bytestring to a message.
- Raises:
DecodeError – When the decoder raises an exception while decoding data.
- get_result(*, backend: ResultBackend | None = None, block: bool = False, timeout: int | None = None) R [source]¶
Get the result associated with this message from a result backend.
Warning
If you use multiple result backends or brokers you should always pass the backend parameter. This method is only able to infer the result backend off of the default broker.
- Parameters:
backend (ResultBackend) – The result backend to use to get the result. If omitted, this method will try to find and use the result backend on the default broker instance.
block (bool) – Whether or not to block while waiting for a result.
timeout (int) – The maximum amount of time, in ms, to block while waiting for a result. Defaults to 10 seconds.
- Raises:
RuntimeError – If there is no result backend on the default broker.
ResultMissing – When block is False and the result isn’t set.
ResultTimeout – When waiting for a result times out.
- Returns:
The result.
- Return type:
Class-based Actors¶
Message Composition¶
- class dramatiq.group(children, *, broker=None)[source]¶
Run a group of actors in parallel.
- Parameters:
- add_completion_callback(message)[source]¶
Adds a completion callback to run once every job in this group has completed. Each group may have multiple completion callbacks.
Warning
This functionality is dependent upon the GroupCallbacks middleware. If that’s not set up correctly, then calling run after adding a callback will raise a RuntimeError.
- Parameters:
message (Message) –
- property completed¶
Returns True when all the jobs in the group have been completed. Actors that don’t store results are not counted, meaning this may be inaccurate if all or some of your actors don’t store results.
- Raises:
RuntimeError – If your broker doesn’t have a result backend set up.
- property completed_count¶
Returns the total number of jobs that have been completed. Actors that don’t store results are not counted, meaning this may be inaccurate if all or some of your actors don’t store results.
- Raises:
RuntimeError – If your broker doesn’t have a result backend set up.
- Returns:
The total number of results.
- Return type:
- get_results(*, block=False, timeout=None)[source]¶
Get the results of each job in the group.
- Parameters:
- Raises:
ResultMissing – When block is False and the results aren’t set.
ResultTimeout – When waiting for results times out.
- Returns:
A result generator.
- wait(*, timeout=None)[source]¶
Block until all the jobs in the group have finished or until the timeout expires.
- Parameters:
timeout (int) – The maximum amount of time, in ms, to wait. Defaults to 10 seconds.
- Raises:
ResultTimeout – When waiting times out.
- class dramatiq.pipeline(children: Iterable[Message | pipeline], *, broker=None)[source]¶
Chain actors together, passing the result of one actor to the next one in line.
- Parameters:
- property completed¶
- Returns True when all the jobs in the pipeline have been
completed. This will always return False if the last actor in the pipeline doesn’t store results.
- Raises:
RuntimeError – If your broker doesn’t have a result backend set up.
- property completed_count¶
Returns the total number of jobs that have been completed. Actors that don’t store results are not counted, meaning this may be inaccurate if all or some of your actors don’t store results.
- Raises:
RuntimeError – If your broker doesn’t have a result backend set up.
- Returns:
The total number of results.
- Return type:
- get_result(*, block=False, timeout=None)[source]¶
Get the result of this pipeline.
Pipeline results are represented by the result of the last message in the chain.
- Parameters:
- Raises:
ResultMissing – When block is False and the result isn’t set.
ResultTimeout – When waiting for a result times out.
- Returns:
The result.
- Return type:
- get_results(*, block=False, timeout=None)[source]¶
Get the results of each job in the pipeline.
- Parameters:
- Raises:
ResultMissing – When block is False and the result isn’t set.
ResultTimeout – When waiting for a result times out.
- Returns:
A result generator.
Message Encoders¶
Encoders are used to serialize and deserialize messages over the wire.
Brokers¶
- class dramatiq.Broker(middleware=None)[source]¶
Base class for broker implementations.
- Parameters:
middleware (list[Middleware]) – The set of middleware that apply to this broker. If you supply this parameter, you are expected to declare all middleware. Most of the time, you’ll want to use
add_middleware()
instead.
- actor_options¶
The names of all the options actors may overwrite when they are declared.
- add_middleware(middleware, *, before=None, after=None)[source]¶
Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.
You can specify another middleware (by class) as a reference point for where the new middleware should be added.
- Parameters:
middleware (Middleware) – The middleware.
before (type) – Add this middleware before a specific one.
after (type) – Add this middleware after a specific one.
- Raises:
ValueError – When either
before
orafter
refer to a middleware that hasn’t been registered yet.
- consume(queue_name, prefetch=1, timeout=30000)[source]¶
Get an iterator that consumes messages off of the queue.
- Raises:
QueueNotFound – If the given queue was never declared.
- Parameters:
- Returns:
A message iterator.
- Return type:
- declare_actor(actor)[source]¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters:
actor (Actor) – The actor being declared.
- declare_queue(queue_name)[source]¶
Declare a queue on this broker. This method must be idempotent.
- Parameters:
queue_name (str) – The name of the queue being declared.
- flush(queue_name)[source]¶
Drop all the messages from a queue.
- Parameters:
queue_name (str) – The name of the queue to flush.
- get_actor(actor_name)[source]¶
Look up an actor by its name.
- Parameters:
actor_name (str) – The name to look up.
- Raises:
ActorNotFound – If the actor was never declared.
- Returns:
The actor.
- Return type:
- get_results_backend()[source]¶
Get the backend of the Results middleware.
- Raises:
RuntimeError – If the broker doesn’t have a results backend.
- Returns:
The backend.
- Return type:
- class dramatiq.Consumer[source]¶
Consumers iterate over messages on a queue.
Consumers and their MessageProxies are not thread-safe.
- __next__()[source]¶
Retrieve the next message off of the queue. This method blocks until a message becomes available.
- Returns:
A transparent proxy around a Message that can be used to acknowledge or reject it once it’s done being processed.
- Return type:
- ack(message)[source]¶
Acknowledge that a message has been processed, removing it from the broker.
- Parameters:
message (MessageProxy) – The message to acknowledge.
- nack(message)[source]¶
Move a message to the dead-letter queue.
- Parameters:
message (MessageProxy) – The message to reject.
- class dramatiq.MessageProxy(message)[source]¶
Base class for messages returned by
Broker.consume()
.
- class dramatiq.brokers.rabbitmq.RabbitmqBroker(*, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None, **kwargs)[source]¶
A broker that can be used with RabbitMQ.
Examples
If you want to specify connection parameters individually:
>>> RabbitmqBroker(host="127.0.0.1", port=5672)
Alternatively, if you want to use a connection URL:
>>> RabbitmqBroker(url="amqp://guest:[email protected]:5672")
To support message priorities, provide a
max_priority
…>>> broker = RabbitmqBroker(url="...", max_priority=255)
… then enqueue messages with the
broker_priority
option:>>> broker.enqueue(an_actor.message_with_options( ... broker_priority=255, ... ))
See also
ConnectionParameters for a list of all the available connection parameters.
- Parameters:
confirm_delivery (bool) – Wait for RabbitMQ to confirm that messages have been committed on every call to enqueue. Defaults to False.
url (str|list[str]) – An optional connection URL. If both a URL and connection parameters are provided, the URL is used.
middleware (list[Middleware]) – The set of middleware that apply to this broker.
max_priority (int) – Configure queues with
x-max-priority
to support queue-global priority queueing.parameters (list[dict]) – A sequence of (pika) connection parameters to determine which Rabbit server(s) to connect to.
**kwargs – The (pika) connection parameters to use to determine which Rabbit server to connect to.
- add_middleware(middleware, *, before=None, after=None)¶
Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.
You can specify another middleware (by class) as a reference point for where the new middleware should be added.
- Parameters:
middleware (Middleware) – The middleware.
before (type) – Add this middleware before a specific one.
after (type) – Add this middleware after a specific one.
- Raises:
ValueError – When either
before
orafter
refer to a middleware that hasn’t been registered yet.
- property channel¶
The
pika.BlockingChannel
for the current thread. This property may change without notice.
- property connection¶
The
pika.BlockingConnection
for the current thread. This property may change without notice.
- declare_actor(actor)¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters:
actor (Actor) – The actor being declared.
- declare_queue(queue_name, *, ensure=False)[source]¶
Declare a queue. Has no effect if a queue with the given name already exists.
- Parameters:
- Raises:
ConnectionClosed – When ensure=True if the underlying channel or connection fails.
- enqueue(message, *, delay=None)[source]¶
Enqueue a message.
- Parameters:
- Raises:
ConnectionClosed – If the underlying channel or connection has been closed.
- flush(queue_name)[source]¶
Drop all the messages from a queue.
- Parameters:
queue_name (str) – The queue to flush.
- get_actor(actor_name)¶
Look up an actor by its name.
- Parameters:
actor_name (str) – The name to look up.
- Raises:
ActorNotFound – If the actor was never declared.
- Returns:
The actor.
- Return type:
- get_declared_actors()¶
Get all declared actors.
- get_declared_delay_queues()¶
Get all declared delay queues.
- get_queue_message_counts(queue_name)[source]¶
Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.
- get_results_backend()¶
Get the backend of the Results middleware.
- Raises:
RuntimeError – If the broker doesn’t have a results backend.
- Returns:
The backend.
- Return type:
- join(queue_name, min_successes=10, idle_time=100, *, timeout=None)[source]¶
Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.
Warning
This method doesn’t wait for unacked messages so it may not be completely reliable. Use the stub broker in your unit tests and only use this for simple integration tests.
- Parameters:
- class dramatiq.brokers.redis.RedisBroker(*, url=None, middleware=None, namespace='dramatiq', maintenance_chance=1000, heartbeat_timeout=60000, dead_message_ttl=604800000, requeue_deadline=None, requeue_interval=None, client=None, **parameters)[source]¶
A broker than can be used with Redis.
Examples
If you want to specify connection parameters individually:
>>> RedisBroker(host="127.0.0.1", port=6379, db=0, password="hunter2")
Alternatively, if you want to use a connection URL:
>>> RedisBroker(url="redis://127.0.0.1:6379/0")
See also
Redis_ for a list of all the available connection parameters.
- Parameters:
url (str) – An optional connection URL. If both a URL and connection parameters are provided, the URL is used.
middleware (list[Middleware]) –
maintenance_chance (int) – How many commands out of a million should trigger queue maintenance.
namespace (str) – The str with which to prefix all Redis keys.
heartbeat_timeout (int) – The amount of time (in ms) that has to pass without a heartbeat for a broker process to be considered offline.
dead_message_ttl (int) – The amount of time (in ms) that dead-lettered messages are kept in Redis for.
requeue_deadline (int) – Deprecated. Does nothing.
requeue_interval (int) – Deprecated. Does nothing.
client (redis.StrictRedis) – A redis client to use.
**parameters – Connection parameters are passed directly to
redis.Redis
.
- add_middleware(middleware, *, before=None, after=None)¶
Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.
You can specify another middleware (by class) as a reference point for where the new middleware should be added.
- Parameters:
middleware (Middleware) – The middleware.
before (type) – Add this middleware before a specific one.
after (type) – Add this middleware after a specific one.
- Raises:
ValueError – When either
before
orafter
refer to a middleware that hasn’t been registered yet.
- close()¶
Close this broker and perform any necessary cleanup actions.
- declare_actor(actor)¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters:
actor (Actor) – The actor being declared.
- declare_queue(queue_name)[source]¶
Declare a queue. Has no effect if a queue with the given name has already been declared.
- Parameters:
queue_name (str) – The name of the new queue.
- enqueue(message, *, delay=None)[source]¶
Enqueue a message.
- Parameters:
- Raises:
ValueError – If
delay
is longer than 7 days.
- flush(queue_name)[source]¶
Drop all the messages from a queue.
- Parameters:
queue_name (str) – The queue to flush.
- get_actor(actor_name)¶
Look up an actor by its name.
- Parameters:
actor_name (str) – The name to look up.
- Raises:
ActorNotFound – If the actor was never declared.
- Returns:
The actor.
- Return type:
- get_declared_actors()¶
Get all declared actors.
- get_declared_delay_queues()¶
Get all declared delay queues.
- get_results_backend()¶
Get the backend of the Results middleware.
- Raises:
RuntimeError – If the broker doesn’t have a results backend.
- Returns:
The backend.
- Return type:
- class dramatiq.brokers.stub.StubBroker(middleware=None)[source]¶
A broker that can be used within unit tests.
- add_middleware(middleware, *, before=None, after=None)¶
Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default.
You can specify another middleware (by class) as a reference point for where the new middleware should be added.
- Parameters:
middleware (Middleware) – The middleware.
before (type) – Add this middleware before a specific one.
after (type) – Add this middleware after a specific one.
- Raises:
ValueError – When either
before
orafter
refer to a middleware that hasn’t been registered yet.
- close()¶
Close this broker and perform any necessary cleanup actions.
- consume(queue_name, prefetch=1, timeout=100)[source]¶
Create a new consumer for a queue.
- Parameters:
- Raises:
QueueNotFound – If the queue hasn’t been declared.
- Returns:
A consumer that retrieves messages from Redis.
- Return type:
- property dead_letters¶
The dead-lettered messages for all defined queues.
- declare_actor(actor)¶
Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name.
- Parameters:
actor (Actor) – The actor being declared.
- declare_queue(queue_name)[source]¶
Declare a queue. Has no effect if a queue with the given name has already been declared.
- Parameters:
queue_name (str) – The name of the new queue.
- enqueue(message, *, delay=None)[source]¶
Enqueue a message.
- Parameters:
- Raises:
QueueNotFound – If the queue the message is being enqueued on doesn’t exist.
- flush(queue_name)[source]¶
Drop all the messages from a queue.
- Parameters:
queue_name (str) – The queue to flush.
- get_actor(actor_name)¶
Look up an actor by its name.
- Parameters:
actor_name (str) – The name to look up.
- Raises:
ActorNotFound – If the actor was never declared.
- Returns:
The actor.
- Return type:
- get_declared_actors()¶
Get all declared actors.
- get_declared_delay_queues()¶
Get all declared delay queues.
- get_declared_queues()¶
Get all declared queues.
- get_results_backend()¶
Get the backend of the Results middleware.
- Raises:
RuntimeError – If the broker doesn’t have a results backend.
- Returns:
The backend.
- Return type:
- join(queue_name, *, fail_fast=False, timeout=None)[source]¶
Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed.
- Raises:
QueueJoinTimeout – When the timeout elapses.
QueueNotFound – If the given queue was never declared.
- Parameters:
queue_name (str) – The queue to wait on.
fail_fast (bool) – When this is True and any message gets dead-lettered during the join, then an exception will be raised. This will be True by default starting with version 2.0.
timeout (Optional[int]) – The max amount of time, in milliseconds, to wait on this queue.
Middleware¶
The following middleware are all enabled by default.
- class dramatiq.Middleware[source]¶
Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.
- property actor_options¶
The set of options that may be configured on each actor.
- property forks¶
A list of functions to run in separate forks of the main process.
- after_declare_queue(broker, queue_name)[source]¶
Called after a queue has been declared.
This signals that the queue has been registered with the broker, but it does not necessarily mean that it was created on the server yet. For example, the RabbitMQ broker declares queues when actors are created, but it doesn’t instantiate them until messages are enqueued or consumed.
- after_declare_delay_queue(broker, queue_name)[source]¶
Called after a delay queue has been declared.
- before_delay_message(broker, message)[source]¶
Called before a message has been delayed in worker memory.
- before_process_message(broker, message)[source]¶
Called before a message is processed.
- Raises:
SkipMessage – If the current message should be skipped. When this is raised,
after_skip_message
is emitted instead ofafter_process_message
.
- after_process_message(broker, message, *, result=None, exception=None)[source]¶
Called after a message has been processed.
- after_skip_message(broker, message)[source]¶
Called instead of
after_process_message
after a message has been skipped.
- after_consumer_thread_boot(broker, thread)[source]¶
Called from a consumer thread after it starts but before it starts its run loop.
- before_consumer_thread_shutdown(broker, thread)[source]¶
Called before a consumer thread shuts down. This may be used to clean up thread-local resources (such as Django database connections).
- class dramatiq.middleware.AgeLimit(*, max_age=None)[source]¶
Middleware that drops messages that have been in the queue for too long.
- Parameters:
max_age (int) – The default message age limit in milliseconds. Defaults to
None
, meaning that messages can exist indefinitely.
- class dramatiq.middleware.AsyncIO[source]¶
This middleware manages the event loop thread for async actors.
- class dramatiq.middleware.Callbacks[source]¶
Middleware that lets you chain success and failure callbacks onto Actors.
- class dramatiq.middleware.CurrentMessage[source]¶
Middleware that exposes the current message via a thread-local variable.
Example
>>> import dramatiq >>> from dramatiq.middleware import CurrentMessage
>>> @dramatiq.actor ... def example(x): ... print(CurrentMessage.get_current_message()) ... >>> example.send(1)
- classmethod get_current_message() Message[Any] | None [source]¶
Get the message that triggered the current actor. Messages are thread local so this returns
None
when called outside of actor code.
- before_process_message(broker, message)[source]¶
Called before a message is processed.
- Raises:
SkipMessage – If the current message should be skipped. When this is raised,
after_skip_message
is emitted instead ofafter_process_message
.
- class dramatiq.middleware.Pipelines[source]¶
Middleware that lets you pipe actors together so that the output of one actor feeds into the input of another.
- class dramatiq.middleware.Prometheus[source]¶
A middleware that exports stats via Prometheus.
- class dramatiq.middleware.Retries(*, max_retries=20, min_backoff=None, max_backoff=None, retry_when=None)[source]¶
Middleware that automatically retries failed tasks with exponential backoff.
Disabling this middleware will cause messages that fail due to exceptions to be marked ‘done’ rather than ‘failed’. If you don’t want actors to retry automatically, it’s better to set their
max_retries
options to0
than to remove this middleware.If you need to intentionally retry an actor and you don’t want the exception to get logged, then consider raising the
Retry
exception from within your actor.Actors that have their
throws
option set to an exception class or a tuple of exception classes will not be retried if one of those exceptions is raised within them. For example:>>> @actor(throws=(RuntimeError,)) ... def example(): ... raise RuntimeError("never retried")
- Parameters:
max_retries (int) – The maximum number of times tasks can be retried.
min_backoff (int) – The minimum amount of backoff milliseconds to apply to retried tasks. Defaults to 15 seconds.
max_backoff (int) – The maximum amount of backoff milliseconds to apply to retried tasks. Defaults to 7 days.
retry_when (Callable[[int, Exception], bool]) – An optional predicate that can be used to programmatically determine whether a task should be retried or not. This takes precedence over max_retries when set.
- class dramatiq.middleware.ShutdownNotifications(notify_shutdown=False)[source]¶
Middleware that interrupts actors whose worker process has been signaled for termination. Currently, this is only available on CPython.
Note
This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
- Parameters:
notify_shutdown (bool) – When true, the actor will be interrupted if the worker process was terminated.
- class dramatiq.middleware.TimeLimit(*, time_limit=600000, interval=1000)[source]¶
Middleware that cancels actors that run for too long. Currently, this is only available on CPython.
Note
This works by setting an async exception in the worker thread that runs the actor. This means that the exception will only get called the next time that thread acquires the GIL. Concretely, this means that this middleware can’t cancel system calls.
- Parameters:
time_limit (float) – The maximum number of milliseconds actors may run for. Use float(“inf”) to avoid setting a timeout for the actor.
interval (int) – The interval (in milliseconds) with which to check for actors that have exceeded the limit. This does not take effect when using gevent because the timers are managed by gevent.
Errors¶
The class hierarchy for middleware exceptions:
BaseException
+-- Exception
| +-- dramatiq.middleware.MiddlewareError
| +-- dramatiq.middleware.SkipMessage
+-- dramatiq.middleware.Interrupt
+-- dramatiq.middleware.Shutdown
+-- dramatiq.middleware.TimeLimitExceeded
- class dramatiq.middleware.SkipMessage[source]¶
An exception that may be raised by Middleware inside the
before_process_message
hook in order to skip a message.
- class dramatiq.middleware.Interrupt[source]¶
Base class for exceptions used to asynchronously interrupt a thread’s execution. An actor may catch these exceptions in order to respond gracefully, such as performing any necessary cleanup.
This is not a subclass of
DramatiqError
to avoid it being caught unintentionally.
Results¶
Actor results can be stored and retrieved by leveraging result backends and the results middleware. Results and result backends are not enabled by default and you should avoid using them until you have a really good use case. Most of the time you can get by with actors simply updating data in your database instead of using results.
Middleware¶
- class dramatiq.results.Results(*, backend=None, store_results=False, result_ttl=None)[source]¶
Middleware that automatically stores actor results.
Example
>>> from dramatiq.results import Results >>> from dramatiq.results.backends import RedisBackend >>> backend = RedisBackend() >>> broker.add_middleware(Results(backend=backend))
>>> @dramatiq.actor(store_results=True) ... def add(x, y): ... return x + y
>>> message = add.send(1, 2) >>> message.get_result(backend=backend) 3
>>> @dramatiq.actor(store_results=True) ... def fail(): ... raise Exception("failed")
>>> message = fail.send() >>> message.get_result(backend=backend) Traceback (most recent call last): ... ResultFailure: actor raised Exception: failed
- Parameters:
backend (ResultBackend) – The result backend to use when storing results.
store_results (bool) – Whether or not actor results should be stored. Defaults to False and can be set on a per-actor basis.
result_ttl (int) – The maximum number of milliseconds results are allowed to exist in the backend. Defaults to 10 minutes and can be set on a per-actor basis.
Warning
If you have retries turned on for an actor that stores results, then the result of a message may be delayed until its retries run out!
- property actor_options¶
The set of options that may be configured on each actor.
Backends¶
- class dramatiq.results.ResultBackend(*, namespace: str = 'dramatiq-results', encoder: Encoder | None = None)[source]¶
ABC for result backends.
- Parameters:
namespace (str) – The logical namespace under which the data should be stored.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.
- build_message_key(message) str [source]¶
Given a message, return its globally-unique key.
- Parameters:
message (Message) –
- Returns:
str
- get_result(message, *, block: bool = False, timeout: int | None = None) Any [source]¶
Get a result from the backend.
- Parameters:
- Raises:
ResultMissing – When block is False and the result isn’t set.
ResultTimeout – When waiting for a result times out.
- Returns:
The result.
- Return type:
- store_exception(message, exception: Exception, ttl: int) None [source]¶
Store an exception in the backend.
- unwrap_result(res)[source]¶
Unwrap the serialized result. Passes through to
unwrap_result()
by default, but can be overridden to complement changes to howstore_result()
orstore_exception()
serialize their results.
- class dramatiq.results.backends.MemcachedBackend(*, namespace='dramatiq-results', encoder=None, pool=None, pool_size=8, **parameters)[source]¶
A result backend for Memcached. This backend uses long polling to retrieve results.
- Parameters:
namespace (str) – A string with which to prefix result keys.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.pool (ClientPool) – An optional pylibmc client pool to use. If this is passed, all other connection params are ignored.
pool_size (int) – The size of the connection pool to use.
**parameters – Connection parameters are passed directly to
pylibmc.Client
.
- class dramatiq.results.backends.RedisBackend(*, namespace='dramatiq-results', encoder=None, client=None, url=None, **parameters)[source]¶
A result backend for Redis_. This is the recommended result backend as waiting for a result is resource efficient.
- Parameters:
namespace (str) – A string with which to prefix result keys.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.client (Redis) – An optional client. If this is passed, then all other parameters are ignored.
url (str) – An optional connection URL. If both a URL and connection parameters are provided, the URL is used.
**parameters – Connection parameters are passed directly to
redis.Redis
.
- class dramatiq.results.backends.StubBackend(*, namespace: str = 'dramatiq-results', encoder: Encoder | None = None)[source]¶
An in-memory result backend. For use in unit tests.
- Parameters:
namespace (str) – A string with which to prefix result keys.
encoder (Encoder) – The encoder to use when storing and retrieving result data. Defaults to
JSONEncoder
.
Rate Limiters¶
Rate limiters can be used to determine whether or not an operation can be run at the current time across many processes and machines by using a shared storage backend.
Backends¶
Rate limiter backends are used to store metadata about rate limits.
- class dramatiq.rate_limits.RateLimiterBackend[source]¶
ABC for rate limiter backends.
- decr(key, amount, minimum, ttl)[source]¶
Atomically decrement a key in the backend up to the given maximum.
- Parameters:
- Returns:
True if the key was successfully decremented.
- Return type:
- incr(key, amount, maximum, ttl)[source]¶
Atomically increment a key in the backend up to the given maximum.
- Parameters:
- Returns:
True if the key was successfully incremented.
- Return type:
- incr_and_sum(key, keys, amount, maximum, ttl)[source]¶
Atomically increment a key unless the sum of keys is greater than the given maximum.
- Parameters:
- Returns:
True if the key was successfully incremented.
- Return type:
- class dramatiq.rate_limits.backends.MemcachedBackend(*, pool=None, pool_size=8, **parameters)[source]¶
A rate limiter backend for Memcached.
Examples
>>> from dramatiq.rate_limits.backends import MemcachedBackend >>> backend = MemcachedBackend(servers=["127.0.0.1"], binary=True)
- Parameters:
pool (ClientPool) – An optional pylibmc client pool to use. If this is passed, all other connection params are ignored.
pool_size (int) – The size of the connection pool to use.
**parameters – Connection parameters are passed directly to
pylibmc.Client
.
- class dramatiq.rate_limits.backends.RedisBackend(*, client=None, url=None, **parameters)[source]¶
A rate limiter backend for Redis_.
- Parameters:
client (Redis) – An optional client. If this is passed, then all other parameters are ignored.
url (str) – An optional connection URL. If both a URL and connection parameters are provided, the URL is used.
**parameters – Connection parameters are passed directly to
redis.Redis
.
Limiters¶
- class dramatiq.rate_limits.RateLimiter(backend, key)[source]¶
ABC for rate limiters.
Examples
>>> from dramatiq.rate_limits.backends import RedisBackend
>>> backend = RedisBackend() >>> limiter = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)
>>> with limiter.acquire(raise_on_failure=False) as acquired: ... if not acquired: ... print("Mutex not acquired.") ... return ... ... print("Mutex acquired.")
- Parameters:
backend (RateLimiterBackend) – The rate limiting backend to use.
key (str) – The key to rate limit on.
- class dramatiq.rate_limits.BucketRateLimiter(backend, key, *, limit=1, bucket=1000)[source]¶
A rate limiter that ensures that only up to limit operations may happen over some time interval.
Examples
Up to 10 operations every second:
>>> BucketRateLimiter(backend, "some-key", limit=10, bucket=1_000)
Up to 1 operation every minute:
>>> BucketRateLimiter(backend, "some-key", limit=1, bucket=60_000)
Warning
Bucket rate limits are cheap to maintain but are susceptible to burst “attacks”. Given a bucket rate limit of 100 per minute, an attacker could make a burst of 100 calls in the last second of a minute and then another 100 calls in the first second of the subsequent minute.
For a rate limiter that doesn’t have this problem (but is more expensive to maintain), see
WindowRateLimiter
.- Parameters:
backend (RateLimiterBackend) – The backend to use.
key (str) – The key to rate limit on.
limit (int) – The maximum number of operations per bucket per key.
bucket (int) – The bucket interval in milliseconds.
- class dramatiq.rate_limits.ConcurrentRateLimiter(backend, key, *, limit=1, ttl=900000)[source]¶
A rate limiter that ensures that only limit concurrent operations may happen at the same time.
Note
You can use a concurrent rate limiter of size 1 to get a distributed mutex.
- Parameters:
backend (RateLimiterBackend) – The backend to use.
key (str) – The key to rate limit on.
limit (int) – The maximum number of concurrent operations per key.
ttl (int) – The time in milliseconds that keys may live for.
- class dramatiq.rate_limits.WindowRateLimiter(backend, key, *, limit=1, window=1)[source]¶
A rate limiter that ensures that only limit operations may happen over some sliding window.
Note
Windows are in seconds rather that milliseconds. This is different from most durations and intervals used in Dramatiq, because keeping metadata at the millisecond level is far too expensive for most use cases.
- Parameters:
backend (RateLimiterBackend) – The backend to use.
key (str) – The key to rate limit on.
limit (int) – The maximum number of operations per window per key.
window (int) – The window size in seconds. The wider the window, the more expensive it is to maintain.
Barriers¶
- class dramatiq.rate_limits.Barrier(backend, key, *, ttl=900000)[source]¶
A distributed barrier.
Examples
>>> from dramatiq.rate_limits import Barrier >>> from dramatiq.rate_limits.backends import RedisBackend
>>> backend = RedisBackend() >>> barrier = Barrier(backend, "some-barrier", ttl=30_000)
>>> created = barrier.create(parties=3) >>> barrier.wait(block=False) False >>> barrier.wait(block=False) False >>> barrier.wait(block=False) True
- Parameters:
- wait(*, block=True, timeout=None)[source]¶
Signal that a party has reached the barrier.
Warning
Barrier blocking is currently only supported by the stub and Redis backends.
Warning
Re-using keys between blocking calls may lead to undefined behaviour. Make sure your barrier keys are always unique (use a UUID).
Workers¶
- class dramatiq.Worker(broker, *, queues=None, worker_timeout=1000, worker_threads=8)[source]¶
Workers consume messages off of all declared queues and distribute those messages to individual worker threads for processing. Workers don’t block the current thread so it’s up to the caller to keep it alive.
Don’t run more than one Worker per process.
- Parameters:
broker (Broker) –
queues (Set[str]) – An optional subset of queues to listen on. By default, if this is not provided, the worker will listen on all declared queues.
worker_timeout (int) – The number of milliseconds workers should wake up after if the queue is idle.
worker_threads (int) – The number of worker threads to spawn.
Errors¶
- class dramatiq.ActorNotFound(message)[source]¶
Raised when a message is sent to an actor that hasn’t been declared.
- class dramatiq.QueueNotFound(message)[source]¶
Raised when a message is sent to an queue that hasn’t been declared.
- class dramatiq.ConnectionClosed(message)[source]¶
Raised when a broker connection is suddenly closed.
- class dramatiq.ConnectionFailed(message)[source]¶
Raised when a broker connection could not be opened.
- class dramatiq.Retry(message='', delay=None)[source]¶
Actors may raise this error when they should be retried. This behaves just like any other exception from the perspective of the
Retries
middleware, the only difference is it doesn’t get logged as an error.If the
delay
argument is provided, then the message will be retried after at least that amount of time (in milliseconds).