# This file is a part of Dramatiq.
# Copyright (C) 2017,2018 CLEARTYPE SRL <[email protected]>
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <>.

import dataclasses
import time
import uuid
from typing import Any, Dict, Generic, Optional, Tuple, TypeVar

from .broker import get_broker
from .composition import pipeline
from .encoder import Encoder, JSONEncoder
from .errors import DecodeError
from .results import ResultBackend, Results

#: The global encoder instance.
global_encoder: Encoder = JSONEncoder()

R = TypeVar("R")

[docs]def get_encoder() -> Encoder: """Get the global encoder object. Returns: Encoder """ global global_encoder return global_encoder
[docs]def set_encoder(encoder: Encoder) -> None: """Set the global encoder object. Parameters: encoder(Encoder): The encoder instance to use when serializing messages. """ global global_encoder global_encoder = encoder
def generate_unique_id() -> str: """Generate a globally-unique message id. """ return str(uuid.uuid4())
[docs]@dataclasses.dataclass(frozen=True) class Message(Generic[R]): """Encapsulates metadata about messages being sent to individual actors. Parameters: queue_name(str): The name of the queue the message belongs to. actor_name(str): The name of the actor that will receive the message. args(tuple): Positional arguments that are passed to the actor. kwargs(dict): Keyword arguments that are passed to the actor. options(dict): Arbitrary options passed to the broker and middleware. message_id(str): A globally-unique id assigned to the actor. message_timestamp(int): The UNIX timestamp in milliseconds representing when the message was first enqueued. """ queue_name: str actor_name: str args: tuple kwargs: Dict[str, Any] options: Dict[str, Any] message_id: str = dataclasses.field(default_factory=generate_unique_id) message_timestamp: int = dataclasses.field(default_factory=lambda: int(time.time() * 1000)) def __post_init__(self): # For backwards-compatibility, enforce that `args' is a tuple. if type(self.args) is not tuple: # The class is marked frozen, so we have to use the # primitive setattr here. Direct assignment would fail. object.__setattr__(self, "args", tuple(self.args)) def __or__(self, other) -> pipeline: """Combine this message into a pipeline with "other". """ return pipeline([self, other])
[docs] def asdict(self) -> Dict[str, Any]: """Convert this message to a dictionary. """ # For backward compatibility, we can't use `dataclasses.asdict` # because it creates a copy of all values, including `options`. result = {} for field in dataclasses.fields(self): result[] = getattr(self, return result
[docs] @classmethod def decode(cls, data: bytes) -> "Message": """Convert a bytestring to a message. Raises: DecodeError: When the decoder raises an exception while decoding `data`. """ try: fields = global_encoder.decode(data) fields["args"] = tuple(fields["args"]) return cls(**fields) except Exception as e: raise DecodeError("Failed to decode message.", data, e) from e
[docs] def encode(self) -> bytes: """Convert this message to a bytestring. """ return global_encoder.encode(self.asdict())
[docs] def copy(self, **attributes) -> "Message": """Create a copy of this message. """ new_options = attributes.pop("options", {}) return dataclasses.replace(self, **attributes, options={**self.options, **new_options})
[docs] def get_result( self, *, backend: Optional[ResultBackend] = None, block: bool = False, timeout: Optional[int] = None, ) -> R: """Get the result associated with this message from a result backend. Warning: If you use multiple result backends or brokers you should always pass the backend parameter. This method is only able to infer the result backend off of the default broker. Parameters: backend(ResultBackend): The result backend to use to get the result. If omitted, this method will try to find and use the result backend on the default broker instance. block(bool): Whether or not to block while waiting for a result. timeout(int): The maximum amount of time, in ms, to block while waiting for a result. Raises: RuntimeError: If there is no result backend on the default broker. ResultMissing: When block is False and the result isn't set. ResultTimeout: When waiting for a result times out. Returns: object: The result. """ if backend is None: broker = get_broker() for middleware in broker.middleware: if isinstance(middleware, Results): backend = middleware.backend break else: raise RuntimeError("The default broker doesn't have a results backend.") return backend.get_result(self, block=block, timeout=timeout)
def __str__(self) -> str: params = ", ".join(repr(arg) for arg in self.args) if self.kwargs: params += ", " if params else "" params += ", ".join("%s=%r" % (name, value) for name, value in self.kwargs.items()) return "%s(%s)" % (self.actor_name, params) def __lt__(self, other: "Message") -> bool: return dataclasses.astuple(self) < dataclasses.astuple(other) # Backwards-compatibility with namedtuple. _asdict = asdict @property def _field_defaults(self) -> Dict[str, Any]: return { f.default for f in dataclasses.fields(self) if f.default is not dataclasses.MISSING} @property def _fields(self) -> Tuple[str, ...]: return tuple( for f in dataclasses.fields(self)) def _replace(self, **changes) -> "Message[R]": return dataclasses.replace(self, **changes)