Utilities¶
Warning
External usage of internal utility functions and modules should be kept to a minimum as they may be altered, refactored or moved to other locations without notice (and without the typical deprecation cycle).
Async¶
Eventlet¶
Iterators¶
- taskflow.utils.iter_utils.fill(it, desired_len, filler=None)[source]¶
Iterates over a provided iterator up to the desired length.
If the source iterator does not have enough values then the filler value is yielded until the desired length is reached.
- taskflow.utils.iter_utils.count(it)[source]¶
Returns how many values in the iterator (depletes the iterator).
- taskflow.utils.iter_utils.generate_delays(delay, max_delay, multiplier=2)[source]¶
Generator/iterator that provides back delays values.
The values it generates increments by a given multiple after each iteration (using the max delay as a upper bound). Negative values will never be generated… and it will iterate forever (ie it will never stop generating values).
- taskflow.utils.iter_utils.unique_seen(its, seen_selector=None)[source]¶
Yields unique values from iterator(s) (and retains order).
- taskflow.utils.iter_utils.find_first_match(it, matcher, not_found_value=None)[source]¶
Searches iterator for first value that matcher callback returns true.
Kazoo¶
- taskflow.utils.kazoo_utils.prettify_failures(failures, limit=-1)[source]¶
Prettifies a checked commits failures (ignores sensitive data…).
- exception taskflow.utils.kazoo_utils.KazooTransactionException(message, failures)[source]¶
Bases:
KazooException
Exception raised when a checked commit fails.
- taskflow.utils.kazoo_utils.checked_commit(txn)[source]¶
Commits a kazoo transcation and validates the result.
NOTE(harlowja): Until https://github.com/python-zk/kazoo/pull/224 is fixed or a similar pull request is merged we have to workaround the transaction failing silently.
- taskflow.utils.kazoo_utils.finalize_client(client)[source]¶
Stops and closes a client, even if it wasn’t started.
- taskflow.utils.kazoo_utils.check_compatible(client, min_version=None, max_version=None)[source]¶
Checks if a kazoo client is backed by a zookeeper server version.
This check will verify that the zookeeper server version that the client is connected to satisfies a given minimum version (inclusive) and maximum (inclusive) version range. If the server is not in the provided version range then a exception is raised indiciating this.
- taskflow.utils.kazoo_utils.make_client(conf)[source]¶
Creates a kazoo client given a configuration dictionary.
- Parameters:
conf (dict) – configuration dictionary that will be used to configure the created client
The keys that will be extracted are:
read_only
: boolean that specifies whether to allow connections to read only servers, defaults toFalse
randomize_hosts
: boolean that specifies whether to randomize host lists provided, defaults toFalse
command_retry
: a kazoo retry object (or dict of options which will be used for creating one) that will be used for retrying commands that are executedconnection_retry
: a kazoo retry object (or dict of options which will be used for creating one) that will be used for retrying connection failures that occurhosts
: a string, list, set (or dict with host keys) that will specify the hosts the kazoo client should be connected to, if none is provided thenlocalhost:2181
will be used by defaulttimeout
: a float value that specifies the default timeout that the kazoo client will usehandler
: a kazoo handler object that can be used to provide the client with alternate async strategies (the default is thread based, but gevent, or eventlet ones can be provided as needed)keyfile
: SSL keyfile to use for authenticationkeyfile_password
: SSL keyfile passwordcertfile
: SSL certfile to use for authenticationca
: SSL CA file to use for authenticationuse_ssl
: argument to control whether SSL is used or notverify_certs
: when using SSL, argument to bypasscerts verification
Kombu¶
- class taskflow.utils.kombu_utils.DelayedPretty(message)[source]¶
Bases:
object
Wraps a message and delays prettifying it until requested.
TODO(harlowja): remove this when https://github.com/celery/kombu/pull/454/ is merged and a release is made that contains it (since that pull request is equivalent and/or better than this).
Miscellaneous¶
- class taskflow.utils.misc.StrEnum(value)[source]¶
Bases:
str
,Enum
An enumeration that is also a string and can be compared to strings.
- class taskflow.utils.misc.StringIO(initial_value='', newline='\n')[source]¶
Bases:
StringIO
String buffer with some small additions.
- class taskflow.utils.misc.BytesIO(initial_bytes=b'')[source]¶
Bases:
BytesIO
Byte buffer with some small additions.
- taskflow.utils.misc.get_hostname(unknown_hostname='<unknown>')[source]¶
Gets the machines hostname; if not able to returns an invalid one.
- taskflow.utils.misc.match_type(obj, matchers)[source]¶
Matches a given object using the given matchers list/iterable.
NOTE(harlowja): each element of the provided list/iterable must be tuple of (valid types, result).
Returns the result (the second element of the provided tuple) if a type match occurs, otherwise none if no matches are found.
- taskflow.utils.misc.countdown_iter(start_at, decr=1)[source]¶
Generator that decrements after each generation until <= zero.
NOTE(harlowja): we can likely remove this when we can use an
itertools.count
that takes a step (on py2.6 which we still support that step parameter does not exist and therefore can’t be used).
- taskflow.utils.misc.extract_driver_and_conf(conf, conf_key)[source]¶
Common function to get a driver name and its configuration.
- taskflow.utils.misc.reverse_enumerate(items)[source]¶
Like reversed(enumerate(items)) but with less copying/cloning…
- taskflow.utils.misc.merge_uri(uri, conf)[source]¶
Merges a parsed uri into the given configuration dictionary.
Merges the username, password, hostname, port, and query parameters of a URI into the given configuration dictionary (it does not overwrite existing configuration keys if they already exist) and returns the merged configuration.
NOTE(harlowja): does not merge the path, scheme or fragment.
- taskflow.utils.misc.find_subclasses(locations, base_cls, exclude_hidden=True)[source]¶
Finds subclass types in the given locations.
This will examines the given locations for types which are subclasses of the base class type provided and returns the found subclasses (or fails with exceptions if this introspection can not be accomplished).
If a string is provided as one of the locations it will be imported and examined if it is a subclass of the base class. If a module is given, all of its members will be examined for attributes which are subclasses of the base class. If a type itself is given it will be examined for being a subclass of the base class.
- taskflow.utils.misc.pick_first_not_none(*values)[source]¶
Returns first of values that is not None (or None if all are/were).
- taskflow.utils.misc.disallow_when_frozen(excp_cls)[source]¶
Frozen checking/raising method decorator.
- taskflow.utils.misc.clamp(value, minimum, maximum, on_clamped=None)[source]¶
Clamps a value to ensure its >= minimum and <= maximum.
- taskflow.utils.misc.fix_newlines(text, replacement='\n')[source]¶
Fixes text that may end with wrong nl by replacing with right nl.
- taskflow.utils.misc.binary_encode(text, encoding='utf-8', errors='strict')[source]¶
Encodes a text string into a binary string using given encoding.
Does nothing if data is already a binary string (raises on unknown types).
- taskflow.utils.misc.binary_decode(data, encoding='utf-8', errors='strict')[source]¶
Decodes a binary string into a text string using given encoding.
Does nothing if data is already a text string (raises on unknown types).
- taskflow.utils.misc.decode_msgpack(raw_data, root_types=(<class 'dict'>, ))[source]¶
Parse raw data to get decoded object.
Decodes a msgback encoded ‘blob’ from a given raw data binary string and checks that the root type of that decoded object is in the allowed set of types (by default a dict should be the root type).
- taskflow.utils.misc.decode_json(raw_data, root_types=(<class 'dict'>, ))[source]¶
Parse raw data to get decoded object.
Decodes a JSON encoded ‘blob’ from a given raw data binary string and checks that the root type of that decoded object is in the allowed set of types (by default a dict should be the root type).
- class taskflow.utils.misc.cachedproperty(fget=None, require_lock=True)[source]¶
Bases:
object
A thread-safe descriptor property that is only evaluated once.
This caching descriptor can be placed on instance methods to translate those methods into properties that will be cached in the instance (avoiding repeated attribute checking logic to do the equivalent).
NOTE(harlowja): by default the property that will be saved will be under the decorated methods name prefixed with an underscore. For example if we were to attach this descriptor to an instance method ‘get_thing(self)’ the cached property would be stored under ‘_get_thing’ in the self object after the first call to ‘get_thing’ occurs.
- taskflow.utils.misc.millis_to_datetime(milliseconds)[source]¶
Converts number of milliseconds (from epoch) into a datetime object.
- taskflow.utils.misc.get_version_string(obj)[source]¶
Gets a object’s version as a string.
Returns string representation of object’s version taken from its ‘version’ attribute, or None if object does not have such attribute or its version is None.
- taskflow.utils.misc.sequence_minus(seq1, seq2)[source]¶
Calculate difference of two sequences.
Result contains the elements from first sequence that are not present in second sequence, in original order. Works even if sequence elements are not hashable.
- taskflow.utils.misc.capture_failure()[source]¶
Captures the occurring exception and provides a failure object back.
This will save the current exception information and yield back a failure object for the caller to use (it will raise a runtime error if no active exception is being handled).
This is useful since in some cases the exception context can be cleared, resulting in None being attempted to be saved after an exception handler is run. This can happen when eventlet switches greenthreads or when running an exception handler, code raises and catches an exception. In both cases the exception context will be cleared.
To work around this, we save the exception state, yield a failure and then run other code.
For example:
>>> from taskflow.utils import misc >>> >>> def cleanup(): ... pass ... >>> >>> def save_failure(f): ... print("Saving %s" % f) ... >>> >>> try: ... raise IOError("Broken") ... except Exception: ... with misc.capture_failure() as fail: ... print("Activating cleanup") ... cleanup() ... save_failure(fail) ... Activating cleanup Saving Failure: IOError: Broken
- taskflow.utils.misc.is_iterable(obj)[source]¶
Tests an object to to determine whether it is iterable.
This function will test the specified object to determine whether it is iterable. String types (both
str
andunicode
) are ignored and will return False.- Parameters:
obj – object to be tested for iterable
- Returns:
True if object is iterable and is not a string
Persistence¶
- taskflow.utils.persistence_utils.temporary_log_book(backend=None)[source]¶
Creates a temporary logbook for temporary usage in the given backend.
Mainly useful for tests and other use cases where a temporary logbook is needed for a short-period of time.
- taskflow.utils.persistence_utils.temporary_flow_detail(backend=None, meta=None)[source]¶
Creates a temporary flow detail and logbook in the given backend.
Mainly useful for tests and other use cases where a temporary flow detail and a temporary logbook is needed for a short-period of time.
- taskflow.utils.persistence_utils.create_flow_detail(flow, book=None, backend=None, meta=None)[source]¶
Creates a flow detail for a flow & adds & saves it in a logbook.
This will create a flow detail for the given flow using the flow name, and add it to the provided logbook and then uses the given backend to save the logbook and then returns the created flow detail.
If no book is provided a temporary one will be created automatically (no reference to the logbook will be returned, so this should nearly always be provided or only used in situations where no logbook is needed, for example in tests). If no backend is provided then no saving will occur and the created flow detail will not be persisted even if the flow detail was added to a given (or temporarily generated) logbook.
Redis¶
- class taskflow.utils.redis_utils.RedisClient(*args, **kwargs)[source]¶
Bases:
Redis
A redis client that can be closed (and raises on-usage after closed).
TODO(harlowja): if https://github.com/andymccurdy/redis-py/issues/613 ever gets resolved or merged or other then we can likely remove this.
- transaction(func: Callable[[Pipeline], None], *watches, **kwargs) None ¶
Convenience method for executing the callable func as a transaction while watching all keys specified in watches. The ‘func’ callable should expect a single argument which is a Pipeline object.
- pubsub(**kwargs)¶
Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them.
- class taskflow.utils.redis_utils.UnknownExpire(value)[source]¶
Bases:
IntEnum
Non-expiry (not ttls) results return from
get_expiry()
.See: http://redis.io/commands/ttl or http://redis.io/commands/pttl
- DOES_NOT_EXPIRE = -1¶
The command returns
-1
if the key exists but has no associated expire.
- KEY_NOT_FOUND = -2¶
The command returns
-2
if the key does not exist.
- taskflow.utils.redis_utils.get_expiry(client, key, prior_version=None)[source]¶
Gets an expiry for a key (using best determined ttl method).
Schema¶
Threading¶
- taskflow.utils.threading_utils.is_alive(thread)[source]¶
Helper to determine if a thread is alive (handles none safely).
- taskflow.utils.threading_utils.get_ident()[source]¶
Return the ‘thread identifier’ of the current thread.
- taskflow.utils.threading_utils.get_optimal_thread_count(default=2)[source]¶
Try to guess optimal thread count for current system.
- taskflow.utils.threading_utils.daemon_thread(target, *args, **kwargs)[source]¶
Makes a daemon thread that calls the given target when started.