Source code for dramatiq.middleware.group_callbacks
# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import os
from ..rate_limits import Barrier, RateLimiterBackend
from .middleware import Middleware
GROUP_CALLBACK_BARRIER_TTL = int(os.getenv("dramatiq_group_callback_barrier_ttl", "86400000"))
[docs]
class GroupCallbacks(Middleware):
"""Middleware that enables adding completion callbacks to |Groups|."""
def __init__(self, rate_limiter_backend: RateLimiterBackend) -> None:
self.rate_limiter_backend = rate_limiter_backend
def after_process_message(self, broker, message, *, result=None, exception=None):
from ..message import Message
if exception is None:
group_completion_uuid = message.options.get("group_completion_uuid")
group_completion_callbacks = message.options.get("group_completion_callbacks")
if group_completion_uuid and group_completion_callbacks:
barrier = Barrier(
self.rate_limiter_backend,
group_completion_uuid,
ttl=GROUP_CALLBACK_BARRIER_TTL,
)
if barrier.wait(block=False):
for message in group_completion_callbacks:
broker.enqueue(Message(**message))