Warning: This document is for the development version of Dramatiq.

Source code for dramatiq.middleware.middleware

# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


class MiddlewareError(Exception):
    """Root of the exception hierarchy used by middleware."""
class SkipMessage(MiddlewareError):
    """Signal that the current message should be skipped.

    Middleware may raise this from inside its
    ``before_process_message`` hook.
    """
class Middleware:
    """Common ancestor for every broker middleware.

    Each hook defined here does nothing by default, so a subclass
    only needs to override the hooks it actually cares about.  Every
    hook takes the broker as its first argument after ``self``.
    """

    @property
    def actor_options(self):
        """Option names that actors are allowed to configure.

        The base implementation returns a new, empty set on every
        access.
        """
        return set()

    # -- acknowledgement hooks ---------------------------------------

    def before_ack(self, broker, message):
        """Hook invoked right before a message gets acknowledged."""

    def after_ack(self, broker, message):
        """Hook invoked once a message has been acknowledged."""

    def before_nack(self, broker, message):
        """Hook invoked right before a message gets rejected."""

    def after_nack(self, broker, message):
        """Hook invoked once a message has been rejected."""

    # -- declaration hooks -------------------------------------------

    def before_declare_actor(self, broker, actor):
        """Hook invoked right before an actor gets declared."""

    def after_declare_actor(self, broker, actor):
        """Hook invoked once an actor has been declared."""

    def before_declare_queue(self, broker, queue_name):
        """Hook invoked right before a queue gets declared."""

    def after_declare_queue(self, broker, queue_name):
        """Hook invoked once a queue has been declared."""

    def after_declare_delay_queue(self, broker, queue_name):
        """Hook invoked once a delay queue has been declared."""

    # -- enqueue / delay hooks ---------------------------------------

    def before_enqueue(self, broker, message, delay):
        """Hook invoked right before a message gets enqueued."""

    def after_enqueue(self, broker, message, delay):
        """Hook invoked once a message has been enqueued."""

    def before_delay_message(self, broker, message):
        """Hook invoked before a message is delayed in worker memory."""

    # -- processing hooks --------------------------------------------

    def before_process_message(self, broker, message):
        """Hook invoked right before a message gets processed.

        Raises:
          SkipMessage: If the current message should be skipped.
            When this is raised, ``after_skip_message`` is emitted
            instead of ``after_process_message``.
        """

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Hook invoked once a message has been processed.

        ``result`` and ``exception`` are keyword-only and both default
        to ``None``.
        """

    def after_skip_message(self, broker, message):
        """Hook invoked after a message has been skipped.

        It takes the place of ``after_process_message`` for that
        message.
        """

    # -- process / worker lifecycle hooks ----------------------------

    def after_process_boot(self, broker):
        """Hook invoked immediately after a subprocess starts up."""

    def before_worker_boot(self, broker, worker):
        """Hook invoked before the worker process starts up."""

    def after_worker_boot(self, broker, worker):
        """Hook invoked once the worker process has started up."""

    def before_worker_shutdown(self, broker, worker):
        """Hook invoked before the worker process shuts down."""

    def after_worker_shutdown(self, broker, worker):
        """Hook invoked after the worker process shuts down."""

    # -- thread lifecycle hooks --------------------------------------

    def before_consumer_thread_shutdown(self, broker, thread):
        """Hook invoked before a consumer thread shuts down.

        Useful for releasing thread-local resources (Django database
        connections, for example).  There is no
        ``after_consumer_thread_boot``.
        """

    def before_worker_thread_shutdown(self, broker, thread):
        """Hook invoked before a worker thread shuts down.

        Useful for releasing thread-local resources (Django database
        connections, for example).  There is no
        ``after_worker_thread_boot``.
        """