Source code for dramatiq.results.backend

# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018,2019,2020 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import hashlib
import time
import typing

from ..common import compute_backoff, q_name
from ..encoder import Encoder
from .errors import ResultMissing, ResultTimeout
from .result import unwrap_result, wrap_exception, wrap_result

#: The default timeout for blocking get operations in milliseconds.
DEFAULT_TIMEOUT = 10000

#: The minimum amount of time in ms to wait between polls.
BACKOFF_FACTOR = 100

#: Canary value that is returned when a result hasn't been set yet.
Missing = type("Missing", (object,), {})()

#: A type alias representing backend results.
Result = typing.Any

#: A union representing a Result that may or may not be there.
MResult = typing.Union[typing.Type[object], Result]


[docs] class ResultBackend: """ABC for result backends. Parameters: namespace(str): The logical namespace under which the data should be stored. encoder(Encoder): The encoder to use when storing and retrieving result data. Defaults to :class:`.JSONEncoder`. """ def __init__(self, *, namespace: str = "dramatiq-results", encoder: typing.Optional[Encoder] = None): from ..message import get_encoder self.namespace = namespace self.encoder = encoder or get_encoder()
[docs] def unwrap_result(self, res): """Unwrap the serialized result. Passes through to :func:`unwrap_result` by default, but can be overridden to complement changes to how :meth:`store_result` or :meth:`store_exception` serialize their results. Parameters: res(object): The result data, typically a dict. Returns: object: The result. """ return unwrap_result(res)
[docs] def get_result(self, message, *, block: bool = False, timeout: typing.Optional[int] = None) -> Result: """Get a result from the backend. Parameters: message(Message) block(bool): Whether or not to block until a result is set. timeout(int): The maximum amount of time, in ms, to wait for a result when block is True. Defaults to 10 seconds. Raises: ResultMissing: When block is False and the result isn't set. ResultTimeout: When waiting for a result times out. Returns: object: The result. """ if timeout is None: timeout = DEFAULT_TIMEOUT end_time = time.monotonic() + timeout / 1000 message_key = self.build_message_key(message) attempts = 0 while True: result = self._get(message_key) if result is Missing and block: attempts, delay = compute_backoff(attempts, factor=BACKOFF_FACTOR) delay /= 1000 if time.monotonic() + delay > end_time: raise ResultTimeout(message) time.sleep(delay) continue elif result is Missing: raise ResultMissing(message) else: return self.unwrap_result(result)
[docs] def store_result(self, message, result: Result, ttl: int) -> None: """Store a result in the backend. Parameters: message(Message) result(object): Must be serializable. ttl(int): The maximum amount of time the result may be stored in the backend for. """ message_key = self.build_message_key(message) return self._store(message_key, wrap_result(result), ttl)
[docs] def store_exception(self, message, exception: Exception, ttl: int) -> None: """Store an exception in the backend. Parameters: message(Message) exception(Exception) ttl(int): The maximum amount of time the exception may be stored in the backend for. """ message_key = self.build_message_key(message) return self._store(message_key, wrap_exception(exception), ttl)
[docs] def build_message_key(self, message) -> str: """Given a message, return its globally-unique key. Parameters: message(Message) Returns: str """ message_key = "%(namespace)s:%(queue_name)s:%(actor_name)s:%(message_id)s" % { "namespace": self.namespace, "queue_name": q_name(message.queue_name), "actor_name": message.actor_name, "message_id": message.message_id, } return hashlib.md5(message_key.encode("utf-8")).hexdigest()
def _get(self, message_key: str) -> MResult: # pragma: no cover """Get a result from the backend. Subclasses may implement this method if they want to use the default, polling, implementation of get_result. """ raise NotImplementedError("%(classname)r does not implement _get()" % { "classname": type(self).__name__, }) def _store(self, message_key: str, result: Result, ttl: int) -> None: # pragma: no cover """Store a result in the backend. Subclasses may implement this method if they want to use the default implementation of set_result. """ raise NotImplementedError("%(classname)r does not implement _store()" % { "classname": type(self).__name__, })