This document describes the current stable version of Kombu (5.0). For development docs, go here.
Source code for kombu.utils.compat
"""Python Compatibility Utilities."""
import numbers
import sys
from functools import wraps
from contextlib import contextmanager
try:
from importlib import metadata as importlib_metadata
except ImportError:
# TODO: Remove this when we drop support for Python 3.7
import importlib_metadata
from kombu.exceptions import reraise
# Exception types that ``maybe_fileno()`` swallows when an object cannot
# provide a file descriptor.  ``io.UnsupportedOperation`` is appended
# whenever the ``io`` module exposes it.
FILENO_ERRORS = (AttributeError, ValueError)  # noqa
try:
    from io import UnsupportedOperation
except ImportError:  # pragma: no cover
    # Py2: keep the two-member tuple defined above.
    pass
else:
    FILENO_ERRORS = FILENO_ERRORS + (UnsupportedOperation,)
# Resolve ``register_after_fork``: billiard's implementation is preferred,
# then the one from ``multiprocessing``; the name stays ``None`` when
# neither module can be imported.
register_after_fork = None  # noqa
try:
    from billiard.util import register_after_fork
except ImportError:  # pragma: no cover
    try:
        from multiprocessing.util import register_after_fork  # noqa
    except ImportError:
        pass
try:
    from typing import NamedTuple
except ImportError:
    import collections

    def NamedTuple(name, fields):
        """Fallback used when ``typing.NamedTuple`` cannot be imported.

        Arguments:
            name (str): Name of the tuple class to create.
            fields (Iterable[Tuple[str, Any]]): ``(field_name, type)``
                pairs.  Only the field names are used; the type half of
                each pair is discarded.

        Returns:
            type: A class built by :func:`collections.namedtuple`.
        """
        field_names = [field_name for field_name, _type in fields]
        return collections.namedtuple(name, field_names)
# Cached return value of ``_detect_environment()``.  Remains ``None`` until
# ``detect_environment()`` runs for the first time and stores the result.
_environment = None
def coro(gen):
    """Decorator turning a generator function into a primed co-routine.

    Calling the decorated function creates the generator, advances it to
    its first ``yield`` and returns the generator object, so the caller
    can use ``send()`` on it straight away.

    Arguments:
        gen (Callable): Generator function to wrap.

    Returns:
        Callable: Wrapper carrying the metadata of ``gen`` (via
        :func:`functools.wraps`) that returns the started generator.
    """
    @wraps(gen)
    def primed(*args, **kwargs):
        generator = gen(*args, **kwargs)
        # Run the body up to the first ``yield``.
        next(generator)
        return generator

    return primed
def _detect_environment():
# ## -eventlet-
if 'eventlet' in sys.modules:
try:
from eventlet.patcher import is_monkey_patched as is_eventlet
import socket
if is_eventlet(socket):
return 'eventlet'
except ImportError:
pass
# ## -gevent-
if 'gevent' in sys.modules:
try:
from gevent import socket as _gsocket
import socket
if socket.socket is _gsocket.socket:
return 'gevent'
except ImportError:
pass
return 'default'
def detect_environment():
    """Detect the current environment: default, eventlet, or gevent.

    The detection itself is performed once by ``_detect_environment()``;
    its answer is stored in the module-level ``_environment`` and every
    later call returns that stored value.

    Returns:
        str: ``'default'``, ``'eventlet'`` or ``'gevent'``.
    """
    global _environment
    if _environment is not None:
        return _environment
    _environment = _detect_environment()
    return _environment
def entrypoints(namespace):
    """Return setuptools entrypoints for namespace.

    Arguments:
        namespace (str): Entry point group name to look up.

    Returns:
        Iterator[Tuple[EntryPoint, Any]]: Lazily evaluated
        ``(entry_point, loaded_object)`` pairs; each entry point is loaded
        only when the iterator reaches it.  Empty when the group has no
        registered entry points.
    """
    discovered = importlib_metadata.entry_points()
    if hasattr(discovered, 'select'):
        # Selectable API (Python 3.10+ and recent ``importlib_metadata``).
        # The dict-style ``.get()`` is deprecated there and no longer
        # exists on Python 3.12, where it raises AttributeError.
        group = discovered.select(group=namespace)
    else:
        # Older versions return a plain ``{group: [entry_points]}`` dict.
        group = discovered.get(namespace, [])
    return ((ep, ep.load()) for ep in group)
def fileno(f):
    """Get fileno from file-like object.

    Arguments:
        f (Any): Either an integer, which is returned untouched, or an
            object exposing a ``fileno()`` method.

    Returns:
        The integer itself, or whatever ``f.fileno()`` returns.
    """
    return f if isinstance(f, numbers.Integral) else f.fileno()
def maybe_fileno(f):
    """Get object fileno, or :const:`None` if not defined.

    Delegates to ``fileno()``; any exception listed in ``FILENO_ERRORS``
    is swallowed and reported as :const:`None` instead.
    """
    try:
        descriptor = fileno(f)
    except FILENO_ERRORS:
        return None
    return descriptor
@contextmanager
def nested(*managers):  # pragma: no cover
    """Nest context managers.

    Enters every manager in ``managers`` from left to right and yields a
    list holding the value returned by each ``__enter__()``.  On the way
    out, the ``__exit__`` methods of the managers that were successfully
    entered are called in reverse order.

    An exception raised by the ``with`` body (or by a later
    ``__enter__``) is handed to each ``__exit__``.  An ``__exit__`` that
    returns a true value suppresses it for the remaining managers; an
    ``__exit__`` that raises replaces it.  Whatever exception is still
    pending once all exits have run is re-raised.
    """
    # flake8: noqa
    exits = []  # bound ``__exit__`` methods of managers entered so far
    vars = []   # values returned by each ``__enter__()``, in entry order
    exc = (None, None, None)  # pending ``sys.exc_info()`` triple, if any
    try:
        try:
            for mgr in managers:
                # Look up both methods before entering, and register the
                # exit only after ``enter()`` succeeded, so a manager
                # whose ``__enter__`` fails is never exited.
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            # Deliberately bare: whatever was raised is recorded so it can
            # be passed to the exit methods below.
            exc = sys.exc_info()
        finally:
            # Unwind in reverse order of entry.
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        # Truthy return value: the exception was handled.
                        exc = (None, None, None)
                except:
                    # The exit method itself raised; that exception is
                    # now the pending one.
                    exc = sys.exc_info()
            if exc != (None, None, None):
                # Don't rely on sys.exc_info() still containing
                # the right information. Another exception may
                # have been raised and caught by an exit method
                reraise(exc[0], exc[1], exc[2])
    finally:
        # Drop the local reference to the exc_info triple (presumably to
        # avoid keeping the traceback alive -- confirm intent).
        del(exc)