Source code for dramatiq.results.backends.redis
# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018,2019,2020 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import redis
from ..backend import DEFAULT_TIMEOUT, ResultBackend, ResultMissing, ResultTimeout
[docs]
class RedisBackend(ResultBackend):
"""A result backend for Redis_. This is the recommended result
backend as waiting for a result is resource efficient.
Parameters:
namespace(str): A string with which to prefix result keys.
encoder(Encoder): The encoder to use when storing and retrieving
result data. Defaults to :class:`.JSONEncoder`.
client(Redis): An optional client. If this is passed,
then all other parameters are ignored.
url(str): An optional connection URL. If both a URL and
connection parameters are provided, the URL is used.
**parameters: Connection parameters are passed directly
to :class:`redis.Redis`.
.. _redis: https://redis.io
"""
def __init__(self, *, namespace="dramatiq-results", encoder=None, client=None, url=None, **parameters):
super().__init__(namespace=namespace, encoder=encoder)
if url:
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)
# TODO: Replace usages of StrictRedis (redis-py 2.x) with Redis in Dramatiq 2.0.
self.client = client or redis.StrictRedis(**parameters)
def get_result(self, message, *, block=False, timeout=None):
"""Get a result from the backend.
Warning:
Sub-second timeouts are not respected by this backend.
Parameters:
message(Message)
block(bool): Whether or not to block until a result is set.
timeout(int): The maximum amount of time, in ms, to wait for
a result when block is True. Defaults to 10 seconds.
Raises:
ResultMissing: When block is False and the result isn't set.
ResultTimeout: When waiting for a result times out.
Returns:
object: The result.
"""
if timeout is None:
timeout = DEFAULT_TIMEOUT
message_key = self.build_message_key(message)
if block:
timeout = int(timeout / 1000)
if timeout == 0:
data = self.client.rpoplpush(message_key, message_key)
else:
data = self.client.brpoplpush(message_key, message_key, timeout)
if data is None:
raise ResultTimeout(message)
else:
data = self.client.lindex(message_key, 0)
if data is None:
raise ResultMissing(message)
return self.unwrap_result(self.encoder.decode(data))
def _store(self, message_key, result, ttl):
with self.client.pipeline() as pipe:
pipe.delete(message_key)
pipe.lpush(message_key, self.encoder.encode(result))
pipe.pexpire(message_key, ttl)
pipe.execute()