Source code for dramatiq.rate_limits.rate_limiter
# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from contextlib import contextmanager
from ..errors import RateLimitExceeded
[docs]
class RateLimiter:
"""ABC for rate limiters.
Examples:
>>> from dramatiq.rate_limits.backends import RedisBackend
>>> backend = RedisBackend()
>>> limiter = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)
>>> with limiter.acquire(raise_on_failure=False) as acquired:
... if not acquired:
... print("Mutex not acquired.")
... return
...
... print("Mutex acquired.")
Parameters:
backend(RateLimiterBackend): The rate limiting backend to use.
key(str): The key to rate limit on.
"""
def __init__(self, backend, key):
self.backend = backend
self.key = key
def _acquire(self): # pragma: no cover
raise NotImplementedError
def _release(self): # pragma: no cover
raise NotImplementedError
[docs]
@contextmanager
def acquire(self, *, raise_on_failure=True):
"""Attempt to acquire a slot under this rate limiter.
Parameters:
raise_on_failure(bool): Whether or not failures should raise an
exception. If this is false, the context manager will instead
return a boolean value representing whether or not the rate
limit slot was acquired.
Returns:
bool: Whether or not the slot could be acquired.
"""
acquired = False
try:
acquired = self._acquire()
if raise_on_failure and not acquired:
raise RateLimitExceeded("rate limit exceeded for key %(key)r" % vars(self))
yield acquired
finally:
if acquired:
self._release()