Source code for dramatiq.results.backend

# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018,2019,2020 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import hashlib
import time
import typing

from ..common import compute_backoff, q_name
from ..encoder import Encoder
from .errors import ResultMissing, ResultTimeout
from .result import unwrap_result, wrap_exception, wrap_result

#: The default timeout for blocking get operations in milliseconds.
DEFAULT_TIMEOUT = 10000

#: The minimum amount of time in ms to wait between polls.
BACKOFF_FACTOR = 100

#: Canary value that is returned when a result hasn't been set yet.
Missing = type("Missing", (object,), {})()

#: A type alias representing backend results.
Result = typing.Any

#: A union representing a Result that may or may not be there.
MResult = typing.Union[typing.Type[object], Result]


[docs] class ResultBackend: """ABC for result backends. Parameters: namespace(str): The logical namespace under which the data should be stored. encoder(Encoder): The encoder to use when storing and retrieving result data. Defaults to :class:`.JSONEncoder`. use_namespace_prefix_keys(bool): When True, the keys used to store results use the format ``<namespace>:<queue>:<actor>:<message_id>`` so that the keys are human-readable and can be scanned or expired by namespace in the backend. When False (the default) the keys are MD5 hashes of the format described above, so the namespace is not visible in the stored keys. """ def __init__( self, *, namespace: str = "dramatiq-results", encoder: typing.Optional[Encoder] = None, use_namespace_prefix_keys: bool = False, ): from ..message import get_encoder self.namespace = namespace self.encoder = encoder or get_encoder() self.use_namespace_prefix_keys = use_namespace_prefix_keys
[docs] def unwrap_result(self, res): """Unwrap the serialized result. Passes through to :func:`unwrap_result` by default, but can be overridden to complement changes to how :meth:`store_result` or :meth:`store_exception` serialize their results. Parameters: res(object): The result data, typically a dict. Returns: object: The result. """ return unwrap_result(res)
[docs] def get_result(self, message, *, block: bool = False, timeout: typing.Optional[int] = None) -> Result: """Get a result from the backend. Parameters: message(Message) block(bool): Whether or not to block until a result is set. timeout(int): The maximum amount of time, in ms, to wait for a result when block is True. Defaults to 10 seconds. Raises: ResultMissing: When block is False and the result isn't set. ResultTimeout: When waiting for a result times out. Returns: object: The result. """ if timeout is None: timeout = DEFAULT_TIMEOUT end_time = time.monotonic() + timeout / 1000 message_key = self.build_message_key(message) attempts = 0 while True: result = self._get(message_key) if result is Missing and block: attempts, delay = compute_backoff(attempts, factor=BACKOFF_FACTOR) delay_seconds = delay / 1000 if time.monotonic() + delay_seconds > end_time: raise ResultTimeout(message) time.sleep(delay_seconds) continue elif result is Missing: raise ResultMissing(message) else: return self.unwrap_result(result)
[docs] def store_result(self, message, result: Result, ttl: int) -> None: """Store a result in the backend. Parameters: message(Message) result(object): Must be serializable. ttl(int): The maximum amount of time the result may be stored in the backend for. """ message_key = self.build_message_key(message) return self._store(message_key, wrap_result(result), ttl)
[docs] def store_exception(self, message, exception: Exception, ttl: int) -> None: """Store an exception in the backend. Parameters: message(Message) exception(Exception) ttl(int): The maximum amount of time the exception may be stored in the backend for. """ message_key = self.build_message_key(message) return self._store(message_key, wrap_exception(exception), ttl)
[docs] def build_message_key(self, message) -> str: """Given a message, return its globally-unique key. Parameters: message(Message) Returns: str """ message_key = "%(namespace)s:%(queue_name)s:%(actor_name)s:%(message_id)s" % { "namespace": self.namespace, "queue_name": q_name(message.queue_name), "actor_name": message.actor_name, "message_id": message.message_id, } if self.use_namespace_prefix_keys: return message_key else: return hashlib.md5(message_key.encode("utf-8")).hexdigest()
def _get(self, message_key: str) -> MResult: # pragma: no cover """Get a result from the backend. Subclasses may implement this method if they want to use the default, polling, implementation of get_result. """ raise NotImplementedError( "%(classname)r does not implement _get()" % { "classname": type(self).__name__, } ) def _store(self, message_key: str, result: Result, ttl: int) -> None: # pragma: no cover """Store a result in the backend. Subclasses may implement this method if they want to use the default implementation of set_result. """ raise NotImplementedError( "%(classname)r does not implement _store()" % { "classname": type(self).__name__, } )