# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018,2019,2020 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import hashlib
import time
import typing
from ..common import compute_backoff, q_name
from ..encoder import Encoder
from .errors import ResultMissing, ResultTimeout
from .result import unwrap_result, wrap_exception, wrap_result
#: The default timeout for blocking get operations in milliseconds.
DEFAULT_TIMEOUT = 10000
#: The minimum amount of time in ms to wait between polls.
BACKOFF_FACTOR = 100
#: Canary value that is returned when a result hasn't been set yet.
Missing = type("Missing", (object,), {})()
#: A type alias representing backend results.
Result = typing.Any
#: A union representing a Result that may or may not be there.
MResult = typing.Union[typing.Type[object], Result]
[docs]
class ResultBackend:
"""ABC for result backends.
Parameters:
namespace(str): The logical namespace under which the data
should be stored.
encoder(Encoder): The encoder to use when storing and retrieving
result data. Defaults to :class:`.JSONEncoder`.
use_namespace_prefix_keys(bool): When True, the keys used to store results
use the format ``<namespace>:<queue>:<actor>:<message_id>`` so that the
keys are human-readable and can be scanned or expired by namespace in the backend.
When False (the default) the keys are MD5 hashes of the format described above,
so the namespace is not visible in the stored keys.
"""
def __init__(
self,
*,
namespace: str = "dramatiq-results",
encoder: typing.Optional[Encoder] = None,
use_namespace_prefix_keys: bool = False,
):
from ..message import get_encoder
self.namespace = namespace
self.encoder = encoder or get_encoder()
self.use_namespace_prefix_keys = use_namespace_prefix_keys
[docs]
def unwrap_result(self, res):
"""Unwrap the serialized result. Passes through to
:func:`unwrap_result` by default, but can be overridden to
complement changes to how :meth:`store_result` or
:meth:`store_exception` serialize their results.
Parameters:
res(object): The result data, typically a dict.
Returns:
object: The result.
"""
return unwrap_result(res)
[docs]
def get_result(self, message, *, block: bool = False, timeout: typing.Optional[int] = None) -> Result:
"""Get a result from the backend.
Parameters:
message(Message)
block(bool): Whether or not to block until a result is set.
timeout(int): The maximum amount of time, in ms, to wait for
a result when block is True. Defaults to 10 seconds.
Raises:
ResultMissing: When block is False and the result isn't set.
ResultTimeout: When waiting for a result times out.
Returns:
object: The result.
"""
if timeout is None:
timeout = DEFAULT_TIMEOUT
end_time = time.monotonic() + timeout / 1000
message_key = self.build_message_key(message)
attempts = 0
while True:
result = self._get(message_key)
if result is Missing and block:
attempts, delay = compute_backoff(attempts, factor=BACKOFF_FACTOR)
delay_seconds = delay / 1000
if time.monotonic() + delay_seconds > end_time:
raise ResultTimeout(message)
time.sleep(delay_seconds)
continue
elif result is Missing:
raise ResultMissing(message)
else:
return self.unwrap_result(result)
[docs]
def store_result(self, message, result: Result, ttl: int) -> None:
"""Store a result in the backend.
Parameters:
message(Message)
result(object): Must be serializable.
ttl(int): The maximum amount of time the result may be
stored in the backend for.
"""
message_key = self.build_message_key(message)
return self._store(message_key, wrap_result(result), ttl)
[docs]
def store_exception(self, message, exception: Exception, ttl: int) -> None:
"""Store an exception in the backend.
Parameters:
message(Message)
exception(Exception)
ttl(int): The maximum amount of time the exception may be
stored in the backend for.
"""
message_key = self.build_message_key(message)
return self._store(message_key, wrap_exception(exception), ttl)
[docs]
def build_message_key(self, message) -> str:
"""Given a message, return its globally-unique key.
Parameters:
message(Message)
Returns:
str
"""
message_key = "%(namespace)s:%(queue_name)s:%(actor_name)s:%(message_id)s" % {
"namespace": self.namespace,
"queue_name": q_name(message.queue_name),
"actor_name": message.actor_name,
"message_id": message.message_id,
}
if self.use_namespace_prefix_keys:
return message_key
else:
return hashlib.md5(message_key.encode("utf-8")).hexdigest()
def _get(self, message_key: str) -> MResult: # pragma: no cover
"""Get a result from the backend. Subclasses may implement
this method if they want to use the default, polling,
implementation of get_result.
"""
raise NotImplementedError(
"%(classname)r does not implement _get()"
% {
"classname": type(self).__name__,
}
)
def _store(self, message_key: str, result: Result, ttl: int) -> None: # pragma: no cover
"""Store a result in the backend. Subclasses may implement
this method if they want to use the default implementation of
set_result.
"""
raise NotImplementedError(
"%(classname)r does not implement _store()"
% {
"classname": type(self).__name__,
}
)