Notifications and listeners

Overview

Engines provide a way to receive notifications about task and flow state transitions (see states), which is useful for monitoring, logging, metrics, debugging and plenty of other tasks.

To receive these notifications, register a callback with one of the Notifier instances that an engine exposes through its atom_notifier and notifier attributes.

TaskFlow also comes with a set of predefined listeners, and provides means to write your own listeners, which can be more convenient than using raw callbacks.

Receiving notifications with callbacks

Flow notifications

To receive notifications about flow state changes, use the Notifier instance available as the notifier property of an engine.

A basic example is:

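>>> from taskflow import engines
>>> from taskflow import task
>>> from taskflow.patterns import linear_flow
>>> from taskflow.types import notifier
>>> ANY = notifier.Notifier.ANY
>>> class CatTalk(task.Task):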
...   def execute(self, meow):
...     print(meow)
...     return "cat"
...
>>> class DogTalk(task.Task):
...   def execute(self, woof):
...     print(woof)
...     return 'dog'
...
>>> def flow_transition(state, details):
...     print("Flow '%s' transition to state %s" % (details['flow_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
...   CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.notifier.register(ANY, flow_transition)
>>> eng.run()
Flow 'cat-dog' transition to state RUNNING
meow
woof
Flow 'cat-dog' transition to state SUCCESS

Task notifications

To receive notifications about task state changes, use the Notifier instance available as the atom_notifier property of an engine.

A basic example is:

>>> class CatTalk(task.Task):
...   def execute(self, meow):
...     print(meow)
...     return "cat"
...
>>> class DogTalk(task.Task):
...   def execute(self, woof):
...     print(woof)
...     return 'dog'
...
>>> def task_transition(state, details):
...     print("Task '%s' transition to state %s" % (details['task_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.atom_notifier.register(ANY, task_transition)
>>> eng.run()
Task 'CatTalk' transition to state RUNNING
meow
Task 'CatTalk' transition to state SUCCESS
Task 'DogTalk' transition to state RUNNING
woof
Task 'DogTalk' transition to state SUCCESS
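
Callbacks are not limited to printing. The following is a minimal sketch of a metrics-style callback that tallies transitions instead; the names state_counts and count_transition are hypothetical, and the loop at the end feeds it made-up notifications rather than output from a real engine run. It assumes the details dictionary carries a 'task_name' key, as in the task example above.

```python
import collections

# Hypothetical metrics store: counts how often each (task, state) pair is seen.
state_counts = collections.Counter()


def count_transition(state, details):
    """Callback with the same (state, details) signature as the examples above."""
    # 'task_name' is the key used by the task example; fall back to a
    # placeholder when the details carry no such key.
    name = details.get('task_name', '<unknown>')
    state_counts[(name, state)] += 1


# Simulated notifications (made-up data standing in for a real engine run).
for state in ('RUNNING', 'SUCCESS'):
    count_transition(state, {'task_name': 'CatTalk'})
```

Such a callback would be registered in the same way as task_transition above, for example with eng.atom_notifier.register(ANY, count_transition).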

Listeners

TaskFlow comes with a set of predefined listeners – helper classes that can be used to do various actions on flow and/or task transitions. You can also create your own listeners easily, which may be more convenient than using raw callbacks for some use cases.

For example, this is how you can use PrintingListener:

>>> from taskflow.listeners import printing
>>> class CatTalk(task.Task):
...   def execute(self, meow):
...     print(meow)
...     return "cat"
...
>>> class DogTalk(task.Task):
...   def execute(self, woof):
...     print(woof)
...     return 'dog'
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
...   CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> with printing.PrintingListener(eng):
...   eng.run()
...
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'RUNNING' from state 'PENDING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'RUNNING' from state 'PENDING'
meow
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'cat' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'RUNNING' from state 'PENDING'
woof
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'dog' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'SUCCESS' from state 'RUNNING'

Interfaces

taskflow.listeners.base.FINISH_STATES = ('FAILURE', 'SUCCESS', 'REVERTED', 'REVERT_FAILURE')

Results are usable only in these states; other states do not produce results.

taskflow.listeners.base.DEFAULT_LISTEN_FOR = ('*',)

What is listened for by default…

class taskflow.listeners.base.Listener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))

Bases: object

Base class for listeners.

A listener can be attached to an engine to do various actions on flow and atom state transitions. It implements the context manager protocol to be able to register and unregister with a given engine automatically when a context is entered and when it is exited.

To implement a listener, derive from this class and override _flow_receiver and/or _task_receiver and/or _retry_receiver methods (in this class, they do nothing).

class taskflow.listeners.base.DumpingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))

Bases: taskflow.listeners.base.Listener

Abstract base class for dumping listeners.

This provides a simple listener that can be attached to an engine and subclassed to dump task and/or flow state transitions to some target backend.

To implement your own dumping listener, derive from this class and override the _dump method.

Implementations

Printing and logging listeners

class taskflow.listeners.logging.LoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, level=10)

Bases: taskflow.listeners.base.DumpingListener

Listener that logs notifications it receives.

It listens for task and flow notifications and writes those notifications to a provided logger, or to the logger of its module (taskflow.listeners.logging) if none is provided (and no class attribute is overridden). The log level can also be configured; logging.DEBUG is used by default when none is provided.

class taskflow.listeners.logging.DynamicLoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, failure_level=30, level=10, hide_inputs_outputs_of=(), fail_formatter=None)

Bases: taskflow.listeners.base.Listener

Listener that logs notifications it receives.

It listens for task and flow notifications and writes those notifications to a provided logger, or to the logger of its module (taskflow.listeners.logging) if none is provided (and no class attribute is overridden). The log level can be configured to a limited extent: logging.DEBUG or logging.WARNING (unless overridden via a constructor parameter) will be selected automatically based on the execution state and results produced.

The following flow states cause logging.WARNING (or provided level) to be used:

  • states.FAILURE

  • states.REVERTED

The following task states cause logging.WARNING (or provided level) to be used:

  • states.FAILURE

  • states.RETRYING

  • states.REVERTING

  • states.REVERT_FAILURE

When a task produces a Failure object as its result (typically this happens when a task raises an exception) this will always switch the logger to use logging.WARNING (if the failure object contains an exc_info tuple this will also be logged to provide a meaningful traceback).
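
The selection rules above can be sketched as a small standalone function. This is an illustration of the documented behaviour only, not the listener's actual implementation; pick_level, its parameters and the two state sets are hypothetical names, and the defaults mirror the level=10 and failure_level=30 constructor defaults (logging.DEBUG and logging.WARNING).

```python
import logging

# State names copied from the lists above; grouping them into two sets is
# an illustration, not the library's internal data structure.
FLOW_WARNING_STATES = frozenset(['FAILURE', 'REVERTED'])
TASK_WARNING_STATES = frozenset(
    ['FAILURE', 'RETRYING', 'REVERTING', 'REVERT_FAILURE'])


def pick_level(kind, state, result_is_failure=False,
               level=logging.DEBUG, failure_level=logging.WARNING):
    """Return the log level for one transition (hypothetical helper)."""
    # A task whose result is a failure is logged at the failure level
    # regardless of the state it is in.
    if kind == 'task' and result_is_failure:
        return failure_level
    if kind == 'flow':
        warning_states = FLOW_WARNING_STATES
    else:
        warning_states = TASK_WARNING_STATES
    return failure_level if state in warning_states else level
```

Note that the two sets differ: under these rules a flow entering REVERTED is logged at the warning level, while a task entering REVERTED is not.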

class taskflow.listeners.printing.PrintingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), stderr=False)

Bases: taskflow.listeners.base.DumpingListener

Writes the task and flow notification messages to stdout or stderr.

Timing listeners

class taskflow.listeners.timing.DurationListener(engine)

Bases: taskflow.listeners.base.Listener

Listener that captures task duration.

It records how long a task took to execute (or fail) to storage. It saves the duration in seconds as a float value to task metadata with key 'duration'.

class taskflow.listeners.timing.PrintingDurationListener(engine, printer=None)

Bases: taskflow.listeners.timing.DurationListener

Listener that prints the duration as well as recording it.

class taskflow.listeners.timing.EventTimeListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))

Bases: taskflow.listeners.base.Listener

Listener that captures task, flow, and retry event timestamps.

It records when an event is received (using unix time) to storage. It saves the timestamps under keys (in atom or flow details metadata) of the format {event}-timestamp where event is the state/event name that has been received.

This information can be later extracted/examined to derive durations…
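
As a rough sketch of such a derivation, the helper below subtracts two timestamps stored under keys of the {event}-timestamp format. The function name duration_between and the sample dictionary are hypothetical; how the metadata dictionary is fetched from storage is not shown here.

```python
def duration_between(metadata, start_event, end_event):
    """Seconds between two recorded events, or None if either is missing."""
    start = metadata.get('%s-timestamp' % start_event)
    end = metadata.get('%s-timestamp' % end_event)
    if start is None or end is None:
        return None
    return end - start


# Made-up metadata shaped like the keys described above.
sample = {'RUNNING-timestamp': 1000.0, 'SUCCESS-timestamp': 1002.5}
elapsed = duration_between(sample, 'RUNNING', 'SUCCESS')
```

Returning None for a missing event keeps the helper usable on atoms that never reached the end state.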

Claim listener

class taskflow.listeners.claims.CheckingClaimListener(engine, job, board, owner, on_job_loss=None)

Bases: taskflow.listeners.base.Listener

Listener that interacts with the engine, job and jobboard to ensure the claim is valid.

This listener (or a derivative) can be associated with an engine's notification system after the job has been claimed (so that the job's work can be worked on by that engine). Once associated, this listener will check that the job is still claimed whenever the engine notifies of a task or flow state change. If the job is not claimed when a state change occurs, an associated handler (or the default) will be activated to determine how to react to this hopefully exceptional case.

NOTE(harlowja): this may create more traffic than desired to the jobboard backend (zookeeper or other), since the number of state changes per task and flow is non-zero (and checking during each state change will result in quite a few calls to that management system to check the job's claim status); this could be later optimized to check less often (or only check on a smaller set of states).

NOTE(harlowja): if a custom on_job_loss callback is provided it must accept three positional arguments, the first being the current engine being run, the second being the ‘task/flow’ state and the third being the details that were sent from the engine to listeners for inspection.
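
A custom on_job_loss callback with the three positional arguments described above might look like the following sketch. The name suspend_on_job_loss is hypothetical, and reacting by calling the engine's suspend() method is an assumption about one reasonable policy, not a statement of what the default handler does.

```python
import logging

LOG = logging.getLogger(__name__)


def suspend_on_job_loss(engine, state, details):
    """Hypothetical handler: log the lost claim, then suspend the engine."""
    LOG.warning("Job claim lost during transition to state '%s' "
                "(details: %s)", state, details)
    # Assumption: asking the engine to suspend stops further work from
    # being started on a job this engine no longer owns.
    engine.suspend()
```

It would be passed through the on_job_loss keyword argument shown in the constructor signature above.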

Capturing listener

class taskflow.listeners.capturing.CaptureListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), capture_flow=True, capture_task=True, capture_retry=True, skip_tasks=None, skip_retries=None, skip_flows=None, values=None)

Bases: taskflow.listeners.base.Listener

A listener that captures transitions and saves them locally.

NOTE(harlowja): this listener is mainly useful for testing (where it is useful to check that the appropriate/expected transitions, produced results… occurred after the engine ran) but it could have other usages as well.

Variables

values – Captured transitions + details (the result of the _format_capture() method) are stored into this list (a previous list to append to may be provided using the constructor keyword argument of the same name); by default this stores tuples of the format (kind, state, details).

FLOW = 'flow'

Kind that denotes a ‘flow’ capture.

TASK = 'task'

Kind that denotes a ‘task’ capture.

RETRY = 'retry'

Kind that denotes a ‘retry’ capture.
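
In a test, the captured values can be filtered by kind to check the order of transitions. The sketch below works on a plain list in the default (kind, state, details) format; the helper name task_states and the sample list are hypothetical, and it assumes the captured details carry a 'task_name' key like the details passed to callbacks in the task example earlier on this page.

```python
def task_states(values, task_name):
    """List the states captured for one task, in capture order."""
    return [state for kind, state, details in values
            if kind == 'task' and details.get('task_name') == task_name]


# Made-up captures in the default (kind, state, details) format.
captured = [
    ('flow', 'RUNNING', {'flow_name': 'cat-dog'}),
    ('task', 'RUNNING', {'task_name': 'CatTalk'}),
    ('task', 'SUCCESS', {'task_name': 'CatTalk'}),
    ('flow', 'SUCCESS', {'flow_name': 'cat-dog'}),
]
cat_states = task_states(captured, 'CatTalk')
```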

Formatters

class taskflow.formatters.FailureFormatter(engine, hide_inputs_outputs_of=())

Bases: object

Formats a failure and connects it to associated atoms & engine.

format(fail, atom_matcher)

Returns a (exc_info, details) tuple about the failure.

The exc_info tuple should be a standard three element (exctype, value, traceback) tuple that will be used for further logging. A non-empty string is typically returned for details; it should contain any string info about the failure (with any specific details the exc_info may not have/contain).
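
A tuple of this shape can be handed straight to the standard logging module. The following is a minimal sketch of a consumer, assuming an (exc_info, details) pair has already been obtained; log_failure is a hypothetical helper and only standard-library logging calls are used.

```python
import logging


def log_failure(logger, exc_info, details, level=logging.WARNING):
    """Log failure details, attaching the traceback when one is available."""
    # logging accepts a (exctype, value, traceback) tuple for exc_info;
    # passing None simply omits the traceback.
    logger.log(level, "Failure details: %s", details, exc_info=exc_info)
```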

Hierarchy

Inheritance diagram of taskflow.listeners.base.DumpingListener, taskflow.listeners.base.Listener, taskflow.listeners.capturing.CaptureListener, taskflow.listeners.claims.CheckingClaimListener, taskflow.listeners.logging.DynamicLoggingListener, taskflow.listeners.logging.LoggingListener, taskflow.listeners.printing.PrintingListener, taskflow.listeners.timing.PrintingDurationListener, taskflow.listeners.timing.EventTimeListener, taskflow.listeners.timing.DurationListener