Advanced Topics

Brokers

Multi-tenancy

With Dramatiq you can run multiple logical apps on the same broker. The way you do this is different for each broker, but fairly simple in each case.

RabbitMQ

RabbitMQ has the concept of virtual hosts built into it. They provide logical grouping and separation of resources. You can create virtual hosts using the rabbitmqctl command:

$ rabbitmqctl add_vhost app1
$ rabbitmqctl set_permissions -p app1 my_user ".*" ".*" ".*"

You can then pass that vhost to RabbitmqBroker when you instantiate it.
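As a rough sketch of what that might look like, the snippet below builds an AMQP URL whose path selects the app1 virtual host created above. The amqp_url helper, the password, and the host are made up for illustration, and the commented-out broker call assumes your Dramatiq version accepts a url keyword argument.

```python
from urllib.parse import quote


def amqp_url(user, password, host, vhost, port=5672):
    """Build an AMQP URL whose path component selects the virtual host."""
    # Percent-encode each part so characters like "/" or "@" stay unambiguous.
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


# Hypothetical credentials and host; substitute your own.
url = amqp_url("my_user", "secret", "localhost", "app1")
print(url)

# With Dramatiq installed, the URL could then be handed to the broker:
#   from dramatiq.brokers.rabbitmq import RabbitmqBroker
#   broker = RabbitmqBroker(url=url)  # assumption: `url` is supported
```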

Redis

The RedisBroker takes a namespace parameter that you can use to logically split queues across multiple apps.

Messages

Message Persistence

Dramatiq has at-least-once message delivery semantics. Messages sent to Dramatiq brokers are persisted to disk and survive across broker reboots. Exactly how often messages are written to disk depends on your broker.

Messages that have been pulled by workers but not yet processed are returned to the broker on shutdown, and any messages that are in flight when a worker is terminated (e.g. via SIGKILL) are redelivered later. Messages are only acknowledged to the broker once they have finished being processed.
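To make the acknowledgement rule concrete, here is a toy in-memory model of it. ToyBroker is a hypothetical stand-in written for this example, not Dramatiq's implementation. It only shows why a message whose worker dies before acknowledging it ends up being delivered a second time.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, for illustration)."""

    def __init__(self):
        self.queue = deque()
        self.unacked = {}

    def enqueue(self, message_id, payload):
        self.queue.append((message_id, payload))

    def pull(self):
        # A pulled message stays tracked until it is acknowledged.
        message_id, payload = self.queue.popleft()
        self.unacked[message_id] = payload
        return message_id, payload

    def ack(self, message_id):
        del self.unacked[message_id]

    def requeue_unacked(self):
        # Models a worker shutting down or dying with work in flight.
        for message_id, payload in self.unacked.items():
            self.queue.append((message_id, payload))
        self.unacked.clear()


broker = ToyBroker()
broker.enqueue("m1", "send-email")
deliveries = []

# First attempt: the worker pulls the message and dies before acking.
message_id, _ = broker.pull()
deliveries.append(message_id)
broker.requeue_unacked()

# Second attempt: processing finishes, so the message is acknowledged.
message_id, _ = broker.pull()
deliveries.append(message_id)
broker.ack(message_id)

print(deliveries)
```

The same message id shows up twice in deliveries, which is why actors generally need to tolerate being run more than once for a given message.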

Enqueueing Messages from Other Languages

You can enqueue Dramatiq messages using any language that has bindings to one of its brokers. All you have to do is push a JSON-encoded dictionary containing the following fields to your queue:

{
  "queue_name": "default",     // The name of the queue the message is being pushed on
  "actor_name": "add",         // The name of the actor that should handle this message
  "args": [1, 2],              // A list of positional arguments that are passed to the actor
  "kwargs": {},                // A dictionary of keyword arguments that are passed to the actor
  "options": {},               // Arbitrary options that are used by middleware. Leave this empty
  "message_id": "unique-id",   // A UUID4 value representing the message's unique id in the system
  "message_timestamp": 0       // The UNIX timestamp in milliseconds representing when the message was first enqueued
}
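The comments in the listing above are there for explanation only; the payload you push must be plain JSON. A minimal sketch of producing one is shown below in Python for brevity. The same fields can be built in any language. The build_message helper is a hypothetical name, not part of Dramatiq.

```python
import json
import time
import uuid


def build_message(actor_name, args=(), kwargs=None, queue_name="default"):
    """Return a dict with the fields listed above (hypothetical helper)."""
    return {
        "queue_name": queue_name,
        "actor_name": actor_name,
        "args": list(args),
        "kwargs": dict(kwargs or {}),
        "options": {},  # left empty, as advised above
        "message_id": str(uuid.uuid4()),
        "message_timestamp": int(time.time() * 1000),  # milliseconds
    }


message = build_message("add", args=[1, 2])
payload = json.dumps(message)
print(payload)
```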

Using RabbitMQ

Assuming you want to enqueue a message on a queue named default, publish a persistent message to that queue in RabbitMQ.

Using Redis

Assuming you want to enqueue a message on a queue named default, run:

> HSET default.msgs $YOUR_MESSAGE_ID $YOUR_MESSAGE_PAYLOAD
> RPUSH default $YOUR_MESSAGE_ID
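The two commands above could be wrapped roughly as follows. The enqueue function takes any client object that exposes redis-py style hset and rpush methods. RecordingClient is a hypothetical stand-in so the sketch runs without a Redis server, and the message id is a made-up value. The key names mirror the commands above; if your broker is configured with a namespace, the real keys may need a matching prefix, which is worth checking against your deployment.

```python
import json


def enqueue(client, queue_name, message):
    """Mirror the HSET + RPUSH commands using a redis-py style client."""
    payload = json.dumps(message)
    # Store the payload first so the id never points at a missing entry.
    client.hset(f"{queue_name}.msgs", message["message_id"], payload)
    client.rpush(queue_name, message["message_id"])
    return payload


class RecordingClient:
    """Hypothetical stand-in that records calls instead of talking to Redis."""

    def __init__(self):
        self.calls = []

    def hset(self, name, key, value):
        self.calls.append(("HSET", name, key, value))

    def rpush(self, name, *values):
        self.calls.append(("RPUSH", name, *values))


message = {
    "queue_name": "default",
    "actor_name": "add",
    "args": [1, 2],
    "kwargs": {},
    "options": {},
    "message_id": "3f2b8c1e-5d47-4a9b-8e21-6c0d9f7a1b34",  # made-up id
    "message_timestamp": 0,
}

client = RecordingClient()
enqueue(client, "default", message)
for call in client.calls:
    print(call[0], call[1], call[2])
```

With the redis package installed, an actual redis.Redis() connection could be passed in place of RecordingClient.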

Workers

Worker Exit Codes

Dramatiq uses process exit codes to denote several scenarios:

Code  Description
0     Returned when the process exits gracefully.
1     Returned when the process is killed.
2     Returned when a module cannot be imported or when a command line argument is invalid.
3     Returned when a broker connection cannot be established during worker startup.
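A supervisor script might translate these codes into log messages along the following lines. The WorkerExit enum and describe_exit function are names invented for this sketch; Dramatiq is not assumed to export constants like these.

```python
from enum import IntEnum


class WorkerExit(IntEnum):
    """Exit codes from the table above (names are hypothetical)."""

    OK = 0
    KILLED = 1
    IMPORT_OR_ARGUMENT_ERROR = 2
    CONNECTION_ERROR = 3


DESCRIPTIONS = {
    WorkerExit.OK: "process exited gracefully",
    WorkerExit.KILLED: "process was killed",
    WorkerExit.IMPORT_OR_ARGUMENT_ERROR: "module import failed or invalid command line argument",
    WorkerExit.CONNECTION_ERROR: "broker connection could not be established at startup",
}


def describe_exit(code):
    """Return a human-readable description for a worker exit code."""
    try:
        return DESCRIPTIONS[WorkerExit(code)]
    except ValueError:
        # Codes outside the table are reported rather than guessed at.
        return f"unrecognized exit code {code}"


for code in (0, 3, 42):
    print(code, describe_exit(code))
```

The code passed in would typically come from something like subprocess.run(...).returncode in the supervising process.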

Controlling Workers

The main Dramatiq process responds to several signals, which you can send with kill. For example, to send TERM:

$ kill -TERM [main-process-pid]

INT and TERM

Sending an INT or TERM signal to the main process triggers graceful shutdown. Consumer threads will stop receiving new work and worker threads will finish processing the work they have in flight before shutting down. Any tasks still in worker memory at this point are re-queued on the broker.

If you send a second INT or TERM signal then the worker processes will be killed immediately.

HUP

Sending HUP to the main process triggers a graceful shutdown followed by a reload of the workers. This is useful if you want to reload code without completely restarting the main process.
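If you manage workers from a Python script rather than a shell, the same signals can be sent with os.kill. This is a minimal POSIX-only sketch. The ACTIONS mapping and the signal_main_process helper are hypothetical names, and the pid is whatever your process manager reports for the main Dramatiq process.

```python
import os
import signal

# Action names are made up for this sketch; the signals are the ones
# described above.
ACTIONS = {
    "shutdown": signal.SIGTERM,   # graceful shutdown
    "interrupt": signal.SIGINT,   # graceful shutdown, same as TERM
    "reload": signal.SIGHUP,      # graceful shutdown, then reload workers
}


def signal_main_process(pid, action):
    """Send the signal for `action` to the main process with id `pid`."""
    signum = ACTIONS[action]
    os.kill(pid, signum)
    return signum


# Example (not executed here, since it needs a running worker):
#   signal_main_process(main_pid, "reload")
```

Calling signal_main_process(pid, "shutdown") twice in a row would correspond to the second-signal case described above, where the worker processes are killed immediately.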

Using gevent

Dramatiq comes with a CLI utility called dramatiq-gevent that can run workers under gevent. The following invocation would run 8 worker processes with 250 greenlets per process, for a total of 2,000 lightweight worker threads:

$ dramatiq-gevent my_app -p 8 -t 250

If your tasks spend most of their time doing network IO and don’t depend on C extensions to execute those network calls then using gevent could provide a significant performance improvement.

I suggest at least experimenting with it to see if it fits your use case.

Prometheus Metrics

Prometheus metrics are automatically exported by workers whenever you run them using the command line utility (assuming you’re using the Prometheus middleware). By default, the exposition server listens on port 9191, so you can point Prometheus at that port. To change the host and port it listens on, set the dramatiq_prom_host and dramatiq_prom_port environment variables.

The following metrics are exported:

dramatiq_messages_total
A counter for the total number of messages processed.
dramatiq_message_errors_total
A counter for the total number of errored messages.
dramatiq_message_retries_total
A counter for the total number of retried messages.
dramatiq_message_rejects_total
A counter for the total number of dead-lettered messages.
dramatiq_messages_inprogress
A gauge for the number of messages currently being processed.
dramatiq_delayed_messages_inprogress
A gauge for the number of delayed messages currently in memory.
dramatiq_message_duration_milliseconds
A histogram for the time spent processing messages.

All metrics define labels for queue_name and actor_name.
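For quick checks outside of Prometheus, the exported text can be read directly. The sketch below sums one counter per actor_name from a block of exposition text. The SAMPLE text is made up for illustration (including its HELP wording and values), and the parser is deliberately simplified: it assumes every sample line has a label set and ignores escaped quotes and trailing timestamps.

```python
import re
from collections import defaultdict

# Made-up sample of what the exposition server might return.
SAMPLE = """\
# HELP dramatiq_messages_total The total number of messages processed.
# TYPE dramatiq_messages_total counter
dramatiq_messages_total{queue_name="default",actor_name="add"} 12.0
dramatiq_messages_total{queue_name="default",actor_name="sub"} 3.0
dramatiq_message_errors_total{queue_name="default",actor_name="add"} 1.0
"""

LINE = re.compile(
    r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P<labels>[^}]*)\}\s+(?P<value>\S+)$"
)
LABEL = re.compile(r'(\w+)="([^"]*)"')


def totals_by_actor(text, metric):
    """Sum the samples of `metric`, grouped by the actor_name label."""
    totals = defaultdict(float)
    for line in text.splitlines():
        match = LINE.match(line)
        # Comment lines and other metrics are skipped.
        if match is None or match["name"] != metric:
            continue
        labels = dict(LABEL.findall(match["labels"]))
        totals[labels["actor_name"]] += float(match["value"])
    return dict(totals)


print(totals_by_actor(SAMPLE, "dramatiq_messages_total"))
```

In practice the text would come from an HTTP GET against the exposition server rather than from a string literal.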

Grafana

You can find a Grafana dashboard that displays these metrics here.