Source code for dramatiq.threading
# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <[email protected]>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ctypes
import inspect
import platform
from .logging import get_logger
__all__ = [
"Interrupt",
"current_platform",
"is_gevent_active",
"raise_thread_exception",
"supported_platforms",
]
logger = get_logger(__name__)
current_platform = platform.python_implementation()
python_version = platform.python_version_tuple()
thread_id_ctype = ctypes.c_long if python_version < ("3", "7") else ctypes.c_ulong
supported_platforms = {"CPython"}
def is_gevent_active():
"""Detect if gevent monkey patching is active."""
try:
from gevent import monkey
except ImportError: # pragma: no cover
return False
return bool(monkey.saved)
[docs]
class Interrupt(BaseException):
"""Base class for exceptions used to asynchronously interrupt a
thread's execution. An actor may catch these exceptions in order
to respond gracefully, such as performing any necessary cleanup.
This is *not* a subclass of ``DramatiqError`` to avoid it being
caught unintentionally.
"""
def raise_thread_exception(thread_id, exception):
"""Raise an exception in a thread.
Currently, this is only available on CPython.
Note:
This works by setting an async exception in the thread. This means
that the exception will only get called the next time that thread
acquires the GIL. Concretely, this means that this middleware can't
cancel system calls.
"""
if current_platform == "CPython":
_raise_thread_exception_cpython(thread_id, exception)
else:
message = "Setting thread exceptions (%s) is not supported for your current platform (%r)."
exctype = (exception if inspect.isclass(exception) else type(exception)).__name__
logger.critical(message, exctype, current_platform)
def _raise_thread_exception_cpython(thread_id, exception):
exctype = (exception if inspect.isclass(exception) else type(exception)).__name__
thread_id = thread_id_ctype(thread_id)
exception = ctypes.py_object(exception)
count = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, exception)
if count == 0:
logger.critical("Failed to set exception (%s) in thread %r.", exctype, thread_id.value)
elif count > 1: # pragma: no cover
logger.critical("Exception (%s) was set in multiple threads. Undoing...", exctype)
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.c_long(0))