Examples

While developing TaskFlow the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get started (ordered by perceived complexity):

To explore more of these examples please check out the examples directory in the TaskFlow source tree.

Note

If the examples provided are not satisfactory (or up to your standards) contributions are welcome and very much appreciated to help improve them. The higher the quality and the clearer the examples are the better and more useful they are for everyone.

Hello world

Note

Full source located at hello_world.

  1import logging
  2import os
  3import sys
  4
  5logging.basicConfig(level=logging.ERROR)
  6top_dir = os.path.abspath(
  7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  8)
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.patterns import linear_flow as lf
 13from taskflow.patterns import unordered_flow as uf
 14from taskflow import task
 15
 16
 17# INTRO: This is the defacto hello world equivalent for taskflow; it shows how
 18# an overly simplistic workflow can be created that runs using different
 19# engines using different styles of execution (all can be used to run in
 20# parallel if a workflow is provided that is parallelizable).
 21
 22
 23class PrinterTask(task.Task):
 24    def __init__(self, name, show_name=True, inject=None):
 25        super().__init__(name, inject=inject)
 26        self._show_name = show_name
 27
 28    def execute(self, output):
 29        if self._show_name:
 30            print(f"{self.name}: {output}")
 31        else:
 32            print(output)
 33
 34
 35# This will be the work that we want done, which for this example is just to
 36# print 'hello world' (like a song) using different tasks and different
 37# execution models.
 38song = lf.Flow("beats")
 39
 40# Unordered flows when ran can be ran in parallel; and a chorus is everyone
 41# singing at once of course!
 42hi_chorus = uf.Flow('hello')
 43world_chorus = uf.Flow('world')
 44for name, hello, world in [
 45    ('bob', 'hello', 'world'),
 46    ('joe', 'hellooo', 'worllllld'),
 47    ('sue', "helloooooo!", 'wooorllld!'),
 48]:
 49    hi_chorus.add(
 50        PrinterTask(
 51            "%s@hello" % name,
 52            # This will show up to the execute() method of
 53            # the task as the argument named 'output' (which
 54            # will allow us to print the character we want).
 55            inject={'output': hello},
 56        )
 57    )
 58    world_chorus.add(PrinterTask("%s@world" % name, inject={'output': world}))
 59
 60# The composition starts with the conductor and then runs in sequence with
 61# the chorus running in parallel, but no matter what the 'hello' chorus must
 62# always run before the 'world' chorus (otherwise the world will fall apart).
 63song.add(
 64    PrinterTask(
 65        "conductor@begin", show_name=False, inject={'output': "*ding*"}
 66    ),
 67    hi_chorus,
 68    world_chorus,
 69    PrinterTask("conductor@end", show_name=False, inject={'output': "*dong*"}),
 70)
 71
 72# Run in parallel using eventlet green threads...
 73try:
 74    import eventlet as _eventlet  # noqa
 75except ImportError:
 76    # No eventlet currently active, skip running with it...
 77    pass
 78else:
 79    print("-- Running in parallel using eventlet --")
 80    e = engines.load(
 81        song, executor='greenthreaded', engine='parallel', max_workers=1
 82    )
 83    e.run()
 84
 85
 86# Run in parallel using real threads...
 87print("-- Running in parallel using threads --")
 88e = engines.load(song, executor='threaded', engine='parallel', max_workers=1)
 89e.run()
 90
 91
 92# Run in parallel using external processes...
 93print("-- Running in parallel using processes --")
 94e = engines.load(song, executor='processes', engine='parallel', max_workers=1)
 95e.run()
 96
 97
 98# Run serially (aka, if the workflow could have been ran in parallel, it will
 99# not be when ran in this mode)...
100print("-- Running serially --")
101e = engines.load(song, engine='serial')
102e.run()
103print("-- Statistics gathered --")
104print(e.statistics)

Passing values from and to tasks

Note

Full source located at simple_linear_pass.

 1import logging
 2import os
 3import sys
 4
 5logging.basicConfig(level=logging.ERROR)
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(
 8    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 9)
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow
15from taskflow import task
16
17# INTRO: This example shows how a task (in a linear/serial workflow) can
18# produce an output that can be then consumed/used by a downstream task.
19
20
21class TaskA(task.Task):
22    default_provides = 'a'
23
24    def execute(self):
25        print("Executing '%s'" % (self.name))
26        return 'a'
27
28
29class TaskB(task.Task):
30    def execute(self, a):
31        print("Executing '%s'" % (self.name))
32        print("Got input '%s'" % (a))
33
34
35print("Constructing...")
36wf = linear_flow.Flow("pass-from-to")
37wf.add(TaskA('a'), TaskB('b'))
38
39print("Loading...")
40e = engines.load(wf)
41
42print("Compiling...")
43e.compile()
44
45print("Preparing...")
46e.prepare()
47
48print("Running...")
49e.run()
50
51print("Done...")

Using listeners

Note

Full source located at echo_listener.

 1import logging
 2import os
 3import sys
 4
 5logging.basicConfig(level=logging.DEBUG)
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.listeners import logging as logging_listener
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16# INTRO: This example walks through a miniature workflow which will do a
17# simple echo operation; during this execution a listener is associated with
18# the engine to receive all notifications about what the flow has performed,
19# this example dumps that output to the stdout for viewing (at debug level
20# to show all the information which is possible).
21
22
23class Echo(task.Task):
24    def execute(self):
25        print(self.name)
26
27
28# Generate the work to be done (but don't do it yet).
29wf = lf.Flow('abc')
30wf.add(Echo('a'))
31wf.add(Echo('b'))
32wf.add(Echo('c'))
33
34# This will associate the listener with the engine (the listener
35# will automatically register for notifications with the engine and deregister
36# when the context is exited).
37e = engines.load(wf)
38with logging_listener.DynamicLoggingListener(e):
39    e.run()

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening.

 1import logging
 2import os
 3import sys
 4
 5logging.basicConfig(level=logging.ERROR)
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14from taskflow.types import notifier
15
16ANY = notifier.Notifier.ANY
17
18# INTRO: In this example we create two tasks (this time as functions instead
19# of task subclasses as in the simple_linear.py example), each of which ~calls~
20# a given ~phone~ number (provided as a function input) in a linear fashion
21# (one after the other).
22#
23# For a workflow which is serial this shows an extremely simple way
24# of structuring your tasks (the code that does the work) into a linear
25# sequence (the flow) and then passing the work off to an engine, with some
26# initial data to be ran in a reliable manner.
27#
28# This example shows a basic usage of the taskflow structures without involving
29# the complexity of persistence. Using the structures that taskflow provides
30# via tasks and flows makes it possible for you to easily at a later time
31# hook in a persistence layer (and then gain the functionality that offers)
32# when you decide the complexity of adding that layer in is 'worth it' for your
33# applications usage pattern (which some applications may not need).
34#
35# It **also** adds on to the simple_linear.py example by adding a set of
36# callback functions which the engine will call when a flow state transition
37# or task state transition occurs. These types of functions are useful for
38# updating task or flow progress, or for debugging, sending notifications to
39# external systems, or for other yet unknown future usage that you may create!
40
41
42def call_jim(context):
43    print("Calling jim.")
44    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
45
46
47def call_joe(context):
48    print("Calling joe.")
49    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
50
51
52def flow_watch(state, details):
53    print('Flow => %s' % state)
54
55
56def task_watch(state, details):
57    print('Task {} => {}'.format(details.get('task_name'), state))
58
59
60# Wrap your functions into a task type that knows how to treat your functions
61# as tasks. There was previous work done to just allow a function to be
62# directly passed, but in python 3.0 there is no easy way to capture an
63# instance method, so this wrapping approach was decided upon instead which
64# can attach to instance methods (if that's desired).
65flow = lf.Flow("Call-them")
66flow.add(task.FunctorTask(execute=call_jim))
67flow.add(task.FunctorTask(execute=call_joe))
68
69# Now load (but do not run) the flow using the provided initial data.
70engine = taskflow.engines.load(
71    flow,
72    store={
73        'context': {
74            "joe_number": 444,
75            "jim_number": 555,
76        }
77    },
78)
79
80# This is where we attach our callback functions to the 2 different
81# notification objects that an engine exposes. The usage of a ANY (kleene star)
82# here means that we want to be notified on all state changes, if you want to
83# restrict to a specific state change, just register that instead.
84engine.notifier.register(ANY, flow_watch)
85engine.atom_notifier.register(ANY, task_watch)
86
87# And now run!
88engine.run()

Dumping an in-memory backend

Note

Full source located at dump_memory_backend.

 1import logging
 2import os
 3import sys
 4
 5logging.basicConfig(level=logging.ERROR)
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(
 8    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 9)
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16
17# INTRO: in this example we create a dummy flow with a dummy task, and run
 18# it using an in-memory backend and pre/post run we dump out the contents
19# of the in-memory backends tree structure (which can be quite useful to
20# look at for debugging or other analysis).
21
22
23class PrintTask(task.Task):
24    def execute(self):
25        print("Running '%s'" % self.name)
26
27
28# Make a little flow and run it...
29f = lf.Flow('root')
30for alpha in ['a', 'b', 'c']:
31    f.add(PrintTask(alpha))
32
33e = engines.load(f)
34e.compile()
35e.prepare()
36
37# After prepare the storage layer + backend can now be accessed safely...
38backend = e.storage.backend
39
40print("----------")
41print("Before run")
42print("----------")
43print(backend.memory.pformat())
44print("----------")
45
46e.run()
47
48print("---------")
49print("After run")
50print("---------")
51for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
52    value = backend.memory[path]
53    if value:
54        print(f"{path} -> {value}")
55    else:
56        print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear.

 1import logging
 2import os
 3import sys
 4
 5logging.basicConfig(level=logging.ERROR)
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create two tasks, each of which ~calls~ a given
16# ~phone~ number (provided as a function input) in a linear fashion (one after
17# the other). For a workflow which is serial this shows an extremely simple way
18# of structuring your tasks (the code that does the work) into a linear
19# sequence (the flow) and then passing the work off to an engine, with some
20# initial data to be ran in a reliable manner.
21#
22# NOTE(harlowja): This example shows a basic usage of the taskflow structures
23# without involving the complexity of persistence. Using the structures that
24# taskflow provides via tasks and flows makes it possible for you to easily at
25# a later time hook in a persistence layer (and then gain the functionality
26# that offers) when you decide the complexity of adding that layer in
27# is 'worth it' for your application's usage pattern (which certain
28# applications may not need).
29
30
31class CallJim(task.Task):
32    def execute(self, jim_number, *args, **kwargs):
33        print("Calling jim %s." % jim_number)
34
35
36class CallJoe(task.Task):
37    def execute(self, joe_number, *args, **kwargs):
38        print("Calling joe %s." % joe_number)
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('simple-linear').add(CallJim(), CallJoe())
43
44# Now run that flow using the provided initial data (store below).
45taskflow.engines.run(flow, store=dict(joe_number=444, jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

 1import logging
 2import os
 3import sys
 4
 5logging.basicConfig(level=logging.ERROR)
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create three tasks, each of which ~calls~ a given
16# number (provided as a function input), one of those tasks *fails* calling a
17# given number (the suzzie calling); this causes the workflow to enter the
18# reverting process, which activates the revert methods of the previous two
19# phone ~calls~.
20#
21# This simulated calling makes it appear like all three calls occur or all
22# three don't occur (transaction-like capabilities). No persistence layer is
23# used here so reverting and executing will *not* be tolerant of process
24# failure.
25
26
27class CallJim(task.Task):
28    def execute(self, jim_number, *args, **kwargs):
29        print("Calling jim %s." % jim_number)
30
31    def revert(self, jim_number, *args, **kwargs):
32        print("Calling %s and apologizing." % jim_number)
33
34
35class CallJoe(task.Task):
36    def execute(self, joe_number, *args, **kwargs):
37        print("Calling joe %s." % joe_number)
38
39    def revert(self, joe_number, *args, **kwargs):
40        print("Calling %s and apologizing." % joe_number)
41
42
43class CallSuzzie(task.Task):
44    def execute(self, suzzie_number, *args, **kwargs):
45        raise OSError("Suzzie not home right now.")
46
47
48# Create your flow and associated tasks (the work to be done).
49flow = lf.Flow('simple-linear').add(CallJim(), CallJoe(), CallSuzzie())
50
51try:
52    # Now run that flow using the provided initial data (store below).
53    taskflow.engines.run(
54        flow, store=dict(joe_number=444, jim_number=555, suzzie_number=666)
55    )
56except Exception as e:
57    # NOTE(harlowja): This exception will be the exception that came out of the
58    # 'CallSuzzie' task instead of a different exception, this is useful since
59    # typically surrounding code wants to handle the original exception and not
60    # a wrapped or altered one.
61    #
62    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
63    # exceptions then the above exception would be wrapped into a combined
64    # exception (the object has methods to iterate over the contained
65    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
66    # how to deal with multiple tasks failing while running.
67    #
68    # You will also note that this is not a problem in this case since no
69    # parallelism is involved; this is ensured by the usage of a linear flow
70    # and the default engine type which is 'serial' vs being 'parallel'.
71    print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car.

  1import logging
  2import os
  3import sys
  4
  5logging.basicConfig(level=logging.ERROR)
  6
  7top_dir = os.path.abspath(
  8    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  9)
 10sys.path.insert(0, top_dir)
 11
 12
 13import taskflow.engines
 14from taskflow.patterns import graph_flow as gf
 15from taskflow.patterns import linear_flow as lf
 16from taskflow import task
 17from taskflow.types import notifier
 18
 19ANY = notifier.Notifier.ANY
 20
 21import example_utils as eu  # noqa
 22
 23
 24# INTRO: This example shows how a graph flow and linear flow can be used
 25# together to execute dependent & non-dependent tasks by going through the
 26# steps required to build a simplistic car (an assembly line if you will). It
 27# also shows how raw functions can be wrapped into a task object instead of
 28# being forced to use the more *heavy* task base class. This is useful in
 29# scenarios where pre-existing code has functions that you easily want to
 30# plug-in to taskflow, without requiring a large amount of code changes.
 31
 32
 33def build_frame():
 34    return 'steel'
 35
 36
 37def build_engine():
 38    return 'honda'
 39
 40
 41def build_doors():
 42    return '2'
 43
 44
 45def build_wheels():
 46    return '4'
 47
 48
 49# These just return true to indicate success, they would in the real world
 50# do more than just that.
 51
 52
 53def install_engine(frame, engine):
 54    return True
 55
 56
 57def install_doors(frame, windows_installed, doors):
 58    return True
 59
 60
 61def install_windows(frame, doors):
 62    return True
 63
 64
 65def install_wheels(frame, engine, engine_installed, wheels):
 66    return True
 67
 68
 69def trash(**kwargs):
 70    eu.print_wrapped("Throwing away pieces of car!")
 71
 72
 73def startup(**kwargs):
 74    # If you want to see the rollback function being activated try uncommenting
 75    # the following line.
 76    #
 77    # raise ValueError("Car not verified")
 78    return True
 79
 80
 81def verify(spec, **kwargs):
 82    # If the car is not what we ordered throw away the car (trigger reversion).
 83    for key, value in kwargs.items():
 84        if spec[key] != value:
 85            raise Exception("Car doesn't match spec!")
 86    return True
 87
 88
 89# These two functions connect into the state transition notification emission
 90# points that the engine outputs, they can be used to log state transitions
 91# that are occurring, or they can be used to suspend the engine (or perform
 92# other useful activities).
 93def flow_watch(state, details):
 94    print('Flow => %s' % state)
 95
 96
 97def task_watch(state, details):
 98    print('Task {} => {}'.format(details.get('task_name'), state))
 99
100
101flow = lf.Flow("make-auto").add(
102    task.FunctorTask(startup, revert=trash, provides='ran'),
103    # A graph flow allows automatic dependency based ordering, the ordering
104    # is determined by analyzing the symbols required and provided and ordering
105    # execution based on a functioning order (if one exists).
106    gf.Flow("install-parts").add(
107        task.FunctorTask(build_frame, provides='frame'),
108        task.FunctorTask(build_engine, provides='engine'),
109        task.FunctorTask(build_doors, provides='doors'),
110        task.FunctorTask(build_wheels, provides='wheels'),
111        # These *_installed outputs allow for other tasks to depend on certain
112        # actions being performed (aka the components were installed), another
113        # way to do this is to link() the tasks manually instead of creating
114        # an 'artificial' data dependency that accomplishes the same goal the
115        # manual linking would result in.
116        task.FunctorTask(install_engine, provides='engine_installed'),
117        task.FunctorTask(install_doors, provides='doors_installed'),
118        task.FunctorTask(install_windows, provides='windows_installed'),
119        task.FunctorTask(install_wheels, provides='wheels_installed'),
120    ),
121    task.FunctorTask(
122        verify,
123        requires=[
124            'frame',
125            'engine',
126            'doors',
127            'wheels',
128            'engine_installed',
129            'doors_installed',
130            'windows_installed',
131            'wheels_installed',
132        ],
133    ),
134)
135
136# This dictionary will be provided to the tasks as a specification for what
137# the tasks should produce, in this example this specification will influence
138# what those tasks do and what output they create. Different tasks depend on
139# different information from this specification, all of which will be provided
140# automatically by the engine to those tasks.
141spec = {
142    "frame": 'steel',
143    "engine": 'honda',
144    "doors": '2',
145    "wheels": '4',
146    # These are used to compare the result product, a car without the pieces
147    # installed is not a car after all.
148    "engine_installed": True,
149    "doors_installed": True,
150    "windows_installed": True,
151    "wheels_installed": True,
152}
153
154
155engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
156
157# This registers all (ANY) state transitions to trigger a call to the
158# flow_watch function for flow state transitions, and registers the
159# same all (ANY) state transitions for task state transitions.
160engine.notifier.register(ANY, flow_watch)
161engine.atom_notifier.register(ANY, task_watch)
162
163eu.print_wrapped("Building a car")
164engine.run()
165
166# Alter the specification and ensure that the reverting logic gets triggered
167# since the resultant car that will be built by the build_wheels function will
168# build a car with 4 doors only (not 5), this will cause the verification
169# task to mark the car that is produced as not matching the desired spec.
170spec['doors'] = 5
171
172engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
173engine.notifier.register(ANY, flow_watch)
174engine.atom_notifier.register(ANY, task_watch)
175
176eu.print_wrapped("Building a wrong car that doesn't match specification")
177try:
178    engine.run()
179except Exception as e:
180    eu.print_wrapped("Flow failed: %s" % e)

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup.

 1import fractions
 2import functools
 3import logging
 4import os
 5import string
 6import sys
 7import time
 8
 9logging.basicConfig(level=logging.ERROR)
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(
12    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
13)
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from taskflow import engines
18from taskflow import exceptions
19from taskflow.patterns import linear_flow
20from taskflow import task
21
22
23# In this example we show how a simple linear set of tasks can be executed
24# using local processes (and not threads or remote workers) with minimal (if
25# any) modification to those tasks to make them safe to run in this mode.
26#
27# This is useful since it allows further scaling up your workflows when thread
28# execution starts to become a bottleneck (which it can start to be due to the
29# GIL in python). It also offers an intermediary scalable runner that can be
30# used when the scale and/or setup of remote workers is not desirable.
31
32
33def progress_printer(task, event_type, details):
34    # This callback, attached to each task will be called in the local
35    # process (not the child processes)...
36    progress = details.pop('progress')
37    progress = int(progress * 100.0)
38    print("Task '%s' reached %d%% completion" % (task.name, progress))
39
40
41class AlphabetTask(task.Task):
42    # Second delay between each progress part.
43    _DELAY = 0.1
44
45    # This task will run in X main stages (each with a different progress
46    # report that will be delivered back to the running process...). The
47    # initial 0% and 100% are triggered automatically by the engine when
48    # a task is started and finished (so that's why those are not emitted
49    # here).
50    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
51
52    def execute(self):
53        for p in self._PROGRESS_PARTS:
54            self.update_progress(p)
55            time.sleep(self._DELAY)
56
57
58print("Constructing...")
59soup = linear_flow.Flow("alphabet-soup")
60for letter in string.ascii_lowercase:
61    abc = AlphabetTask(letter)
62    abc.notifier.register(
63        task.EVENT_UPDATE_PROGRESS, functools.partial(progress_printer, abc)
64    )
65    soup.add(abc)
66try:
67    print("Loading...")
68    e = engines.load(soup, engine='parallel', executor='processes')
69    print("Compiling...")
70    e.compile()
71    print("Preparing...")
72    e.prepare()
73    print("Running...")
74    e.run()
75    print("Done: %s" % e.statistics)
76except exceptions.NotImplementedError as e:
77    print(e)

Watching execution timing

Note

Full source located at timing_listener.

 1import logging
 2import os
 3import random
 4import sys
 5import time
 6
 7logging.basicConfig(level=logging.ERROR)
 8top_dir = os.path.abspath(
 9    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
10)
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.listeners import timing
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18# INTRO: in this example we will attach a listener to an engine
19# and have variable run time tasks run and show how the listener will print
20# out how long those tasks took (when they started and when they finished).
21#
22# This shows how timing metrics can be gathered (or attached onto an engine)
23# after a workflow has been constructed, making it easy to gather metrics
24# dynamically for situations where this kind of information is applicable (or
25# even adding this information on at a later point in the future when your
26# application starts to slow down).
27
28
29class VariableTask(task.Task):
30    def __init__(self, name):
31        super().__init__(name)
32        self._sleepy_time = random.random()
33
34    def execute(self):
35        time.sleep(self._sleepy_time)
36
37
38f = lf.Flow('root')
39f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
40e = engines.load(f)
41with timing.PrintingDurationListener(e):
42    e.run()

Distance calculator

Note

Full source located at distance_calculator

  1import collections
  2import math
  3import os
  4import sys
  5top_dir = os.path.abspath(
  6    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  7)
  8sys.path.insert(0, top_dir)
  9
 10from taskflow import engines
 11from taskflow.patterns import linear_flow
 12from taskflow import task
 13
 14# INTRO: This shows how to use a tasks/atoms ability to take requirements from
 15# its execute functions default parameters and shows how to provide those
 16# via different methods when needed, to influence those parameters to in
 17# this case calculate the distance between two points in 2D space.
 18
 19# A 2D point.
 20Point = collections.namedtuple("Point", "x,y")
 21
 22
 23def is_near(val, expected, tolerance=0.001):
 24    # Floats don't really provide equality...
 25    if val > (expected + tolerance):
 26        return False
 27    if val < (expected - tolerance):
 28        return False
 29    return True
 30
 31
 32class DistanceTask(task.Task):
 33    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
 34
 35    default_provides = 'distance'
 36
 37    def execute(self, a=Point(0, 0), b=Point(0, 0)):
 38        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
 39
 40
 41if __name__ == '__main__':
 42    # For these we rely on the execute() methods points by default being
 43    # at the origin (and we override it with store values when we want) at
 44    # execution time (which then influences what is calculated).
 45    any_distance = linear_flow.Flow("origin").add(DistanceTask())
 46    results = engines.run(any_distance)
 47    print(results)
 48    print(
 49        "{} is near-enough to {}: {}".format(
 50            results['distance'], 0.0, is_near(results['distance'], 0.0)
 51        )
 52    )
 53
 54    results = engines.run(any_distance, store={'a': Point(1, 1)})
 55    print(results)
 56    print(
 57        "{} is near-enough to {}: {}".format(
 58            results['distance'], 1.4142, is_near(results['distance'], 1.4142)
 59        )
 60    )
 61
 62    results = engines.run(any_distance, store={'a': Point(10, 10)})
 63    print(results)
 64    print(
 65        "{} is near-enough to {}: {}".format(
 66            results['distance'],
 67            14.14199,
 68            is_near(results['distance'], 14.14199),
 69        )
 70    )
 71
 72    results = engines.run(
 73        any_distance, store={'a': Point(5, 5), 'b': Point(10, 10)}
 74    )
 75    print(results)
 76    print(
 77        "{} is near-enough to {}: {}".format(
 78            results['distance'], 7.07106, is_near(results['distance'], 7.07106)
 79        )
 80    )
 81
 82    # For this we use the ability to override at task creation time the
 83    # optional arguments so that we don't need to continue to send them
 84    # in via the 'store' argument like in the above (and we fix the new
 85    # starting point 'a' at (10, 10) instead of (0, 0)...
 86
 87    ten_distance = linear_flow.Flow("ten")
 88    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
 89    results = engines.run(ten_distance, store={'b': Point(10, 10)})
 90    print(results)
 91    print(
 92        "{} is near-enough to {}: {}".format(
 93            results['distance'], 0.0, is_near(results['distance'], 0.0)
 94        )
 95    )
 96
 97    results = engines.run(ten_distance)
 98    print(results)
 99    print(
100        "{} is near-enough to {}: {}".format(
101            results['distance'],
102            14.14199,
103            is_near(results['distance'], 14.14199),
104        )
105    )

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply

  1import csv
  2import logging
  3import os
  4import random
  5import sys
  6
  7logging.basicConfig(level=logging.ERROR)
  8top_dir = os.path.abspath(
  9    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 10)
 11sys.path.insert(0, top_dir)
 12
 13import futurist
 14
 15from taskflow import engines
 16from taskflow.patterns import unordered_flow as uf
 17from taskflow import task
 18
 19# INTRO: This example walks through a miniature workflow which does a parallel
 20# table modification where each row in the table gets adjusted by a thread, or
 21# green thread (if eventlet is available) in parallel and then the result
 22# is reformed into a new table and some verifications are performed on it
 23# to ensure everything went as expected.
 24
 25
 26MULTIPLER = 10
 27
 28
 29class RowMultiplier(task.Task):
 30    """Performs a modification of an input row, creating a output row."""
 31
 32    def __init__(self, name, index, row, multiplier):
 33        super().__init__(name=name)
 34        self.index = index
 35        self.multiplier = multiplier
 36        self.row = row
 37
 38    def execute(self):
 39        return [r * self.multiplier for r in self.row]
 40
 41
 42def make_flow(table):
 43    # This creation will allow for parallel computation (since the flow here
 44    # is specifically unordered; and when things are unordered they have
 45    # no dependencies and when things have no dependencies they can just be
 46    # run at the same time, limited in concurrency by the executor or max
 47    # workers of that executor...)
 48    f = uf.Flow("root")
 49    for i, row in enumerate(table):
 50        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
 51    # NOTE(harlowja): at this point nothing has run, the above is just
 52    # defining what should be done (but not actually doing it) and associating
 53    # the ordering dependencies that should be enforced (the flow pattern used
 54    # forces this), the engine in the later main() function will actually
 55    # perform this work...
 56    return f
 57
 58
 59def main():
 60    if len(sys.argv) == 2:
 61        tbl = []
 62        with open(sys.argv[1], 'rb') as fh:
 63            reader = csv.reader(fh)
 64            for row in reader:
 65                tbl.append([float(r) if r else 0.0 for r in row])
 66    else:
 67        # Make some random table out of thin air...
 68        tbl = []
 69        cols = random.randint(1, 100)
 70        rows = random.randint(1, 100)
 71        for _i in range(0, rows):
 72            row = []
 73            for _j in range(0, cols):
 74                row.append(random.random())
 75            tbl.append(row)
 76
 77    # Generate the work to be done.
 78    f = make_flow(tbl)
 79
 80    # Now run it (using the specified executor)...
 81    try:
 82        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
 83    except RuntimeError:
 84        # No eventlet currently active, use real threads instead.
 85        executor = futurist.ThreadPoolExecutor(max_workers=5)
 86    try:
 87        e = engines.load(f, engine='parallel', executor=executor)
 88        for st in e.run_iter():
 89            print(st)
 90    finally:
 91        executor.shutdown()
 92
 93    # Find the old rows and put them into place...
 94    #
 95    # TODO(harlowja): probably easier just to sort instead of search...
 96    computed_tbl = []
 97    for i in range(0, len(tbl)):
 98        for t in f:
 99            if t.index == i:
100                computed_tbl.append(e.storage.get(t.name))
101
102    # Do some basic validation (which causes the return code of this process
103    # to be different if things were not as expected...)
104    if len(computed_tbl) != len(tbl):
105        return 1
106    else:
107        return 0
108
109
110if __name__ == "__main__":
111    sys.exit(main())

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

 1import os
 2import sys
 3
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15
16# INTRO: In this example a linear flow is used to group four tasks to calculate
17# a value. A single added task is used twice, showing how this can be done
18# and the twice added task takes in different bound values. In the first case
19# it uses default parameters ('x' and 'y') and in the second case arguments
20# are bound with ('z', 'd') keys from the engines internal storage mechanism.
21#
22# A multiplier task uses a binding that another task also provides, but this
23# example explicitly shows that 'z' parameter is bound with 'a' key
24# This shows that if a task depends on a key named the same as a key provided
25# from another task the name can be remapped to take the desired key from a
26# different origin.
27
28
29# This task provides some values as a result of execution; this can be
30# useful when you want to provide values from a static set to other tasks that
31# depend on those values existing before those tasks can run.
32#
33# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
34# that just provides those values on engine running by prepopulating the
35# storage backend before your tasks are run (which accomplishes a similar goal
36# in a more uniform manner).
37class Provider(task.Task):
38    def __init__(self, name, *args, **kwargs):
39        super().__init__(name=name, **kwargs)
40        self._provide = args
41
42    def execute(self):
43        return self._provide
44
45
46# This task adds two input variables and returns the result.
47#
48# Note that since this task does not have a revert() function (since addition
49# is a stateless operation) there are no side-effects that this function needs
50# to undo if some later operation fails.
51class Adder(task.Task):
52    def execute(self, x, y):
53        return x + y
54
55
56# This task multiplies an input variable by a multiplier and returns the
57# result.
58#
59# Note that since this task does not have a revert() function (since
60# multiplication is a stateless operation) and there are no side-effects that
61# this function needs to undo if some later operation fails.
62class Multiplier(task.Task):
63    def __init__(self, name, multiplier, provides=None, rebind=None):
64        super().__init__(name=name, provides=provides, rebind=rebind)
65        self._multiplier = multiplier
66
67    def execute(self, z):
68        return z * self._multiplier
69
70
71# Note here that the ordering is established so that the correct sequences
72# of operations occurs where the adding and multiplying is done according
73# to the expected and typical mathematical model. A graph flow could also be
74# used here to automatically infer & ensure the correct ordering.
75flow = lf.Flow('root').add(
76    # Provide the initial values for other tasks to depend on.
77    #
78    # x = 2, y = 3, d = 5
79    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
80    # z = x+y = 5
81    Adder("add-1", provides='z'),
82    # a = z+d = 10
83    Adder("add-2", provides='a', rebind=['z', 'd']),
84    # Calculate 'r = a*3 = 30'
85    #
86    # Note here that the 'z' argument of the execute() function will not be
87    # bound to the 'z' variable provided from the above 'provider' object but
88    # instead the 'z' argument will be taken from the 'a' variable provided
89    # by the second add-2 listed above.
90    Multiplier("multi", 3, provides='r', rebind={'z': 'a'}),
91)
92
93# The result here will be all results (from all tasks) which is stored in an
94# in-memory storage location that backs this engine since it is not configured
95# with persistence storage.
96results = taskflow.engines.run(flow)
97print(results)

Linear equation solver (inferred dependencies)

Source: graph_flow.py

 1import os
 2import sys
 3
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import graph_flow as gf
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16
17# In this example there are complex *inferred* dependencies between tasks that
18# are used to perform a simple set of linear equations.
19#
20# As you will see below the tasks just define what they require as input
21# and produce as output (named values). Then the user doesn't care about
22# ordering the tasks (in this case the tasks calculate pieces of the overall
23# equation).
24#
25# As you will notice a graph flow resolves dependencies automatically using the
26# tasks' symbol requirements and provided symbol values and no ordering
27# dependency has to be manually created.
28#
29# Also notice that flows of any types can be nested into a graph flow; showing
30# that subflow dependencies (and associated ordering) will be inferred too.
31
32
33class Adder(task.Task):
34    def execute(self, x, y):
35        return x + y
36
37
38flow = gf.Flow('root').add(
39    lf.Flow('nested_linear').add(
40        # x2 = y3+y4 = 12
41        Adder("add2", provides='x2', rebind=['y3', 'y4']),
42        # x1 = y1+y2 = 4
43        Adder("add1", provides='x1', rebind=['y1', 'y2']),
44    ),
45    # x5 = x1+x3 = 20
46    Adder("add5", provides='x5', rebind=['x1', 'x3']),
47    # x3 = x1+x2 = 16
48    Adder("add3", provides='x3', rebind=['x1', 'x2']),
49    # x4 = x2+y5 = 21
50    Adder("add4", provides='x4', rebind=['x2', 'y5']),
51    # x6 = x5+x4 = 41
52    Adder("add6", provides='x6', rebind=['x5', 'x4']),
53    # x7 = x6+x6 = 82
54    Adder("add7", provides='x7', rebind=['x6', 'x6']),
55)
56
57# Provide the initial variable inputs using a storage dictionary.
58store = {
59    "y1": 1,
60    "y2": 3,
61    "y3": 5,
62    "y4": 7,
63    "y5": 9,
64}
65
66# This is the expected values that should be created.
67unexpected = 0
68expected = [
69    ('x1', 4),
70    ('x2', 12),
71    ('x3', 16),
72    ('x4', 21),
73    ('x5', 20),
74    ('x6', 41),
75    ('x7', 82),
76]
77
78result = taskflow.engines.run(flow, engine='serial', store=store)
79
80print("Single threaded engine result %s" % result)
81for name, value in expected:
82    actual = result.get(name)
83    if actual != value:
84        sys.stderr.write(f"{actual} != {value}\n")
85        unexpected += 1
86
87result = taskflow.engines.run(flow, engine='parallel', store=store)
88
89print("Multi threaded engine result %s" % result)
90for name, value in expected:
91    actual = result.get(name)
92    if actual != value:
93        sys.stderr.write(f"{actual} != {value}\n")
94        unexpected += 1
95
96if unexpected:
97    sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel

 1import os
 2import sys
 3
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16# INTRO: These examples show how a linear flow and an unordered flow can be
17# used together to execute calculations in parallel and then use the
18# result for the next task/s. The adder task is used for all calculations
19# and argument bindings are used to set correct parameters for each task.
20
21
22# This task provides some values as a result of execution; this can be
23# useful when you want to provide values from a static set to other tasks that
24# depend on those values existing before those tasks can run.
25#
26# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
27# that provides those values on engine running by prepopulating the storage
28# backend before your tasks are run (which accomplishes a similar goal in a
29# more uniform manner).
30class Provider(task.Task):
31    def __init__(self, name, *args, **kwargs):
32        super().__init__(name=name, **kwargs)
33        self._provide = args
34
35    def execute(self):
36        return self._provide
37
38
39# This task adds two input variables and returns the result of that addition.
40#
41# Note that since this task does not have a revert() function (since addition
42# is a stateless operation) there are no side-effects that this function needs
43# to undo if some later operation fails.
44class Adder(task.Task):
45    def execute(self, x, y):
46        return x + y
47
48
49flow = lf.Flow('root').add(
50    # Provide the initial values for other tasks to depend on.
51    #
52    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
53    Provider("provide-adder", 2, 3, 5, 8, provides=('x1', 'y1', 'x2', 'y2')),
54    # Note here that we define the flow that contains the 2 adders to be an
55    # unordered flow since the order in which these execute does not matter,
56    # another way to solve this would be to use a graph_flow pattern, which
57    # also can run in parallel (since they have no ordering dependencies).
58    uf.Flow('adders').add(
59        # Calculate 'z1 = x1+y1 = 5'
60        #
61        # Rebind here means that the execute() function x argument will be
62        # satisfied from a previous output named 'x1', and the y argument
63        # of execute() will be populated from the previous output named 'y1'
64        #
65        # The output (result of adding) will be mapped into a variable named
66        # 'z1' which can then be referred to and depended on by other tasks.
67        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
68        # z2 = x2+y2 = 13
69        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
70    ),
71    # r = z1+z2 = 18
72    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']),
73)
74
75
76# The result here will be all results (from all tasks) which is stored in an
77# in-memory storage location that backs this engine since it is not configured
78# with persistence storage.
79result = taskflow.engines.run(flow, engine='parallel')
80print(result)

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume

 1import logging
 2import os
 3import random
 4import sys
 5import time
 6
 7logging.basicConfig(level=logging.ERROR)
 8
 9top_dir = os.path.abspath(
10    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
11)
12sys.path.insert(0, top_dir)
13
14from oslo_utils import reflection
15
16from taskflow import engines
17from taskflow.listeners import printing
18from taskflow.patterns import unordered_flow as uf
19from taskflow import task
20
21# INTRO: These examples show how unordered_flow can be used to create a large
22# number of fake volumes in parallel (or serially, depending on a constant that
23# can be easily changed).
24
25
26@contextlib.contextmanager
27def show_time(name):
28    start = time.time()
29    yield
30    end = time.time()
31    print(f" -- {name} took {end - start:0.3f} seconds")
32
33
34# This affects how many volumes to create and how much time to *simulate*
35# passing for that volume to be created.
36MAX_CREATE_TIME = 3
37VOLUME_COUNT = 5
38
39# This will be used to determine if all the volumes are created in parallel
40# or whether the volumes are created serially (in an undefined order since
41# an unordered flow is used). Note that there is a disconnection between the
42# ordering and the concept of parallelism (since unordered items can still be
43# run in a serial ordering). A typical use-case for offering both is to allow
44# for debugging using a serial approach, while when running at a larger scale
45# one would likely want to use the parallel approach.
46#
47# If you switch this flag from serial to parallel you can see the overall
48# time difference that this causes.
49SERIAL = False
50if SERIAL:
51    engine = 'serial'
52else:
53    engine = 'parallel'
54
55
56class VolumeCreator(task.Task):
57    def __init__(self, volume_id):
58        # Note here that the volume name is composed of the name of the class
59        # along with the volume id that is being created, since a name of a
60        # task uniquely identifies that task in storage it is important that
61        # the name be relevant and identifiable if the task is recreated for
62        # subsequent resumption (if applicable).
63        #
64        # UUIDs are *not* used as they can not be tied back to a previous tasks
65        # state on resumption (since they are unique and will vary for each
66        # task that is created). A name based off the volume id that is to be
67        # created is more easily tied back to the original task so that the
68        # volume create can be resumed/reverted, and is much easier to use for
69        # audit and tracking purposes.
70        base_name = reflection.get_callable_name(self)
71        super().__init__(name=f"{base_name}-{volume_id}")
72        self._volume_id = volume_id
73
74    def execute(self):
75        print("Making volume %s" % (self._volume_id))
76        time.sleep(random.random() * MAX_CREATE_TIME)
77        print("Finished making volume %s" % (self._volume_id))
78
79
80# Assume there is no ordering dependency between volumes.
81flow = uf.Flow("volume-maker")
82for i in range(0, VOLUME_COUNT):
83    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
84
85
86# Show how much time the overall engine loading and running takes.
87with show_time(name=flow.name.title()):
88    eng = engines.load(flow, engine=engine)
89    # This context manager automatically adds (and automatically removes) a
90    # helpful set of state transition notification printing helper utilities
91    # that show you exactly what transitions the engine is going through
92    # while running the various volume create tasks.
93    with printing.PrintingListener(eng):
94        eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce

  1import os
  2import sys
  3
  4logging.basicConfig(level=logging.ERROR)
  5
  6self_dir = os.path.abspath(os.path.dirname(__file__))
  7top_dir = os.path.abspath(
  8    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  9)
 10sys.path.insert(0, top_dir)
 11sys.path.insert(0, self_dir)
 12
 13# INTRO: These examples show a simplistic map/reduce implementation where
 14# a set of mapper(s) will sum a series of input numbers (in parallel) and
 15# return their individual summed result. A reducer will then use those
 16# produced values and perform a final summation and this result will then be
 17# printed (and verified to ensure the calculation was as expected).
 18
 19from taskflow import engines
 20from taskflow.patterns import linear_flow
 21from taskflow.patterns import unordered_flow
 22from taskflow import task
 23
 24
 25class SumMapper(task.Task):
 26    def execute(self, inputs):
 27        # Sums some set of provided inputs.
 28        return sum(inputs)
 29
 30
 31class TotalReducer(task.Task):
 32    def execute(self, *args, **kwargs):
 33        # Reduces all mapped summed outputs into a single value.
 34        total = 0
 35        for k, v in kwargs.items():
 36            # If any other kwargs were passed in, we don't want to use those
 37            # in the calculation of the total...
 38            if k.startswith('reduction_'):
 39                total += v
 40        return total
 41
 42
 43def chunk_iter(chunk_size, upperbound):
 44    """Yields back chunk size pieces from zero to upperbound - 1."""
 45    chunk = []
 46    for i in range(0, upperbound):
 47        chunk.append(i)
 48        if len(chunk) == chunk_size:
 49            yield chunk
 50            chunk = []
 51
 52
 53# Upper bound of numbers to sum for example purposes...
 54UPPER_BOUND = 10000
 55
 56# How many mappers we want to have.
 57SPLIT = 10
 58
 59# How big of a chunk we want to give each mapper.
 60CHUNK_SIZE = UPPER_BOUND // SPLIT
 61
 62# This will be the workflow we will compose and run.
 63w = linear_flow.Flow("root")
 64
 65# The mappers will run in parallel.
 66store = {}
 67provided = []
 68mappers = unordered_flow.Flow('map')
 69for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
 70    mapper_name = 'mapper_%s' % i
 71    # Give that mapper some information to compute.
 72    store[mapper_name] = chunk
 73    # The reducer uses all of the outputs of the mappers, so it needs
 74    # to be recorded that it needs access to them (under a specific name).
 75    provided.append("reduction_%s" % i)
 76    mappers.add(
 77        SumMapper(
 78            name=mapper_name,
 79            rebind={'inputs': mapper_name},
 80            provides=provided[-1],
 81        )
 82    )
 83w.add(mappers)
 84
 85# The reducer will run last (after all the mappers).
 86w.add(TotalReducer('reducer', requires=provided))
 87
 88# Now go!
 89e = engines.load(w, engine='parallel', store=store, max_workers=4)
 90print("Running a parallel engine with options: %s" % e.options)
 91e.run()
 92
 93# Now get the result the reducer created.
 94total = e.storage.get('reducer')
 95print("Calculated result = %s" % total)
 96
 97# Calculate it manually to verify that it worked...
 98calc_total = sum(range(0, UPPER_BOUND))
 99if calc_total != total:
100    sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread

 1import os
 2import random
 3import sys
 4import time
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(
 9    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
10)
11sys.path.insert(0, top_dir)
12
13import futurist
14
15from taskflow import engines
16from taskflow.patterns import unordered_flow as uf
17from taskflow import task
18from taskflow.utils import threading_utils as tu
19
20# INTRO: in this example we create 2 dummy flow(s) with a 2 dummy task(s), and
21# run it using a shared thread pool executor to show how a single executor can
22# be used with more than one engine (sharing the execution thread pool between
23# them); this allows for saving resources and reusing threads in situations
24# where this is beneficial.
25
26
27class DelayedTask(task.Task):
28    def __init__(self, name):
29        super().__init__(name=name)
30        self._wait_for = random.random()
31
32    def execute(self):
33        print(f"Running '{self.name}' in thread '{tu.get_ident()}'")
34        time.sleep(self._wait_for)
35
36
37f1 = uf.Flow("f1")
38f1.add(DelayedTask("f1-1"))
39f1.add(DelayedTask("f1-2"))
40
41f2 = uf.Flow("f2")
42f2.add(DelayedTask("f2-1"))
43f2.add(DelayedTask("f2-2"))
44
45# Run them all using the same futures (thread-pool based) executor...
46with futurist.ThreadPoolExecutor() as ex:
47    e1 = engines.load(f1, engine='parallel', executor=ex)
48    e2 = engines.load(f2, engine='parallel', executor=ex)
49    iters = [e1.run_iter(), e2.run_iter()]
50    # Iterate over a copy (so we can remove from the source list).
51    cloned_iters = list(iters)
52    while iters:
53        # Run a single 'step' of each iterator, forcing each engine to perform
54        # some work, then yield, and repeat until each iterator is consumed
55        # and there is no more engine work to be done.
56        for it in cloned_iters:
57            try:
58                next(it)
59            except StopIteration:
60                try:
61                    iters.remove(it)
62                except ValueError:
63                    pass

Storing & emitting a bill

Note

Full source located at fake_billing

  1import logging
  2import os
  3import sys
  4import time
  5
  6logging.basicConfig(level=logging.ERROR)
  7
  8top_dir = os.path.abspath(
  9    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 10)
 11sys.path.insert(0, top_dir)
 12
 13from oslo_utils import uuidutils
 14
 15from taskflow import engines
 16from taskflow.listeners import printing
 17from taskflow.patterns import graph_flow as gf
 18from taskflow.patterns import linear_flow as lf
 19from taskflow import task
 20from taskflow.utils import misc
 21
 22# INTRO: This example walks through a miniature workflow which simulates
 23# the reception of an API request, creation of a database entry, driver
 24# activation (which invokes a 'fake' webservice) and final completion.
 25#
 26# This example also shows how a function/object (in this case the url sending)
 27# that occurs during driver activation can update the progress of a task
 28# without being aware of the internals of how to do this by associating a
 29# callback that the url sending can update as the sending progresses from 0.0%
 30# complete to 100% complete.
 31
 32
 33class DB:
 34    def query(self, sql):
 35        print("Querying with: %s" % (sql))
 36
 37
 38class UrlCaller:
 39    def __init__(self):
 40        self._send_time = 0.5
 41        self._chunks = 25
 42
 43    def send(self, url, data, status_cb=None):
 44        sleep_time = float(self._send_time) / self._chunks
 45        for i in range(0, len(data)):
 46            time.sleep(sleep_time)
 47            # As we send the data, each chunk we 'fake' send will progress
 48            # the sending progress that much further to 100%.
 49            if status_cb:
 50                status_cb(float(i) / len(data))
 51
 52
 53# Since engines save the output of tasks to an optional persistent storage
 54# backend resources have to be dealt with in a slightly different manner since
 55# resources are transient and can *not* be persisted (or serialized). For tasks
 56# that require access to a set of resources it is a common pattern to provide
 57# an object (in this case this object) on construction of those tasks via the
 58# task constructor.
 59class ResourceFetcher:
 60    def __init__(self):
 61        self._db_handle = None
 62        self._url_handle = None
 63
 64    @property
 65    def db_handle(self):
 66        if self._db_handle is None:
 67            self._db_handle = DB()
 68        return self._db_handle
 69
 70    @property
 71    def url_handle(self):
 72        if self._url_handle is None:
 73            self._url_handle = UrlCaller()
 74        return self._url_handle
 75
 76
 77class ExtractInputRequest(task.Task):
 78    def __init__(self, resources):
 79        super().__init__(provides="parsed_request")
 80        self._resources = resources
 81
 82    def execute(self, request):
 83        return {
 84            'user': request.user,
 85            'user_id': misc.as_int(request.id),
 86            'request_id': uuidutils.generate_uuid(),
 87        }
 88
 89
 90class MakeDBEntry(task.Task):
 91    def __init__(self, resources):
 92        super().__init__()
 93        self._resources = resources
 94
 95    def execute(self, parsed_request):
 96        db_handle = self._resources.db_handle
 97        db_handle.query("INSERT %s INTO mydb" % (parsed_request))
 98
 99    def revert(self, result, parsed_request):
100        db_handle = self._resources.db_handle
101        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
102
103
104class ActivateDriver(task.Task):
105    def __init__(self, resources):
106        super().__init__(provides='sent_to')
107        self._resources = resources
108        self._url = "http://blahblah.com"
109
110    def execute(self, parsed_request):
111        print("Sending billing data to %s" % (self._url))
112        url_sender = self._resources.url_handle
113        # Note that here we attach our update_progress function (which is a
114        # function that the engine also 'binds' to) to the progress function
115        # that the url sending helper class uses. This allows the task progress
116        # to be tied to the url sending progress, which is very useful for
117        # downstream systems to be aware of what a task is doing at any time.
118        url_sender.send(
119            self._url,
120            json.dumps(parsed_request),
121            status_cb=self.update_progress,
122        )
123        return self._url
124
125    def update_progress(self, progress, **kwargs):
126        # Override the parent method to also print out the status.
127        super().update_progress(progress, **kwargs)
128        print(f"{self.name} is {progress * 100:0.2f}% done")
129
130
131class DeclareSuccess(task.Task):
132    def execute(self, sent_to):
133        print("Done!")
134        print("All data processed and sent to %s" % (sent_to))
135
136
137class DummyUser:
138    def __init__(self, user, id_):
139        self.user = user
140        self.id = id_
141
142
143# Resources (db handles and similar) of course can *not* be persisted so we
144# need to make sure that we pass this resource fetcher to the tasks constructor
145# so that the tasks have access to any needed resources (the resources are
146# lazily loaded so that they are only created when they are used).
147resources = ResourceFetcher()
148flow = lf.Flow("initialize-me")
149
150# 1. First we extract the api request into a usable format.
151# 2. Then we go ahead and make a database entry for our request.
152flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
153
154# 3. Then we activate our payment method and finally declare success.
155sub_flow = gf.Flow("after-initialize")
156sub_flow.add(ActivateDriver(resources), DeclareSuccess())
157flow.add(sub_flow)
158
159# Initially populate the storage with the following request object,
160# prepopulating this allows the tasks that depend on the 'request' variable
161# to start processing (in this case this is the ExtractInputRequest task).
162store = {
163    'request': DummyUser(user="bob", id_="1.35"),
164}
165eng = engines.load(flow, engine='serial', store=store)
166
167# This context manager automatically adds (and automatically removes) a
168# helpful set of state transition notification printing helper utilities
169# that show you exactly what transitions the engine is going through
170# while running the various billing related tasks.
171with printing.PrintingListener(eng):
172    eng.run()

Suspending a workflow & resuming

Note

Full source located at resume_from_backend

  1import logging
  2import os
  3import sys
  4
  5logging.basicConfig(level=logging.ERROR)
  6
  7self_dir = os.path.abspath(os.path.dirname(__file__))
  8top_dir = os.path.abspath(
  9    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 10)
 11sys.path.insert(0, top_dir)
 12sys.path.insert(0, self_dir)
 13
 14from oslo_utils import uuidutils
 15
 16import taskflow.engines
 17from taskflow.patterns import linear_flow as lf
 18from taskflow.persistence import models
 19from taskflow import task
 20
 21import example_utils as eu  # noqa
 22
 23# INTRO: In this example linear_flow is used to group three tasks, one which
 24# will suspend the future work the engine may do. This suspended engine is then
 25# discarded and the workflow is reloaded from the persisted data and then the
 26# workflow is resumed from where it was suspended. This allows you to see how
 27# to start an engine, have a task stop the engine from doing future work (if
 28# a multi-threaded engine is being used, then the currently active work is not
 29# preempted) and then resume the work later.
 30#
 31# Usage:
 32#
 33#   With a filesystem directory as backend
 34#
 35#     python taskflow/examples/resume_from_backend.py
 36#
 37#   With ZooKeeper as backend
 38#
 39#     python taskflow/examples/resume_from_backend.py \
 40#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
 41
 42
 43# UTILITY FUNCTIONS #########################################
 44
 45
 46def print_task_states(flowdetail, msg):
 47    eu.print_wrapped(msg)
 48    print(f"Flow '{flowdetail.name}' state: {flowdetail.state}")
 49    # Sort by these so that our test validation doesn't get confused by the
 50    # order in which the items in the flow detail can be in.
 51    items = sorted(
 52        (td.name, td.version, td.state, td.results) for td in flowdetail
 53    )
 54    for item in items:
 55        print(" %s==%s: %s, result=%s" % item)
 56
 57
 58def find_flow_detail(backend, lb_id, fd_id):
 59    conn = backend.get_connection()
 60    lb = conn.get_logbook(lb_id)
 61    return lb.find(fd_id)
 62
 63
 64# CREATE FLOW ###############################################
 65
 66
 67class InterruptTask(task.Task):
 68    def execute(self):
 69        # DO NOT TRY THIS AT HOME
 70        engine.suspend()
 71
 72
 73class TestTask(task.Task):
 74    def execute(self):
 75        print('executing %s' % self)
 76        return 'ok'
 77
 78
 79def flow_factory():
 80    return lf.Flow('resume from backend example').add(
 81        TestTask(name='first'),
 82        InterruptTask(name='boom'),
 83        TestTask(name='second'),
 84    )
 85
 86
 87# INITIALIZE PERSISTENCE ####################################
 88
with eu.get_backend() as backend:
    # Create a place where the persistence information will be stored.
    book = models.LogBook("example")
    flow_detail = models.FlowDetail(
        "resume from backend example", uuid=uuidutils.generate_uuid()
    )
    book.add(flow_detail)
    with contextlib.closing(backend.get_connection()) as conn:
        conn.save_logbook(book)

    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################

    flow = flow_factory()
    engine = taskflow.engines.load(
        flow, flow_detail=flow_detail, book=book, backend=backend
    )

    print_task_states(flow_detail, "At the beginning, there is no state")
    eu.print_wrapped("Running")
    # The InterruptTask inside the flow calls engine.suspend(), so this
    # run stops before the 'second' task executes.
    engine.run()
    print_task_states(flow_detail, "After running")

    # RE-CREATE, RESUME, RUN ####################################

    eu.print_wrapped("Resuming and running again")

    # NOTE(harlowja): reload the flow detail from backend, this will allow us
    # to resume the flow from its suspended state, but first we need to search
    # for the right flow details in the correct logbook where things are
    # stored.
    #
    # We could avoid re-loading the engine and just do engine.run() again, but
    # this example shows how another process may unsuspend a given flow and
    # start it again for situations where this is useful to-do (say the process
    # running the above flow crashes).
    flow2 = flow_factory()
    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
    engine2 = taskflow.engines.load(
        flow2, flow_detail=flow_detail_2, backend=backend, book=book
    )
    engine2.run()
    print_task_states(flow_detail_2, "At the end")

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1import hashlib
  2import logging
  3import os
  4import random
  5import sys
  6import time
  7
  8logging.basicConfig(level=logging.ERROR)
  9
 10self_dir = os.path.abspath(os.path.dirname(__file__))
 11top_dir = os.path.abspath(
 12    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 13)
 14sys.path.insert(0, top_dir)
 15sys.path.insert(0, self_dir)
 16
 17import futurist
 18from oslo_utils import uuidutils
 19
 20from taskflow import engines
 21from taskflow import exceptions as exc
 22from taskflow.patterns import graph_flow as gf
 23from taskflow.patterns import linear_flow as lf
 24from taskflow.persistence import models
 25from taskflow import task
 26
 27import example_utils as eu  # noqa
 28
 29# INTRO: These examples show how a hierarchy of flows can be used to create a
 30# vm in a reliable & resumable manner using taskflow + a miniature version of
 31# what nova does while booting a vm.
 32
 33
@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Yield a delay value; on exit, optionally pause to allow a ctrl-c.

    The pause only happens when the user supplied command line arguments
    (i.e. a backend uri was provided), giving them a window in which to
    interrupt the program and later resume it.
    """
    try:
        yield how_long
    finally:
        if len(sys.argv) > 1:
            # Only bother to do this if user input provided.
            print("** Ctrl-c me please!!! **")
            time.sleep(how_long)
 43
 44
 45class PrintText(task.Task):
 46    """Just inserts some text print outs in a workflow."""
 47
 48    def __init__(self, print_what, no_slow=False):
 49        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 50        super().__init__(name="Print: %s" % (content_hash))
 51        self._text = print_what
 52        self._no_slow = no_slow
 53
 54    def execute(self):
 55        if self._no_slow:
 56            eu.print_wrapped(self._text)
 57        else:
 58            with slow_down():
 59                eu.print_wrapped(self._text)
 60
 61
 62class DefineVMSpec(task.Task):
 63    """Defines a vm specification to be."""
 64
 65    def __init__(self, name):
 66        super().__init__(provides='vm_spec', name=name)
 67
 68    def execute(self):
 69        return {
 70            'type': 'kvm',
 71            'disks': 2,
 72            'vcpu': 1,
 73            'ips': 1,
 74            'volumes': 3,
 75        }
 76
 77
 78class LocateImages(task.Task):
 79    """Locates where the vm images are."""
 80
 81    def __init__(self, name):
 82        super().__init__(provides='image_locations', name=name)
 83
 84    def execute(self, vm_spec):
 85        image_locations = {}
 86        for i in range(0, vm_spec['disks']):
 87            url = "http://www.yahoo.com/images/%s" % (i)
 88            image_locations[url] = "/tmp/%s.img" % (i)
 89        return image_locations
 90
 91
 92class DownloadImages(task.Task):
 93    """Downloads all the vm images."""
 94
 95    def __init__(self, name):
 96        super().__init__(provides='download_paths', name=name)
 97
 98    def execute(self, image_locations):
 99        for src, loc in image_locations.items():
100            with slow_down(1):
101                print(f"Downloading from {src} => {loc}")
102        return sorted(image_locations.values())
103
104
class CreateNetworkTpl(task.Task):
    """Generates the network settings file to be placed in the images."""

    SYSCONFIG_CONTENTS = """DEVICE=eth%s
BOOTPROTO=static
IPADDR=%s
ONBOOT=yes"""

    def __init__(self, name):
        super().__init__(provides='network_settings', name=name)

    def execute(self, ips):
        # One sysconfig file body per allocated ip (eth0, eth1, ...).
        return [
            self.SYSCONFIG_CONTENTS % (device_index, ip)
            for device_index, ip in enumerate(ips)
        ]
121
122
class AllocateIP(task.Task):
    """Allocates the ips for the given vm."""

    def __init__(self, name):
        super().__init__(provides='ips', name=name)

    def execute(self, vm_spec):
        # Pick a random 192.168.0.x address for each requested ip.
        return [
            "192.168.0.%s" % random.randint(1, 254)
            for _ in range(vm_spec.get('ips', 0))
        ]
134
135
class WriteNetworkSettings(task.Task):
    """Writes all the network settings into the downloaded images."""

    def execute(self, download_paths, network_settings):
        # "Mount" every downloaded image, then "write" each interface's
        # sysconfig file into it.
        for mount_index, path in enumerate(download_paths):
            with slow_down(1):
                print(f"Mounting {path} to /tmp/{mount_index}")
            for nic_index, setting in enumerate(network_settings):
                filename = (
                    "/tmp/etc/sysconfig/network-scripts/ifcfg-eth%s"
                    % nic_index
                )
                with slow_down(1):
                    print("Writing to %s" % (filename))
                    print(setting)
150
151
class BootVM(task.Task):
    """Fires off the vm boot operation."""

    def execute(self, vm_spec):
        print("Starting vm!")
        with slow_down(1):
            # Tuple wrapping avoids %-formatting treating the spec dict
            # as a mapping of format keys.
            print("Created: %s" % (vm_spec,))
159
160
class AllocateVolumes(task.Task):
    """Allocates the volumes for the vm."""

    def execute(self, vm_spec):
        volumes = []
        # Devices are numbered starting at /dev/vda1.
        for device_number in range(1, vm_spec['volumes'] + 1):
            with slow_down(1):
                device = "/dev/vda%s" % device_number
                volumes.append(device)
                print("Allocated volume %s" % device)
        return volumes
171
172
class FormatVolumes(task.Task):
    """Formats the volumes for the vm."""

    def execute(self, volumes):
        for volume in volumes:
            print("Formatting volume %s" % volume)
            with slow_down(1):
                # The pause stands in for the actual formatting work.
                pass
            print("Formatted volume %s" % volume)
182
183
def create_flow():
    """Assemble and return the full (mini-nova) vm-boot workflow.

    The outer flow is linear; the image/network/volume pieces are built
    as graph flows (gf).
    """
    # Setup the set of things to do (mini-nova).
    flow = lf.Flow("root").add(
        PrintText("Starting vm creation.", no_slow=True),
        lf.Flow('vm-maker').add(
            # First create a specification for the final vm to-be.
            DefineVMSpec("define_spec"),
            # This does all the image stuff.
            gf.Flow("img-maker").add(
                LocateImages("locate_images"),
                DownloadImages("download_images"),
            ),
            # This does all the network stuff.
            gf.Flow("net-maker").add(
                AllocateIP("get_my_ips"),
                CreateNetworkTpl("fetch_net_settings"),
                WriteNetworkSettings("write_net_settings"),
            ),
            # This does all the volume stuff.
            gf.Flow("volume-maker").add(
                AllocateVolumes("allocate_my_volumes", provides='volumes'),
                FormatVolumes("volume_formatter"),
            ),
            # Finally boot it all.
            BootVM("boot-it"),
        ),
        # Ya it worked!
        PrintText("Finished vm create.", no_slow=True),
        PrintText("Instance is running!", no_slow=True),
    )
    return flow
215
216
eu.print_wrapped("Initializing")

# Setup the persistence & resumption layer.
with eu.get_backend() as backend:
    # Try to find a previously passed in tracking id...
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
        if not uuidutils.is_uuid_like(book_id):
            book_id = None
        if not uuidutils.is_uuid_like(flow_id):
            flow_id = None
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    # Set up how we want our engine to run, serial, parallel...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet installed, just let the default be used instead.
        executor = None

    # Create/fetch a logbook that will track the workflows work.
    book = None
    flow_detail = None
    if all([book_id, flow_id]):
        # Try to find in a prior logbook and flow detail...
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(book_id)
                flow_detail = book.find(flow_id)
            except exc.NotFound:
                pass
    if book is None and flow_detail is None:
        # Nothing found to resume; start a fresh logbook + flow.
        book = models.LogBook("vm-boot")
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        engine = engines.load_from_factory(
            create_flow,
            backend=backend,
            book=book,
            engine='parallel',
            executor=executor,
        )
        print(
            "!! Your tracking id is: '{}+{}'".format(
                book.uuid, engine.storage.flow_uuid
            )
        )
        print("!! Please submit this on later runs for tracking purposes")
    else:
        # Attempt to load from a previously partially completed flow.
        engine = engines.load_from_detail(
            flow_detail, backend=backend, engine='parallel', executor=executor
        )

    # Make me my vm please!
    eu.print_wrapped('Running')
    engine.run()

# How to use.
#
# 1. $ python me.py "sqlite:////tmp/nova.db"
# 2. ctrl-c before this finishes
# 3. Find the tracking id (search for 'Your tracking id is')
# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
#    (the same database as step 1 -- the logbook to resume lives there)
# 5. Watch it pick up where it left off.
# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1import hashlib
  2import logging
  3import os
  4import random
  5import sys
  6import time
  7
  8logging.basicConfig(level=logging.ERROR)
  9
 10self_dir = os.path.abspath(os.path.dirname(__file__))
 11top_dir = os.path.abspath(
 12    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 13)
 14sys.path.insert(0, top_dir)
 15sys.path.insert(0, self_dir)
 16
 17from oslo_utils import uuidutils
 18
 19from taskflow import engines
 20from taskflow.patterns import graph_flow as gf
 21from taskflow.patterns import linear_flow as lf
 22from taskflow.persistence import models
 23from taskflow import task
 24
 25import example_utils  # noqa
 26
 27# INTRO: These examples show how a hierarchy of flows can be used to create a
 28# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
 29# version of what cinder does while creating a volume (very miniature).
 30
 31
@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Yield a delay value; on exit prompt for a ctrl-c and sleep.

    The sleep creates a window in which the user can interrupt the
    program so that its resume behavior can be demonstrated.
    """
    try:
        yield how_long
    finally:
        print("** Ctrl-c me please!!! **")
        time.sleep(how_long)
 39
 40
def find_flow_detail(backend, book_id, flow_id):
    """Fetch the flow detail with ``flow_id`` from logbook ``book_id``."""
    # NOTE(harlowja): this is used to attempt to find a given logbook with
    # a given id and a given flow details inside that logbook, we need this
    # reference so that we can resume the correct flow (as a logbook tracks
    # flows and a flow detail tracks an individual flow).
    #
    # Without a reference to the logbook and the flow details in that logbook
    # we will not know exactly what we should resume and that would mean we
    # can't resume what we don't know.
    with contextlib.closing(backend.get_connection()) as conn:
        lb = conn.get_logbook(book_id)
        return lb.find(flow_id)
 53
 54
 55class PrintText(task.Task):
 56    def __init__(self, print_what, no_slow=False):
 57        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 58        super().__init__(name="Print: %s" % (content_hash))
 59        self._text = print_what
 60        self._no_slow = no_slow
 61
 62    def execute(self):
 63        if self._no_slow:
 64            print("-" * (len(self._text)))
 65            print(self._text)
 66            print("-" * (len(self._text)))
 67        else:
 68            with slow_down():
 69                print("-" * (len(self._text)))
 70                print(self._text)
 71                print("-" * (len(self._text)))
 72
 73
 74class CreateSpecForVolumes(task.Task):
 75    def execute(self):
 76        volumes = []
 77        for i in range(0, random.randint(1, 10)):
 78            volumes.append(
 79                {
 80                    'type': 'disk',
 81                    'location': "/dev/vda%s" % (i + 1),
 82                }
 83            )
 84        return volumes
 85
 86
 87class PrepareVolumes(task.Task):
 88    def execute(self, volume_specs):
 89        for v in volume_specs:
 90            with slow_down():
 91                print("Dusting off your hard drive %s" % (v))
 92            with slow_down():
 93                print("Taking a well deserved break.")
 94            print("Your drive %s has been certified." % (v))
 95
 96
# Setup the set of things to do (mini-cinder).  The 'maker' sub-flow is a
# graph flow, so its tasks are added without a fixed linear order.
flow = lf.Flow("root").add(
    PrintText("Starting volume create", no_slow=True),
    gf.Flow('maker').add(
        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
        PrintText("I need a nap, it took me a while to build those specs."),
        PrepareVolumes(),
    ),
    PrintText("Finished volume create", no_slow=True),
)
107
108# Setup the persistence & resumption layer.
# Setup the persistence & resumption layer.
with example_utils.get_backend() as backend:
    # A 'tracking id' is the logbook uuid and flow detail uuid joined
    # with '+', optionally supplied as the second command line argument.
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    if not all([book_id, flow_id]):
        # If no 'tracking id' (think a fedex or ups tracking id) is provided
        # then we create one by creating a logbook (where flow details are
        # stored) and creating a flow detail (where flow and task state is
        # stored). The combination of these 2 objects unique ids (uuids) allows
        # the users of taskflow to reassociate the workflows that were
        # potentially running (and which may have partially completed) back
        # with taskflow so that those workflows can be resumed (or reverted)
        # after a process/thread/engine has failed in some way.
        book = models.LogBook('resume-volume-create')
        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
        book.add(flow_detail)
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        print(
            "!! Your tracking id is: '{}+{}'".format(
                book.uuid, flow_detail.uuid
            )
        )
        print("!! Please submit this on later runs for tracking purposes")
    else:
        flow_detail = find_flow_detail(backend, book_id, flow_id)

    # Load and run.
    engine = engines.load(
        flow, flow_detail=flow_detail, backend=backend, engine='serial'
    )
    engine.run()
144
145# How to use.
146#
147# 1. $ python me.py "sqlite:////tmp/cinder.db"
148# 2. ctrl-c before this finishes
149# 3. Find the tracking id (search for 'Your tracking id is')
150# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
151# 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter

 1import os
 2import sys
 3
 4logging.basicConfig(level=logging.ERROR)
 5
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(
 8    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 9)
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13
14from taskflow import engines
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18
# INTRO: This example shows how to run a set of engines at the same time, each
# executing a different flow, using a single thread of control to iterate over
# each engine (which causes that engine to advance to its next state during
# each iteration).
23
24
class EchoTask(task.Task):
    """Prints the value it is given and provides the next letter."""

    def execute(self, value):
        print(value)
        next_letter = chr(ord(value) + 1)
        return next_letter
29
30
def make_alphabet_flow(i):
    """Build a linear flow that echoes 'A'..'Z', one task per letter.

    Each task (except the last) provides the next letter, which the
    following task consumes via its rebind mapping; the final ('Z') task
    provides nothing since there is no successor.
    """
    f = lf.Flow("alphabet_%s" % (i))
    letters = [chr(code) for code in range(ord('A'), ord('Z') + 1)]
    for letter in letters[:-1]:
        f.add(
            EchoTask(
                name="echoer_%s" % letter,
                rebind={'value': letter},
                provides=chr(ord(letter) + 1),
            )
        )
    last = letters[-1]
    f.add(EchoTask(name="echoer_%s" % last, rebind={'value': last}))
    return f
54
55
# Adjust this number to change how many engines/flows run at once.
flow_count = 1
# Build each flow exactly once (the previous version constructed every
# flow twice per iteration and threw the first result away).
flows = [make_alphabet_flow(i + 1) for i in range(flow_count)]
engine_iters = []
for f in flows:
    e = engines.load(f)
    e.compile()
    # Seed the first letter so the first task's 'value' requirement is met.
    e.storage.inject({'A': 'A'})
    e.prepare()
    engine_iters.append(e.run_iter())
# Round-robin across the engines, advancing each one a single state per
# pass, until every engine's iterator is exhausted.
while engine_iters:
    for it in list(engine_iters):
        try:
            print(next(it))
        except StopIteration:
            engine_iters.remove(it)

Controlling retries using a retry controller

Note

Full source located at retry_flow

 1import os
 2import sys
 3
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(
 7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 8)
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import retry
14from taskflow import task
15
# INTRO: In this example we create a retry controller that receives a phone
# directory and tries different phone numbers. The next task tries to call Jim
# using the given number. If it is not Jim's number, the task raises an
# exception and the retry controller takes the next number from the phone
# directory and retries the call.
#
# This example shows a basic usage of retry controllers in a flow.
# Retry controllers allow reverting and retrying a failed subflow with new
# parameters.
25
26
class CallJim(task.Task):
    """Attempts to reach Jim at one candidate phone number."""

    def execute(self, jim_number):
        print("Calling jim %s." % jim_number)
        if jim_number == 555:
            print("Hello Jim!")
        else:
            # Failing here triggers revert() and then the retry controller.
            raise Exception("Wrong number!")

    def revert(self, jim_number, **kwargs):
        print("Wrong number, apologizing.")
37
38
# Create your flow and associated tasks (the work to be done).  The retry
# controller feeds each entry of 'phone_directory' in turn as 'jim_number'
# whenever the subflow fails and is reverted.
flow = lf.Flow(
    'retrying-linear',
    retry=retry.ParameterizedForEach(
        rebind=['phone_directory'], provides='jim_number'
    ),
).add(CallJim())

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1import logging
  2import os
  3import sys
  4import tempfile
  5
  6top_dir = os.path.abspath(
  7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  8)
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.engines.worker_based import worker
 13from taskflow.patterns import linear_flow as lf
 14from taskflow.tests import utils
 15from taskflow.utils import threading_utils
 16
 17import example_utils  # noqa
 18
 19# INTRO: This example walks through a miniature workflow which shows how to
 20# start up a number of workers (these workers will process task execution and
 21# reversion requests using any provided input data) and then use an engine
 22# that creates a set of *capable* tasks and flows (the engine can not create
 23# tasks that the workers are not able to run, this will end in failure) that
 24# those workers will run and then executes that workflow seamlessly using the
 25# workers to perform the actual execution.
 26#
 27# NOTE(harlowja): this example simulates the expected larger number of workers
 28# by using a set of threads (which in this example simulate the remote workers
 29# that would typically be running on other external machines).
 30
# A filesystem can also be used as the queue transport (useful as simple
# transport type that does not involve setting up a larger mq system). If this
# is false then the memory transport is used instead, both work in standalone
# setups.
USE_FILESYSTEM = False
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 2
FILE_WORKERS = 1
WORKER_CONF = {
    # These are the tasks the worker can execute, they *must* be importable,
    # typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow all python code to be executed).
    'tasks': [
        'taskflow.tests.utils:TaskOneArgOneReturn',
        'taskflow.tests.utils:TaskMultiArgOneReturn',
    ],
}
 55
 56
 57def run(engine_options):
 58    flow = lf.Flow('simple-linear').add(
 59        utils.TaskOneArgOneReturn(provides='result1'),
 60        utils.TaskMultiArgOneReturn(provides='result2'),
 61    )
 62    eng = engines.load(
 63        flow,
 64        store=dict(x=111, y=222, z=333),
 65        engine='worker-based',
 66        **engine_options,
 67    )
 68    eng.run()
 69    return eng.storage.fetch_all()
 70
 71
if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)

    tmp_path = None
    if USE_FILESYSTEM:
        worker_count = FILE_WORKERS
        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
        shared_conf.update(
            {
                'transport': 'filesystem',
                'transport_options': {
                    'data_folder_in': tmp_path,
                    'data_folder_out': tmp_path,
                    'polling_interval': 0.1,
                },
            }
        )
    else:
        worker_count = MEMORY_WORKERS
        shared_conf.update(
            {
                'transport': 'memory',
                'transport_options': {
                    'polling_interval': 0.1,
                },
            }
        )
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_options = dict(shared_conf)
    workers = []
    worker_topics = []

    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (worker_count))
        for i in range(0, worker_count):
            # Each worker gets its own unique topic so the engine can
            # address it individually.
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Presumably blocks until the worker thread is up -- confirm.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
        # This is done so that the test examples can work correctly
        # even when the keys change order (which will happen in various
        # python versions).
        # NOTE(review): relies on 'json' being imported at module top; the
        # import is not shown in this listing -- confirm against the source.
        print("Result = %s" % json.dumps(result, sort_keys=True))
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
        if tmp_path:
            example_utils.rm_path(tmp_path)

Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1import os
  2import string
  3import sys
  4import time
  5
  6top_dir = os.path.abspath(
  7    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  8)
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.engines.worker_based import worker
 13from taskflow.patterns import linear_flow as lf
 14from taskflow import task
 15from taskflow.types import notifier
 16from taskflow.utils import threading_utils
 17
 18ANY = notifier.Notifier.ANY
 19
 20# INTRO: These examples show how to use a remote worker's event notification
 21# attribute to proxy back task event notifications to the controlling process.
 22#
 23# In this case a simple set of events is triggered by a worker running a
 24# task (simulated to be remote by using a kombu memory transport and threads).
 25# Those events that the 'remote worker' produces will then be proxied back to
 26# the task that the engine is running 'remotely', and then they will be emitted
 27# back to the original callbacks that exist in the originating engine
 28# process/thread. This creates a one-way *notification* channel that can
 29# transparently be used in-process, outside-of-process using remote workers and
 30# so-on that allows tasks to signal to its controlling process some sort of
 31# action that has occurred that the task may need to tell others about (for
 32# example to trigger some type of response when the task reaches 50% done...).
 33
 34
 35def event_receiver(event_type, details):
 36    """This is the callback that (in this example) doesn't do much..."""
 37    print("Recieved event '%s'" % event_type)
 38    print("Details = %s" % details)
 39
 40
 41class EventReporter(task.Task):
 42    """This is the task that will be running 'remotely' (not really remote)."""
 43
 44    EVENTS = tuple(string.ascii_uppercase)
 45    EVENT_DELAY = 0.1
 46
 47    def execute(self):
 48        for i, e in enumerate(self.EVENTS):
 49            details = {
 50                'leftover': self.EVENTS[i:],
 51            }
 52            self.notifier.notify(e, details)
 53            time.sleep(self.EVENT_DELAY)
 54
 55
# Shared kombu configuration; the memory transport keeps everything
# in-process for this example.
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
    'transport': 'memory',
    'transport_options': {
        'polling_interval': 0.1,
    },
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 1
WORKER_CONF = {
    'tasks': [
        # Used to locate which tasks we can run (we don't want to allow
        # arbitrary code/tasks to be ran by any worker since that would
        # open up a variety of vulnerabilities).
        '%s:EventReporter' % (__name__),
    ],
}
 76
 77
 78def run(engine_options):
 79    reporter = EventReporter()
 80    reporter.notifier.register(ANY, event_receiver)
 81    flow = lf.Flow('event-reporter').add(reporter)
 82    eng = engines.load(flow, engine='worker-based', **engine_options)
 83    eng.run()
 84
 85
if __name__ == "__main__":
    # NOTE(review): relies on 'logging' being imported at module top; the
    # import is not shown in this listing -- confirm against the source.
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those objects use it correctly.
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(BASE_SHARED_CONF)
    engine_options = dict(BASE_SHARED_CONF)
    workers = []

    # These topics will be used to request worker information on; those
    # workers will respond with their capabilities which the executing engine
    # will use to match pending tasks to a matched worker, this will cause
    # the task to be sent for execution, and the engine will wait until it
    # is finished (a response is received) and then the engine will either
    # continue with other tasks, do some retry/failure resolution logic or
    # stop (and potentially re-raise the remote workers failure)...
    worker_topics = []

    try:
        # Create a set of worker threads to simulate actual remote workers...
        print('Running %s workers.' % (MEMORY_WORKERS))
        for i in range(0, MEMORY_WORKERS):
            # Give each one its own unique topic name so that they can
            # correctly communicate with the engine (they will all share the
            # same exchange).
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Presumably blocks until the worker thread is up -- confirm.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1import math
  2import os
  3import sys
  4
  5top_dir = os.path.abspath(
  6    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
  7)
  8sys.path.insert(0, top_dir)
  9
 10from taskflow import engines
 11from taskflow.engines.worker_based import worker
 12from taskflow.patterns import unordered_flow as uf
 13from taskflow import task
 14from taskflow.utils import threading_utils
 15
 16# INTRO: This example walks through a workflow that will in parallel compute
 17# a mandelbrot result set (using X 'remote' workers) and then combine their
 18# results together to form a final mandelbrot fractal image. It shows a usage
 19# of taskflow to perform a well-known embarrassingly parallel problem that has
 20# the added benefit of also being an elegant visualization.
 21#
 22# NOTE(harlowja): this example simulates the expected larger number of workers
 23# by using a set of threads (which in this example simulate the remote workers
 24# that would typically be running on other external machines).
 25#
 26# NOTE(harlowja): to have it produce an image run (after installing pillow):
 27#
 28# $ python taskflow/examples/wbe_mandelbrot.py output.png
 29
 30BASE_SHARED_CONF = {
 31    'exchange': 'taskflow',
 32}
 33WORKERS = 2
 34WORKER_CONF = {
 35    # These are the tasks the worker can execute, they *must* be importable,
 36    # typically this list is used to restrict what workers may execute to
 37    # a smaller set of *allowed* tasks that are known to be safe (one would
 38    # not want to allow all python code to be executed).
 39    'tasks': [
 40        '%s:MandelCalculator' % (__name__),
 41    ],
 42}
 43ENGINE_CONF = {
 44    'engine': 'worker-based',
 45}
 46
 47# Mandelbrot & image settings...
 48IMAGE_SIZE = (512, 512)
 49CHUNK_COUNT = 8
 50MAX_ITERATIONS = 25
 51
 52
 53class MandelCalculator(task.Task):
 54    def execute(self, image_config, mandelbrot_config, chunk):
 55        """Returns the number of iterations before the computation "escapes".
 56
 57        Given the real and imaginary parts of a complex number, determine if it
 58        is a candidate for membership in the mandelbrot set given a fixed
 59        number of iterations.
 60        """
 61
 62        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
 63        #
 64        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
 65        def mandelbrot(x, y, max_iters):
 66            c = complex(x, y)
 67            z = 0.0j
 68            for i in range(max_iters):
 69                z = z * z + c
 70                if (z.real * z.real + z.imag * z.imag) >= 4:
 71                    return i
 72            return max_iters
 73
 74        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
 75        height, width = image_config['size']
 76        pixel_size_x = (max_x - min_x) / width
 77        pixel_size_y = (max_y - min_y) / height
 78        block = []
 79        for y in range(chunk[0], chunk[1]):
 80            row = []
 81            imag = min_y + y * pixel_size_y
 82            for x in range(0, width):
 83                real = min_x + x * pixel_size_x
 84                row.append(mandelbrot(real, imag, max_iters))
 85            block.append(row)
 86        return block
 87
 88
 89def calculate(engine_conf):
 90    # Subdivide the work into X pieces, then request each worker to calculate
 91    # one of those chunks and then later we will write these chunks out to
 92    # an image bitmap file.
 93
 94    # And unordered flow is used here since the mandelbrot calculation is an
 95    # example of an embarrassingly parallel computation that we can scatter
 96    # across as many workers as possible.
 97    flow = uf.Flow("mandelbrot")
 98
 99    # These symbols will be automatically given to tasks as input to their
100    # execute method, in this case these are constants used in the mandelbrot
101    # calculation.
102    store = {
103        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
104        'image_config': {
105            'size': IMAGE_SIZE,
106        },
107    }
108
109    # We need the task names to be in the right order so that we can extract
110    # the final results in the right order (we don't care about the order when
111    # executing).
112    task_names = []
113
114    # Compose our workflow.
115    height, _width = IMAGE_SIZE
116    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
117    for i in range(0, CHUNK_COUNT):
118        chunk_name = 'chunk_%s' % i
119        task_name = "calculation_%s" % i
120        # Break the calculation up into chunk size pieces.
121        rows = [i * chunk_size, i * chunk_size + chunk_size]
122        flow.add(
123            MandelCalculator(
124                task_name,
125                # This ensures the storage symbol with name
126                # 'chunk_name' is sent into the tasks local
127                # symbol 'chunk'. This is how we give each
128                # calculator its own correct sequence of rows
129                # to work on.
130                rebind={'chunk': chunk_name},
131            )
132        )
133        store[chunk_name] = rows
134        task_names.append(task_name)
135
136    # Now execute it.
137    eng = engines.load(flow, store=store, engine_conf=engine_conf)
138    eng.run()
139
140    # Gather all the results and order them for further processing.
141    gather = []
142    for name in task_names:
143        gather.extend(eng.storage.get(name))
144    points = []
145    for y, row in enumerate(gather):
146        for x, color in enumerate(row):
147            points.append(((x, y), color))
148    return points
149
150
def write_image(results, output_filename=None):
    """Summarizes the gathered results and optionally saves them as an image.

    :param results: sequence of ((x, y), color) points covering IMAGE_SIZE
    :param output_filename: where to write the grayscale image; when falsy
                            only the summary is printed
    :raises RuntimeError: if an image was requested but pillow is missing
    """
    print(
        "Gathered %s results that represents a mandelbrot"
        " image (using %s chunks that are computed jointly"
        " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS)
    )
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        #
        # Chain the original ImportError so the real cause is preserved.
        raise RuntimeError(
            "Pillow is required to write image files: %s" % e) from e

    # Limit to 255, find the max and normalize to that...
    color_max = max((color for _point, color in results), default=0)

    # Use gray scale since we don't really have other colors.
    img = Image.new('L', IMAGE_SIZE, "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max == 0:
            # Avoid dividing by zero when every point escaped immediately.
            color = 0
        else:
            color = int((float(color) / color_max) * 255.0)
        pixels[x, y] = color
    img.save(output_filename)
183
184
def create_fractal():
    """Creates the fractal by running simulated remote workers over memory.

    Spins up WORKERS worker threads, runs the calculation through them and
    finally writes the resulting image (if a filename was given on the
    command line).
    """
    # NOTE(review): 'logging' is never imported at the top of this example,
    # so import it here before configuring it (fixes a NameError).
    import logging
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)
    shared_conf.update(
        {
            'transport': 'memory',
            'transport_options': {
                'polling_interval': 0.1,
            },
        }
    )

    # An optional output filename may be given as the first CLI argument.
    if len(sys.argv) >= 2:
        output_filename = sys.argv[1]
    else:
        output_filename = None

    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_conf = dict(ENGINE_CONF)
    engine_conf.update(shared_conf)
    workers = []
    worker_topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (WORKERS))
        for i in range(0, WORKERS):
            # Give each worker its own unique topic so the engine can
            # address it (they all share the same exchange).
            worker_conf['topic'] = 'calculator_%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = worker_topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)
238
239
# Allow running this example directly as a script.
if __name__ == "__main__":
    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1import contextlib
  2import logging
  3import os
  4import random
  5import sys
  6import threading
  7import time
  8
  9logging.basicConfig(level=logging.ERROR)
 10
 11top_dir = os.path.abspath(
 12    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 13)
 14sys.path.insert(0, top_dir)
 15
 16from taskflow import exceptions as excp
 17from taskflow.jobs import backends
 18from taskflow.utils import kazoo_utils
 19from taskflow.utils import threading_utils
 20
 21# In this example we show how a jobboard can be used to post work for other
 22# entities to work on. This example creates a set of jobs using one producer
 23# thread (typically this would be split across many machines) and then having
 24# other worker threads with their own jobboards select work using a given
 25# filters [red/blue] and then perform that work (and consuming or abandoning
 26# the job after it has been completed or failed).
 27
 28# Things to note:
 29# - No persistence layer is used (or logbook), just the job details are used
 30#   to determine if a job should be selected by a worker or not.
 31# - This example runs in a single process (this is expected to be atypical
 32#   but this example shows that it can be done if needed, for testing...)
 33# - The iterjobs(), claim(), consume()/abandon() worker workflow.
 34# - The post() producer workflow.
 35
 36SHARED_CONF = {
 37    'path': "/taskflow/jobs",
 38    'board': 'zookeeper',
 39}
 40
 41# How many workers and producers of work will be created (as threads).
 42PRODUCERS = 3
 43WORKERS = 5
 44
 45# How many units of work each producer will create.
 46PRODUCER_UNITS = 10
 47
 48# How many units of work are expected to be produced (used so workers can
 49#   know when to stop running and shutdown, typically this would not be
 50#   a value but we have to limit this example's execution time to be less than
 51# infinity).
 52EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
 53
 54# Delay between producing/consuming more work.
 55WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
 56
 57# To ensure threads don't trample other threads output.
 58STDOUT_LOCK = threading.Lock()
 59
 60
def dispatch_work(job):
    # This is where the jobs contained work *would* be done; simulated here
    # by just sleeping for a fixed amount of time.
    time.sleep(1.0)
 64
 65
 66def safe_print(name, message, prefix=""):
 67    with STDOUT_LOCK:
 68        if prefix:
 69            print(f"{prefix} {name}: {message}")
 70        else:
 71            print(f"{name}: {message}")
 72
 73
def worker(ident, client, consumed):
    """Claims and runs matching jobs until all expected work is consumed.

    :param ident: numeric identity used to build this worker's name
    :param client: shared (kazoo) client used for the personal jobboard
    :param consumed: shared collection that all workers append finished jobs
                     to; its length signals when EXPECTED_UNITS are done
    """
    # Create a personal board (using the same client so that it works in
    # the same process) and start looking for jobs on the board that we want
    # to perform.
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = 0
    consumed_jobs = 0
    abandoned_jobs = 0
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        while len(consumed) != EXPECTED_UNITS:
            # Only take jobs whose details match a randomly selected color;
            # this simulates workers that filter for certain kinds of work.
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                # See if we should even bother with it...
                if job.details.get('color') != favorite_color:
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    # Another worker got it first (or the job went away).
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                else:
                    try:
                        dispatch_work(job)
                        board.consume(job, name)
                        safe_print(name, "'%s' [consumed]" % (job))
                        consumed_jobs += 1
                        consumed.append(job)
                    except Exception:
                        # The work failed; abandon the claim so another
                        # worker may retry the job later.
                        board.abandon(job, name)
                        abandoned_jobs += 1
                        safe_print(name, "'%s' [abandoned]" % (job))
            time.sleep(WORKER_DELAY)
    safe_print(
        name,
        "finished (claimed %s jobs, consumed %s jobs,"
        " abandoned %s jobs)" % (claimed_jobs, consumed_jobs, abandoned_jobs),
        prefix=">>>",
    )
115
116
def producer(ident, client):
    """Posts PRODUCER_UNITS randomly colored jobs onto a personal board."""
    # Create a personal board (using the same client so that it works in
    # the same process) and start posting jobs on the board that we want
    # some entity to perform.
    name = "P-%s" % (ident)
    safe_print(name, "started")
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        for unit in range(PRODUCER_UNITS):
            posted = board.post(
                f"{name}-{unit}",
                book=None,
                details={'color': random.choice(['red', 'blue'])},
            )
            safe_print(name, "'%s' [posted]" % (posted))
            time.sleep(PRODUCER_DELAY)
    safe_print(name, "finished", prefix=">>>")
133
134
def main():
    """Runs producers and workers in one process, verifies the board drains.

    :returns: 0 on success, 1 if jobs were left over or went missing
    """
    # TODO(harlowja): Hack to make eventlet work right, remove when the
    # following is fixed: https://github.com/eventlet/eventlet/issues/230
    from taskflow.utils import eventlet_utils as _eu  # noqa

    try:
        import eventlet as _eventlet  # noqa
    except ImportError:
        pass

    # NOTE(review): 'collections' is never imported at the top of this
    # example, so import it here before using it (fixes a NameError).
    import collections

    client = kazoo_utils.make_client({})
    with contextlib.closing(client) as c:
        created = []
        for i in range(0, PRODUCERS):
            p = threading_utils.daemon_thread(producer, i + 1, c)
            created.append(p)
            p.start()
        # Thread-safe shared collection the workers record finished jobs in.
        consumed = collections.deque()
        for i in range(0, WORKERS):
            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
            created.append(w)
            w.start()
        while created:
            t = created.pop()
            t.join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
                return 1
            return 0
167
168
# Exit with main()'s status code when run as a script.
if __name__ == "__main__":
    sys.exit(main())

Conductor simulating a CI pipeline

Note

Full source located at tox_conductor

  1import itertools
  2import logging
  3import os
  4import shutil
  5import socket
  6import sys
  7import tempfile
  8import threading
  9import time
 10
 11logging.basicConfig(level=logging.ERROR)
 12
 13top_dir = os.path.abspath(
 14    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 15)
 16sys.path.insert(0, top_dir)
 17
 18from oslo_utils import timeutils
 19from oslo_utils import uuidutils
 20
 21from taskflow.conductors import backends as conductors
 22from taskflow import engines
 23from taskflow.jobs import backends as boards
 24from taskflow.patterns import linear_flow
 25from taskflow.persistence import backends as persistence
 26from taskflow.persistence import models
 27from taskflow import task
 28from taskflow.utils import kazoo_utils
 29from taskflow.utils import threading_utils
 30
 31# INTRO: This example shows how a worker/producer can post desired work (jobs)
 32# to a jobboard and a conductor can consume that work (jobs) from that jobboard
 33# and execute those jobs in a reliable & async manner (for example, if the
 34# conductor were to crash then the job will be released back onto the jobboard
 35# and another conductor can attempt to finish it, from wherever that job last
 36# left off).
 37#
 38# In this example an in-memory jobboard (and in-memory storage) is created and
 39# used that simulates how this would be done at a larger scale (it is an
 40# example after all).
 41
 42# Restrict how long this example runs for...
 43RUN_TIME = 5
 44REVIEW_CREATION_DELAY = 0.5
 45SCAN_DELAY = 0.1
 46NAME = f"{socket.getfqdn()}_{os.getpid()}"
 47
 48# This won't really use zookeeper but will use a local version of it using
 49# the zake library that mimics an actual zookeeper cluster using threads and
 50# an in-memory data structure.
 51JOBBOARD_CONF = {
 52    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
 53}
 54
 55
class RunReview(task.Task):
    # A dummy task that clones the review and runs tox...

    def _clone_review(self, review, temp_dir):
        # Pretend to fetch the review into the scratch directory.
        print("Cloning review '{}' into {}".format(review['id'], temp_dir))

    def _run_tox(self, temp_dir):
        # Pretend to run the test suite in the scratch directory.
        print("Running tox in %s" % temp_dir)

    def execute(self, review, temp_dir):
        # First "clone", then "test" -- just like a real CI pipeline would.
        self._clone_review(review, temp_dir)
        self._run_tox(temp_dir)
 68
 69
 70class MakeTempDir(task.Task):
 71    # A task that creates and destroys a temporary dir (on failure).
 72    #
 73    # It provides the location of the temporary dir for other tasks to use
 74    # as they see fit.
 75
 76    default_provides = 'temp_dir'
 77
 78    def execute(self):
 79        return tempfile.mkdtemp()
 80
 81    def revert(self, *args, **kwargs):
 82        temp_dir = kwargs.get(task.REVERT_RESULT)
 83        if temp_dir:
 84            shutil.rmtree(temp_dir)
 85
 86
class CleanResources(task.Task):
    # A task that cleans up any workflow resources.

    def execute(self, temp_dir):
        # Remove the scratch directory produced earlier by MakeTempDir.
        print("Removing %s" % temp_dir)
        shutil.rmtree(temp_dir)
 93
 94
 95def review_iter():
 96    """Makes reviews (never-ending iterator/generator)."""
 97    review_id_gen = itertools.count(0)
 98    while True:
 99        review_id = next(review_id_gen)
100        review = {
101            'id': review_id,
102        }
103        yield review
104
105
106# The reason this is at the module namespace level is important, since it must
107# be accessible from a conductor dispatching an engine, if it was a lambda
108# function for example, it would not be reimportable and the conductor would
109# be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run."""
    # Make a temp dir, run the (fake) review in it, then clean it up --
    # in that exact (linear) order.
    flow = linear_flow.Flow("tester")
    flow.add(MakeTempDir(name="maker"))
    flow.add(RunReview(name="runner"))
    flow.add(CleanResources(name="cleaner"))
    return flow
119
120
def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix.

    :param client: zookeeper (zake) client the jobboard should use
    :param saver: persistence backend used to record the planned work
    :param name: prefix used to build the reviewer/jobboard name
    :returns: (unstarted daemon thread, callback that stops the thread)
    """
    # NOTE(review): 'contextlib' is never imported at the top of this
    # example, so import it here before using it (fixes a NameError).
    import contextlib

    real_name = "%s_reviewer" % name
    no_more = threading.Event()
    jb = boards.fetch(
        real_name, JOBBOARD_CONF, client=client, persistence=saver
    )

    def make_save_book(saver, review_id):
        # Record what we want to happen (sometime in the future).
        book = models.LogBook("book_%s" % review_id)
        detail = models.FlowDetail(
            "flow_%s" % review_id, uuidutils.generate_uuid()
        )
        book.add(detail)
        # Associate the factory method we want to be called (in the future)
        # with the book, so that the conductor will be able to call into
        # that factory to retrieve the workflow objects that represent the
        # work.
        #
        # These args and kwargs *can* be used to save any specific parameters
        # into the factory when it is being called to create the workflow
        # objects (typically used to tell a factory how to create a unique
        # workflow that represents this review).
        factory_args = ()
        factory_kwargs = {}
        engines.save_factory_details(
            detail, create_review_workflow, factory_args, factory_kwargs
        )
        with contextlib.closing(saver.get_connection()) as conn:
            conn.save_logbook(book)
            return book

    def run():
        """Periodically publishes 'fake' reviews to analyze."""
        jb.connect()
        review_generator = review_iter()
        with contextlib.closing(jb):
            while not no_more.is_set():
                review = next(review_generator)
                details = {
                    'store': {
                        'review': review,
                    },
                }
                job_name = "{}_{}".format(real_name, review['id'])
                print("Posting review '%s'" % review['id'])
                jb.post(
                    job_name,
                    book=make_save_book(saver, review['id']),
                    details=details,
                )
                time.sleep(REVIEW_CREATION_DELAY)

    # Return the unstarted thread, and a callback that can be used
    # shutdown that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), no_more.set)
178
179
def generate_conductor(client, saver, name=NAME):
    """Creates a conductor thread with the given name prefix.

    :param client: zookeeper (zake) client the jobboard should use
    :param saver: persistence backend used to load/save flow state
    :param name: prefix used to build the conductor name
    :returns: (unstarted daemon thread, callback that stops the conductor)
    """
    # NOTE(review): 'contextlib' is never imported at the top of this
    # example, so import it here before using it (fixes a NameError).
    import contextlib

    real_name = "%s_conductor" % name
    jb = boards.fetch(name, JOBBOARD_CONF, client=client, persistence=saver)
    conductor = conductors.fetch(
        "blocking", real_name, jb, engine='parallel', wait_timeout=SCAN_DELAY
    )

    def run():
        jb.connect()
        with contextlib.closing(jb):
            conductor.run()

    # Return the unstarted thread, and a callback that can be used
    # shutdown that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), conductor.stop)
196
197
def main():
    """Wires a reviewer and a conductor together, runs them for RUN_TIME."""
    # NOTE(review): 'contextlib' is never imported at the top of this
    # example, so import it here before using it (fixes a NameError).
    import contextlib

    # Need to share the same backend, so that data can be shared...
    persistence_conf = {
        'connection': 'memory',
    }
    saver = persistence.fetch(persistence_conf)
    with contextlib.closing(saver.get_connection()) as conn:
        # This ensures that the needed backend setup/data directories/schema
        # upgrades and so on... exist before they are attempted to be used...
        conn.upgrade()
    fc1 = kazoo_utils.make_client({})
    fc2 = kazoo_utils.make_client({})
    entities = [
        generate_reviewer(fc1, saver),
        generate_conductor(fc2, saver),
    ]
    for t, stopper in entities:
        t.start()
    try:
        # Let the threads churn for a fixed amount of time...
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        # ...then stop them in the reverse order they were started.
        for t, stopper in reversed(entities):
            stopper()
            t.join()
225
226
# Allow running this example directly as a script.
if __name__ == '__main__':
    main()

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles

  1import functools
  2import logging
  3import os
  4import sys
  5import time
  6import traceback
  7
  8from kazoo import client
  9
 10top_dir = os.path.abspath(
 11    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
 12)
 13sys.path.insert(0, top_dir)
 14
 15from taskflow.conductors import backends as conductor_backends
 16from taskflow import engines
 17from taskflow.jobs import backends as job_backends
 18from taskflow import logging as taskflow_logging
 19from taskflow.patterns import linear_flow as lf
 20from taskflow.persistence import backends as persistence_backends
 21from taskflow.persistence import models
 22from taskflow import task
 23
 24from oslo_utils import timeutils
 25from oslo_utils import uuidutils
 26
 27# Instructions!
 28#
 29# 1. Install zookeeper (or change host listed below)
 30# 2. Download this example, place in file '99_bottles.py'
 31# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
 32# 4. Run `python 99_bottles.py c` a few times (in different shells)
 33# 5. On demand kill previously listed processes created in (4) and watch
 34#    the work resume on another process (and repeat)
 35# 6. Keep enough workers alive to eventually finish the song (if desired).
 36
 37ME = os.getpid()
 38ZK_HOST = "localhost:2181"
 39JB_CONF = {
 40    'hosts': ZK_HOST,
 41    'board': 'zookeeper',
 42    'path': '/taskflow/99-bottles-demo',
 43}
 44PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
 45TAKE_DOWN_DELAY = 1.0
 46PASS_AROUND_DELAY = 3.0
 47HOW_MANY_BOTTLES = 99
 48
 49
class TakeABottleDown(task.Task):
    # Sings the first part of a verse and decrements the bottle count.
    def execute(self, bottles_left):
        sys.stdout.write('Take one down, ')
        sys.stdout.flush()
        time.sleep(TAKE_DOWN_DELAY)
        # The returned value becomes the new 'bottles_left' for later tasks.
        return bottles_left - 1
 56
 57
class PassItAround(task.Task):
    # Sings the middle part of a verse (changes no state, just delays).
    def execute(self):
        sys.stdout.write('pass it around, ')
        sys.stdout.flush()
        time.sleep(PASS_AROUND_DELAY)
 63
 64
class Conclusion(task.Task):
    # Finishes a verse by announcing how many bottles remain.
    def execute(self, bottles_left):
        sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
        sys.stdout.flush()
 69
 70
 71def make_bottles(count):
 72    # This is the function that will be called to generate the workflow
 73    # and will also be called to regenerate it on resumption so that work
 74    # can continue from where it last left off...
 75
 76    s = lf.Flow("bottle-song")
 77
 78    take_bottle = TakeABottleDown(
 79        "take-bottle-%s" % count,
 80        inject={'bottles_left': count},
 81        provides='bottles_left',
 82    )
 83    pass_it = PassItAround("pass-%s-around" % count)
 84    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
 85    s.add(take_bottle, pass_it, next_bottles)
 86
 87    for bottle in reversed(list(range(1, count))):
 88        take_bottle = TakeABottleDown(
 89            "take-bottle-%s" % bottle, provides='bottles_left'
 90        )
 91        pass_it = PassItAround("pass-%s-around" % bottle)
 92        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
 93        s.add(take_bottle, pass_it, next_bottles)
 94
 95    return s
 96
 97
 98def run_conductor(only_run_once=False):
 99    # This continuously runs consumers until its stopped via ctrl-c or other
100    # kill signal...
101    event_watches = {}
102
103    # This will be triggered by the conductor doing various activities
104    # with engines, and is quite nice to be able to see the various timing
105    # segments (which is useful for debugging, or watching, or figuring out
106    # where to optimize).
107    def on_conductor_event(cond, event, details):
108        print("Event '%s' has been received..." % event)
109        print("Details = %s" % details)
110        if event.endswith("_start"):
111            w = timeutils.StopWatch()
112            w.start()
113            base_event = event[0 : -len("_start")]
114            event_watches[base_event] = w
115        if event.endswith("_end"):
116            base_event = event[0 : -len("_end")]
117            try:
118                w = event_watches.pop(base_event)
119                w.stop()
120                print(
121                    "It took %0.3f seconds for event '%s' to finish"
122                    % (w.elapsed(), base_event)
123                )
124            except KeyError:
125                pass
126        if event == 'running_end' and only_run_once:
127            cond.stop()
128
129    print("Starting conductor with pid: %s" % ME)
130    my_name = "conductor-%s" % ME
131    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
132    with contextlib.closing(persist_backend):
133        with contextlib.closing(persist_backend.get_connection()) as conn:
134            conn.upgrade()
135        job_backend = job_backends.fetch(
136            my_name, JB_CONF, persistence=persist_backend
137        )
138        job_backend.connect()
139        with contextlib.closing(job_backend):
140            cond = conductor_backends.fetch(
141                'blocking', my_name, job_backend, persistence=persist_backend
142            )
143            on_conductor_event = functools.partial(on_conductor_event, cond)
144            cond.notifier.register(cond.notifier.ANY, on_conductor_event)
145            # Run forever, and kill -9 or ctrl-c me...
146            try:
147                cond.run()
148            finally:
149                cond.stop()
150                cond.wait()
151
152
def run_poster():
    """Posts a single '99 bottles' song job for some conductor to run."""
    # NOTE(review): 'contextlib' is never imported at the top of this
    # example, so import it here before using it (fixes a NameError).
    import contextlib

    # This just posts a single job and then ends...
    print("Starting poster with pid: %s" % ME)
    my_name = "poster-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.upgrade()
        job_backend = job_backends.fetch(
            my_name, JB_CONF, persistence=persist_backend
        )
        job_backend.connect()
        with contextlib.closing(job_backend):
            # Create information in the persistence backend about the
            # unit of work we want to complete and the factory that
            # can be called to create the tasks that the work unit needs
            # to be done.
            lb = models.LogBook("post-from-%s" % my_name)
            fd = models.FlowDetail(
                "song-from-%s" % my_name, uuidutils.generate_uuid()
            )
            lb.add(fd)
            with contextlib.closing(persist_backend.get_connection()) as conn:
                conn.save_logbook(lb)
            engines.save_factory_details(
                fd,
                make_bottles,
                [HOW_MANY_BOTTLES],
                {},
                backend=persist_backend,
            )
            # Post, and be done with it!
            jb = job_backend.post("song-from-%s" % my_name, book=lb)
            print("Posted: %s" % jb)
            print("Goodbye...")
188
189
def main_local():
    # Run locally; typically this is activated during unit testing when all
    # the examples are made sure to still function correctly...
    global TAKE_DOWN_DELAY
    global PASS_AROUND_DELAY
    # Make everything go much faster (so that this finishes quickly).
    PASS_AROUND_DELAY = 0.01
    TAKE_DOWN_DELAY = 0.01
    # Use a unique jobboard path so concurrent runs do not collide.
    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
    run_poster()
    run_conductor(only_run_once=True)
201
202
def check_for_zookeeper(timeout=1):
    """Returns True if a zookeeper server is reachable, else False.

    :param timeout: seconds to wait for the client handshake to finish
    """
    # NOTE(review): 'contextlib' is never imported at the top of this
    # example, so import it here before using it (fixes a NameError).
    import contextlib

    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
    sys.stderr.write("Please wait....\n")
    with contextlib.closing(client.KazooClient()) as test_client:
        try:
            test_client.start(timeout=timeout)
        except test_client.handler.timeout_exception:
            sys.stderr.write("Zookeeper is needed for running this example!\n")
            traceback.print_exc()
            return False
        else:
            test_client.stop()
            return True
216
217
def main():
    """Entry point: dispatches to local mode, the poster or the conductor."""
    if not check_for_zookeeper():
        return
    if len(sys.argv) == 1:
        # No arguments: run the fast local (self-testing) variant.
        main_local()
        return
    mode = sys.argv[1]
    if mode not in ('p', 'c'):
        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
        return
    # A trailing 'v' argument turns on (very) verbose trace logging.
    if sys.argv[-1] == "v":
        logging.basicConfig(level=taskflow_logging.TRACE)
    else:
        logging.basicConfig(level=logging.ERROR)
    if mode == 'p':
        run_poster()
    else:
        run_conductor()
234
235
# Allow running this example directly as a script.
if __name__ == '__main__':
    main()