Source code for dramatiq.broker

# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018,2019,2020 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, cast

from .errors import ActorNotFound
from .logging import get_logger
from .middleware import MiddlewareError, default_middleware
from .results import Results

#: The global broker instance.
global_broker: Optional["Broker"] = None


[docs] def get_broker() -> "Broker": """Get the global broker instance. If no global broker is set, a RabbitMQ broker will be returned. If the RabbitMQ dependencies are not installed, a Redis broker will be returned. Returns: Broker: The default Broker. """ global global_broker if global_broker is None: # RabbitMQ will be tried first, but its dependencies might not # be installed if the user only installed the [redis] extras. try: from .brokers.rabbitmq import RabbitmqBroker set_broker(RabbitmqBroker( host="127.0.0.1", port=5672, heartbeat=5, connection_attempts=5, blocked_connection_timeout=30, )) except ImportError: # Fall back to the Redis broker. from .brokers.redis import RedisBroker set_broker(RedisBroker()) global_broker = cast("Broker", global_broker) return global_broker
[docs] def set_broker(broker: "Broker"): """Configure the global broker instance. Parameters: broker(Broker): The broker instance to use by default. """ global global_broker global_broker = broker
[docs] class Broker: """Base class for broker implementations. Parameters: middleware(list[Middleware]): The set of middleware that apply to this broker. If you supply this parameter, you are expected to declare *all* middleware. Most of the time, you'll want to use :meth:`.add_middleware` instead. Attributes: actor_options(set[str]): The names of all the options actors may overwrite when they are declared. """ def __init__(self, middleware=None): self.logger = get_logger(__name__, type(self)) self.actors = {} self.queues = {} self.delay_queues = set() self.actor_options = set() self.middleware = [] if middleware is None: middleware = [m() for m in default_middleware] for m in middleware: self.add_middleware(m) def emit_before(self, signal, *args, **kwargs): signal = "before_" + signal for middleware in self.middleware: try: getattr(middleware, signal)(self, *args, **kwargs) except MiddlewareError: raise except Exception: self.logger.critical("Unexpected failure in %s of %r.", signal, middleware, exc_info=True) def emit_after(self, signal, *args, **kwargs): signal = "after_" + signal for middleware in reversed(self.middleware): try: getattr(middleware, signal)(self, *args, **kwargs) except Exception: self.logger.critical("Unexpected failure in %s of %r.", signal, middleware, exc_info=True)
[docs] def add_middleware(self, middleware, *, before=None, after=None): """Add a middleware object to this broker. The middleware is appended to the end of the middleware list by default. You can specify another middleware (by class) as a reference point for where the new middleware should be added. Parameters: middleware(Middleware): The middleware. before(type): Add this middleware before a specific one. after(type): Add this middleware after a specific one. Raises: ValueError: When either ``before`` or ``after`` refer to a middleware that hasn't been registered yet. """ assert not (before and after), \ "provide either 'before' or 'after', but not both" if before or after: for i, m in enumerate(self.middleware): # noqa if isinstance(m, before or after): break else: raise ValueError("Middleware %r not found" % (before or after)) if before: self.middleware.insert(i, middleware) else: self.middleware.insert(i + 1, middleware) else: self.middleware.append(middleware) self.actor_options |= middleware.actor_options for actor_name in self.get_declared_actors(): middleware.after_declare_actor(self, actor_name) for queue_name in self.get_declared_queues(): middleware.after_declare_queue(self, queue_name) for queue_name in self.get_declared_delay_queues(): middleware.after_declare_delay_queue(self, queue_name)
[docs] def close(self): """Close this broker and perform any necessary cleanup actions. """
[docs] def consume(self, queue_name, prefetch=1, timeout=30000): # pragma: no cover """Get an iterator that consumes messages off of the queue. Raises: QueueNotFound: If the given queue was never declared. Parameters: queue_name(str): The name of the queue to consume messages off of. prefetch(int): The number of messages to prefetch per consumer. timeout(int): The amount of time in milliseconds to idle for. Returns: Consumer: A message iterator. """ raise NotImplementedError
[docs] def declare_actor(self, actor): # pragma: no cover """Declare a new actor on this broker. Declaring an Actor twice replaces the first actor with the second by name. Parameters: actor(Actor): The actor being declared. """ self.emit_before("declare_actor", actor) self.declare_queue(actor.queue_name) self.actors[actor.actor_name] = actor self.emit_after("declare_actor", actor)
[docs] def declare_queue(self, queue_name): # pragma: no cover """Declare a queue on this broker. This method must be idempotent. Parameters: queue_name(str): The name of the queue being declared. """ raise NotImplementedError
[docs] def enqueue(self, message, *, delay=None): # pragma: no cover """Enqueue a message on this broker. Parameters: message(Message): The message to enqueue. delay(int): The number of milliseconds to delay the message for. Returns: Message: Either the original message or a copy of it. """ raise NotImplementedError
[docs] def get_actor(self, actor_name): # pragma: no cover """Look up an actor by its name. Parameters: actor_name(str): The name to look up. Raises: ActorNotFound: If the actor was never declared. Returns: Actor: The actor. """ try: return self.actors[actor_name] except KeyError: raise ActorNotFound(actor_name) from None
[docs] def get_declared_actors(self): # pragma: no cover """Get all declared actors. Returns: set[str]: The names of all the actors declared so far on this Broker. """ return set(self.actors.keys())
[docs] def get_declared_queues(self): # pragma: no cover """Get all declared queues. Returns: set[str]: The names of all the queues declared so far on this Broker. """ return set(self.queues.keys())
[docs] def get_declared_delay_queues(self): # pragma: no cover """Get all declared delay queues. Returns: set[str]: The names of all the delay queues declared so far on this Broker. """ return self.delay_queues.copy()
[docs] def get_results_backend(self): """Get the backend of the Results middleware. Raises: RuntimeError: If the broker doesn't have a results backend. Returns: ResultBackend: The backend. """ for middleware in self.middleware: if isinstance(middleware, Results): return middleware.backend else: raise RuntimeError("The broker doesn't have a results backend.")
[docs] def flush(self, queue_name): # pragma: no cover """Drop all the messages from a queue. Parameters: queue_name(str): The name of the queue to flush. """ raise NotImplementedError()
[docs] def flush_all(self): # pragma: no cover """Drop all messages from all declared queues. """ raise NotImplementedError()
[docs] def join(self, queue_name, *, timeout=None): # pragma: no cover """Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed. Subclasses that implement this function may add parameters. Parameters: queue_name(str): The queue to wait on. timeout(Optional[int]): The max amount of time, in milliseconds, to wait on this queue. """ raise NotImplementedError()
[docs] class Consumer: """Consumers iterate over messages on a queue. Consumers and their MessageProxies are *not* thread-safe. """
[docs] def __iter__(self): # pragma: no cover """Returns this instance as a Message iterator. """ return self
[docs] def ack(self, message): # pragma: no cover """Acknowledge that a message has been processed, removing it from the broker. Parameters: message(MessageProxy): The message to acknowledge. """ raise NotImplementedError
[docs] def nack(self, message): # pragma: no cover """Move a message to the dead-letter queue. Parameters: message(MessageProxy): The message to reject. """ raise NotImplementedError
def requeue(self, messages): # pragma: no cover """Move unacked messages back to their queues. This is called by consumer threads when they fail or are shut down. The default implementation does nothing. Parameters: messages(list[MessageProxy]): The messages to requeue. """
[docs] def __next__(self): # pragma: no cover """Retrieve the next message off of the queue. This method blocks until a message becomes available. Returns: MessageProxy: A transparent proxy around a Message that can be used to acknowledge or reject it once it's done being processed. """ raise NotImplementedError
[docs] def close(self): """Close this consumer and perform any necessary cleanup actions. """
[docs] class MessageProxy: """Base class for messages returned by :meth:`Broker.consume`. """ def __init__(self, message): self.failed = False self._message = message self._exception = None
[docs] def stuff_exception(self, exception): """Stuff an exception into this message. """ self._exception = exception
[docs] def clear_exception(self): """Remove the exception from this message. """ del self._exception
[docs] def fail(self): """Mark this message for rejection. """ self.failed = True
def __getattr__(self, name): return getattr(self._message, name) def __str__(self): return str(self._message) def __lt__(self, other): # This can get called if two messages have the same priority # in a queue. If that's the case, we don't care which runs # first. return True def __eq__(self, other): if isinstance(other, MessageProxy): return self._message == other._message return self._message == other