Source code for dramatiq.middleware.group_callbacks
# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import os
import warnings
from ..rate_limits import Barrier, RateLimiterBackend
from .middleware import Middleware
[docs]
class GroupCallbacks(Middleware):
"""Middleware that enables adding completion callbacks to |Groups|."""
def __init__(self, rate_limiter_backend: RateLimiterBackend, *, barrier_ttl: int = 86400 * 1000) -> None:
self.rate_limiter_backend = rate_limiter_backend
_barrier_ttl_env = os.getenv("dramatiq_group_callback_barrier_ttl", None)
if _barrier_ttl_env is not None:
warnings.warn(
"Configuring the barrier TTL via the 'dramatiq_group_callback_barrier_ttl' environment variable is deprecated; "
"use the `barrier_ttl` argument of the `GroupCallbacks` middleware instead.",
FutureWarning,
stacklevel=2,
)
self.barrier_ttl = int(_barrier_ttl_env)
else:
self.barrier_ttl = barrier_ttl
def after_process_message(self, broker, message, *, result=None, exception=None):
from ..message import Message
if exception is None:
group_completion_uuid = message.options.get("group_completion_uuid")
group_completion_callbacks = message.options.get("group_completion_callbacks")
if group_completion_uuid and group_completion_callbacks:
barrier = Barrier(
self.rate_limiter_backend,
group_completion_uuid,
ttl=self.barrier_ttl,
)
if barrier.wait(block=False):
for message in group_completion_callbacks:
broker.enqueue(Message(**message))