Examples¶
While developing TaskFlow the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get started (ordered by perceived complexity):
To explore more of these examples please check out the examples directory in the TaskFlow source tree.
Note
If the examples provided are not satisfactory (or up to your standards) contributions are welcome and very much appreciated to help improve them. The higher the quality and the clearer the examples are the better and more useful they are for everyone.
Hello world¶
Note
Full source located at hello_world.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout (two directories up from this example file).
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task


# INTRO: This is the de facto hello world equivalent for taskflow; it shows
# how an overly simplistic workflow can be created that runs using different
# engines using different styles of execution (all can be used to run in
# parallel if a workflow is provided that is parallelizable).


class PrinterTask(task.Task):
    """Task that prints its injected 'output' value (optionally prefixed
    with the task's own name)."""

    def __init__(self, name, show_name=True, inject=None):
        super().__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        if self._show_name:
            print(f"{self.name}: {output}")
        else:
            print(output)


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows when run can be run in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
for name, hello, world in [
    ('bob', 'hello', 'world'),
    ('joe', 'hellooo', 'worllllld'),
    ('sue', "helloooooo!", 'wooorllld!'),
]:
    hi_chorus.add(
        PrinterTask(
            "%s@hello" % name,
            # This will show up to the execute() method of
            # the task as the argument named 'output' (which
            # will allow us to print the character we want).
            inject={'output': hello},
        )
    )
    world_chorus.add(PrinterTask("%s@world" % name, inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
song.add(
    PrinterTask(
        "conductor@begin", show_name=False, inject={'output': "*ding*"}
    ),
    hi_chorus,
    world_chorus,
    PrinterTask("conductor@end", show_name=False, inject={'output': "*dong*"}),
)

# Run in parallel using eventlet green threads...
try:
    import eventlet as _eventlet  # noqa
except ImportError:
    # No eventlet currently active, skip running with it...
    pass
else:
    print("-- Running in parallel using eventlet --")
    e = engines.load(
        song, executor='greenthreaded', engine='parallel', max_workers=1
    )
    e.run()


# Run in parallel using real threads...
print("-- Running in parallel using threads --")
e = engines.load(song, executor='threaded', engine='parallel', max_workers=1)
e.run()


# Run in parallel using external processes...
print("-- Running in parallel using processes --")
e = engines.load(song, executor='processes', engine='parallel', max_workers=1)
e.run()


# Run serially (aka, if the workflow could have been run in parallel, it will
# not be when run in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
print("-- Statistics gathered --")
print(e.statistics)
Passing values from and to tasks¶
Note
Full source located at simple_linear_pass.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree (and this example's directory) importable
# when running straight from a source checkout.
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can be then consumed/used by a downstream task.


class TaskA(task.Task):
    # 'default_provides' names the symbol that this task's return value is
    # stored under, which downstream tasks can then require.
    default_provides = 'a'

    def execute(self):
        print("Executing '%s'" % (self.name))
        return 'a'


class TaskB(task.Task):
    def execute(self, a):
        # 'a' is automatically supplied by the engine from TaskA's result.
        print("Executing '%s'" % (self.name))
        print("Got input '%s'" % (a))


print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")
Using listeners¶
Note
Full source located at echo_listener.
import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed,
# this example dumps that output to the stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    """Trivial task that just prints its own name when executed."""

    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()
Using listeners (to watch a phone call)¶
Note
Full source located at simple_linear_listening.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

# INTRO: In this example we create two tasks (this time as functions instead
# of task subclasses as in the simple_linear.py example), each of which ~calls~
# a given ~phone~ number (provided as a function input) in a linear fashion
# (one after the other).
#
# For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# This example shows a basic usage of the taskflow structures without involving
# the complexity of persistence. Using the structures that taskflow provides
# via tasks and flows makes it possible for you to easily at a later time
# hook in a persistence layer (and then gain the functionality that offers)
# when you decide the complexity of adding that layer in is 'worth it' for your
# applications usage pattern (which some applications may not need).
#
# It **also** adds on to the simple_linear.py example by adding a set of
# callback functions which the engine will call when a flow state transition
# or task state transition occurs. These types of functions are useful for
# updating task or flow progress, or for debugging, sending notifications to
# external systems, or for other yet unknown future usage that you may create!


def call_jim(context):
    print("Calling jim.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def call_joe(context):
    print("Calling joe.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def flow_watch(state, details):
    # Called on every flow state transition.
    print('Flow => %s' % state)


def task_watch(state, details):
    # Called on every task state transition.
    print('Task {} => {}'.format(details.get('task_name'), state))


# Wrap your functions into a task type that knows how to treat your functions
# as tasks. There was previous work done to just allow a function to be
# directly passed, but in python 3.0 there is no easy way to capture an
# instance method, so this wrapping approach was decided upon instead which
# can attach to instance methods (if that's desired).
flow = lf.Flow("Call-them")
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))

# Now load (but do not run) the flow using the provided initial data.
engine = taskflow.engines.load(
    flow,
    store={
        'context': {
            "joe_number": 444,
            "jim_number": 555,
        }
    },
)

# This is where we attach our callback functions to the 2 different
# notification objects that an engine exposes. The usage of a ANY (kleene star)
# here means that we want to be notified on all state changes, if you want to
# restrict to a specific state change, just register that instead.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

# And now run!
engine.run()
Dumping an in-memory backend¶
Note
Full source located at dump_memory_backend.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree (and this example's directory) importable
# when running straight from a source checkout.
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using a in-memory backend and pre/post run we dump out the contents
# of the in-memory backends tree structure (which can be quite useful to
# look at for debugging or other analysis).


class PrintTask(task.Task):
    """Trivial task that just announces that it is running."""

    def execute(self):
        print("Running '%s'" % self.name)


# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
    f.add(PrintTask(alpha))

e = engines.load(f)
e.compile()
e.prepare()

# After prepare the storage layer + backend can now be accessed safely...
backend = e.storage.backend

print("----------")
print("Before run")
print("----------")
print(backend.memory.pformat())
print("----------")

e.run()

print("---------")
print("After run")
print("---------")
for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
    value = backend.memory[path]
    if value:
        print(f"{path} -> {value}")
    else:
        print("%s" % (path))
Making phone calls¶
Note
Full source located at simple_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily at
# a later time hook in a persistence layer (and then gain the functionality
# that offers) when you decide the complexity of adding that layer in
# is 'worth it' for your application's usage pattern (which certain
# applications may not need).


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        # 'jim_number' is supplied by the engine from the 'store' below.
        print("Calling jim %s." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        # 'joe_number' is supplied by the engine from the 'store' below.
        print("Calling joe %s." % joe_number)


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(CallJim(), CallJoe())

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store=dict(joe_number=444, jim_number=555))
Making phone calls (automatically reverting)¶
Note
Full source located at reverting_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create three tasks, each of which ~calls~ a given
# number (provided as a function input), one of those tasks *fails* calling a
# given number (the suzzie calling); this causes the workflow to enter the
# reverting process, which activates the revert methods of the previous two
# phone ~calls~.
#
# This simulated calling makes it appear like all three calls occur or all
# three don't occur (transaction-like capabilities). No persistence layer is
# used here so reverting and executing will *not* be tolerant of process
# failure.


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)

    def revert(self, jim_number, *args, **kwargs):
        # Undo the call when a later task in the flow fails.
        print("Calling %s and apologizing." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)

    def revert(self, joe_number, *args, **kwargs):
        # Undo the call when a later task in the flow fails.
        print("Calling %s and apologizing." % joe_number)


class CallSuzzie(task.Task):
    def execute(self, suzzie_number, *args, **kwargs):
        # Always fails; this triggers reversion of the earlier tasks.
        raise OSError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(CallJim(), CallJoe(), CallSuzzie())

try:
    # Now run that flow using the provided initial data (store below).
    taskflow.engines.run(
        flow, store=dict(joe_number=444, jim_number=555, suzzie_number=666)
    )
except Exception as e:
    # NOTE(harlowja): This exception will be the exception that came out of the
    # 'CallSuzzie' task instead of a different exception, this is useful since
    # typically surrounding code wants to handle the original exception and not
    # a wrapped or altered one.
    #
    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
    # exceptions then the above exception would be wrapped into a combined
    # exception (the object has methods to iterate over the contained
    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
    # how to deal with multiple tasks failing while running.
    #
    # You will also note that this is not a problem in this case since no
    # parallelism is involved; this is ensured by the usage of a linear flow
    # and the default engine type which is 'serial' vs being 'parallel'.
    print("Flow failed: %s" % e)
Building a car¶
Note
Full source located at build_a_car.
import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa


# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug-in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indicate success, they would in the real world
# do more than just that.


def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    # Revert handler: called when the flow needs to roll back.
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs, they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task {} => {}'.format(details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency based ordering, the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed), another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed'),
    ),
    task.FunctorTask(
        verify,
        requires=[
            'frame',
            'engine',
            'doors',
            'wheels',
            'engine_installed',
            'doors_installed',
            'windows_installed',
            'wheels_installed',
        ],
    ),
)

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce, in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the result product, a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered
# since the resultant car that will be built by the build_wheels function will
# build a car with 4 doors only (not 5), this will cause the verification
# task to mark the car that is produced as not matching the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)
Iterating over the alphabet (using processes)¶
Note
Full source located at alphabet_soup.
import fractions
import functools
import logging
import os
import string
import sys
import time

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree (and this example's directory) importable
# when running straight from a source checkout.
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions
from taskflow.patterns import linear_flow
from taskflow import task


# In this example we show how a simple linear set of tasks can be executed
# using local processes (and not threads or remote workers) with minimal (if
# any) modification to those tasks to make them safe to run in this mode.
#
# This is useful since it allows further scaling up your workflows when thread
# execution starts to become a bottleneck (which it can start to be due to the
# GIL in python). It also offers a intermediary scalable runner that can be
# used when the scale and/or setup of remote workers is not desirable.


def progress_printer(task, event_type, details):
    # This callback, attached to each task will be called in the local
    # process (not the child processes)...
    progress = details.pop('progress')
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))


class AlphabetTask(task.Task):
    # Second delay between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here).
    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]

    def execute(self):
        for p in self._PROGRESS_PARTS:
            self.update_progress(p)
            time.sleep(self._DELAY)


print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    abc = AlphabetTask(letter)
    abc.notifier.register(
        task.EVENT_UPDATE_PROGRESS, functools.partial(progress_printer, abc)
    )
    soup.add(abc)
try:
    print("Loading...")
    e = engines.load(soup, engine='parallel', executor='processes')
    print("Compiling...")
    e.compile()
    print("Preparing...")
    e.prepare()
    print("Running...")
    e.run()
    print("Done: %s" % e.statistics)
except exceptions.NotImplementedError as e:
    print(e)
Watching execution timing¶
Note
Full source located at timing_listener.
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we will attach a listener to an engine
# and have variable run time tasks run and show how the listener will print
# out how long those tasks took (when they started and when they finished).
#
# This shows how timing metrics can be gathered (or attached onto an engine)
# after a workflow has been constructed, making it easy to gather metrics
# dynamically for situations where this kind of information is applicable (or
# even adding this information on at a later point in the future when your
# application starts to slow down).


class VariableTask(task.Task):
    """Task that sleeps for a random (sub-second) amount of time."""

    def __init__(self, name):
        super().__init__(name)
        self._sleepy_time = random.random()

    def execute(self):
        time.sleep(self._sleepy_time)


f = lf.Flow('root')
f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
e = engines.load(f)
with timing.PrintingDurationListener(e):
    e.run()
Distance calculator¶
Note
Full source located at distance_calculator.
import collections
import math
import os
import sys

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This shows how to use a tasks/atoms ability to take requirements from
# its execute functions default parameters and shows how to provide those
# via different methods when needed, to influence those parameters to in
# this case calculate the distance between two points in 2D space.

# A 2D point.
Point = collections.namedtuple("Point", "x,y")


def is_near(val, expected, tolerance=0.001):
    # Floats don't really provide equality...
    if val > (expected + tolerance):
        return False
    if val < (expected - tolerance):
        return False
    return True


class DistanceTask(task.Task):
    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space

    default_provides = 'distance'

    def execute(self, a=Point(0, 0), b=Point(0, 0)):
        # Euclidean distance between the two points (defaults: the origin).
        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))


if __name__ == '__main__':
    # For these we rely on the execute() methods points by default being
    # at the origin (and we override it with store values when we want) at
    # execution time (which then influences what is calculated).
    any_distance = linear_flow.Flow("origin").add(DistanceTask())
    results = engines.run(any_distance)
    print(results)
    print(
        "{} is near-enough to {}: {}".format(
            results['distance'], 0.0, is_near(results['distance'], 0.0)
        )
    )

    results = engines.run(any_distance, store={'a': Point(1, 1)})
    print(results)
    print(
        "{} is near-enough to {}: {}".format(
            results['distance'], 1.4142, is_near(results['distance'], 1.4142)
        )
    )

    results = engines.run(any_distance, store={'a': Point(10, 10)})
    print(results)
    print(
        "{} is near-enough to {}: {}".format(
            results['distance'],
            14.14199,
            is_near(results['distance'], 14.14199),
        )
    )

    results = engines.run(
        any_distance, store={'a': Point(5, 5), 'b': Point(10, 10)}
    )
    print(results)
    print(
        "{} is near-enough to {}: {}".format(
            results['distance'], 7.07106, is_near(results['distance'], 7.07106)
        )
    )

    # For this we use the ability to override at task creation time the
    # optional arguments so that we don't need to continue to send them
    # in via the 'store' argument like in the above (and we fix the new
    # starting point 'a' at (10, 10) instead of (0, 0)...

    ten_distance = linear_flow.Flow("ten")
    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
    results = engines.run(ten_distance, store={'b': Point(10, 10)})
    print(results)
    print(
        "{} is near-enough to {}: {}".format(
            results['distance'], 0.0, is_near(results['distance'], 0.0)
        )
    )

    results = engines.run(ten_distance)
    print(results)
    print(
        "{} is near-enough to {}: {}".format(
            results['distance'],
            14.14199,
            is_near(results['distance'], 14.14199),
        )
    )
Table multiplier (in parallel)¶
Note
Full source located at parallel_table_multiply.
import csv
import logging
import os
import random
import sys

logging.basicConfig(level=logging.ERROR)

# Make the taskflow source tree importable when running straight from a
# source checkout.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
# green thread (if eventlet is available) in parallel and then the result
# is reformed into a new table and some verifications are performed on it
# to ensure everything went as expected.


MULTIPLIER = 10


class RowMultiplier(task.Task):
    """Performs a modification of an input row, creating a output row."""

    def __init__(self, name, index, row, multiplier):
        super().__init__(name=name)
        self.index = index
        self.multiplier = multiplier
        self.row = row

    def execute(self):
        return [r * self.multiplier for r in self.row]


def make_flow(table):
    # This creation will allow for parallel computation (since the flow here
    # is specifically unordered; and when things are unordered they have
    # no dependencies and when things have no dependencies they can just be
    # ran at the same time, limited in concurrency by the executor or max
    # workers of that executor...)
    f = uf.Flow("root")
    for i, row in enumerate(table):
        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLIER))
    # NOTE(harlowja): at this point nothing has ran, the above is just
    # defining what should be done (but not actually doing it) and associating
    # an ordering dependencies that should be enforced (the flow pattern used
    # forces this), the engine in the later main() function will actually
    # perform this work...
    return f


def main():
    if len(sys.argv) == 2:
        tbl = []
        # NOTE: csv.reader requires a text-mode file object in python 3
        # (opening in 'rb' would raise a TypeError); per the csv module
        # docs the file should be opened with newline=''.
        with open(sys.argv[1], newline='') as fh:
            reader = csv.reader(fh)
            for row in reader:
                tbl.append([float(r) if r else 0.0 for r in row])
    else:
        # Make some random table out of thin air...
        tbl = []
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        for _i in range(0, rows):
            row = []
            for _j in range(0, cols):
                row.append(random.random())
            tbl.append(row)

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        executor.shutdown()

    # Find the old rows and put them into place...
    #
    # TODO(harlowja): probably easier just to sort instead of search...
    computed_tbl = []
    for i in range(0, len(tbl)):
        for t in f:
            if t.index == i:
                computed_tbl.append(e.storage.get(t.name))

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    else:
        return 0


if __name__ == "__main__":
    sys.exit(main())
Linear equation solver (explicit dependencies)¶
Note
Full source located at calculate_linear.
1import os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(
7 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
8)
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15
16# INTRO: In this example a linear flow is used to group four tasks to calculate
17# a value. A single added task is used twice, showing how this can be done
18# and the twice added task takes in different bound values. In the first case
19# it uses default parameters ('x' and 'y') and in the second case arguments
20# are bound with ('z', 'd') keys from the engines internal storage mechanism.
21#
22# A multiplier task uses a binding that another task also provides, but this
23# example explicitly shows that 'z' parameter is bound with 'a' key
24# This shows that if a task depends on a key named the same as a key provided
25# from another task the name can be remapped to take the desired key from a
26# different origin.
27
28
29# This task provides some values as a result of execution; this can be
30# useful when you want to provide values from a static set to other tasks that
31# depend on those values existing before those tasks can run.
32#
33# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
34# that just provides those values on engine running by prepopulating the
35# storage backend before your tasks are ran (which accomplishes a similar goal
36# in a more uniform manner).
class Provider(task.Task):
    """Returns a fixed tuple of values captured at construction time.

    Handy for seeding other tasks with static inputs, although
    prepopulating engine storage is the preferred modern approach.
    """

    def __init__(self, name, *args, **kwargs):
        super().__init__(name=name, **kwargs)
        self._values = args

    def execute(self):
        return self._values
44
45
46# This task adds two input variables and returns the result.
47#
48# Note that since this task does not have a revert() function (since addition
49# is a stateless operation) there are no side-effects that this function needs
50# to undo if some later operation fails.
class Adder(task.Task):
    """Adds its two bound inputs.

    Addition is stateless, so no revert() is needed -- there are no
    side effects to undo if a later task fails.
    """

    def execute(self, x, y):
        total = x + y
        return total
54
55
56# This task multiplies an input variable by a multiplier and returns the
57# result.
58#
59# Note that since this task does not have a revert() function (since
60# multiplication is a stateless operation) and there are no side-effects that
61# this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    """Multiplies its 'z' input by a fixed factor given at construction.

    Stateless, so no revert() is required.
    """

    def __init__(self, name, multiplier, provides=None, rebind=None):
        super().__init__(name=name, provides=provides, rebind=rebind)
        self._factor = multiplier

    def execute(self, z):
        return self._factor * z
69
70
71# Note here that the ordering is established so that the correct sequences
72# of operations occurs where the adding and multiplying is done according
73# to the expected and typical mathematical model. A graph flow could also be
74# used here to automatically infer & ensure the correct ordering.
75flow = lf.Flow('root').add(
76 # Provide the initial values for other tasks to depend on.
77 #
78 # x = 2, y = 3, d = 5
79 Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
80 # z = x+y = 5
81 Adder("add-1", provides='z'),
82 # a = z+d = 10
83 Adder("add-2", provides='a', rebind=['z', 'd']),
84 # Calculate 'r = a*3 = 30'
85 #
86 # Note here that the 'z' argument of the execute() function will not be
87 # bound to the 'z' variable provided from the above 'provider' object but
88 # instead the 'z' argument will be taken from the 'a' variable provided
89 # by the second add-2 listed above.
90 Multiplier("multi", 3, provides='r', rebind={'z': 'a'}),
91)
92
93# The result here will be all results (from all tasks) which is stored in an
94# in-memory storage location that backs this engine since it is not configured
95# with persistence storage.
96results = taskflow.engines.run(flow)
97print(results)
Linear equation solver (inferred dependencies)¶
Source: graph_flow.py
1import os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(
7 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
8)
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import graph_flow as gf
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16
17# In this example there are complex *inferred* dependencies between tasks that
18# are used to perform a simple set of linear equations.
19#
20# As you will see below the tasks just define what they require as input
21# and produce as output (named values). Then the user doesn't care about
22# ordering the tasks (in this case the tasks calculate pieces of the overall
23# equation).
24#
25# As you will notice a graph flow resolves dependencies automatically using the
26# tasks' symbol requirements and provided symbol values and no ordering
27# dependency has to be manually created.
28#
29# Also notice that flows of any types can be nested into a graph flow; showing
30# that subflow dependencies (and associated ordering) will be inferred too.
31
32
class Adder(task.Task):
    """Produces the sum of the two (numeric) values bound to x and y."""

    def execute(self, x, y):
        result = x + y
        return result
36
37
38flow = gf.Flow('root').add(
39 lf.Flow('nested_linear').add(
40 # x2 = y3+y4 = 12
41 Adder("add2", provides='x2', rebind=['y3', 'y4']),
42 # x1 = y1+y2 = 4
43 Adder("add1", provides='x1', rebind=['y1', 'y2']),
44 ),
45 # x5 = x1+x3 = 20
46 Adder("add5", provides='x5', rebind=['x1', 'x3']),
47 # x3 = x1+x2 = 16
48 Adder("add3", provides='x3', rebind=['x1', 'x2']),
49 # x4 = x2+y5 = 21
50 Adder("add4", provides='x4', rebind=['x2', 'y5']),
51 # x6 = x5+x4 = 41
52 Adder("add6", provides='x6', rebind=['x5', 'x4']),
53 # x7 = x6+x6 = 82
54 Adder("add7", provides='x7', rebind=['x6', 'x6']),
55)
56
57# Provide the initial variable inputs using a storage dictionary.
58store = {
59 "y1": 1,
60 "y2": 3,
61 "y3": 5,
62 "y4": 7,
63 "y5": 9,
64}
65
66# This is the expected values that should be created.
67unexpected = 0
68expected = [
69 ('x1', 4),
70 ('x2', 12),
71 ('x3', 16),
72 ('x4', 21),
73 ('x5', 20),
74 ('x6', 41),
75 ('x7', 82),
76]
77
78result = taskflow.engines.run(flow, engine='serial', store=store)
79
80print("Single threaded engine result %s" % result)
81for name, value in expected:
82 actual = result.get(name)
83 if actual != value:
84 sys.stderr.write(f"{actual} != {value}\n")
85 unexpected += 1
86
87result = taskflow.engines.run(flow, engine='parallel', store=store)
88
89print("Multi threaded engine result %s" % result)
90for name, value in expected:
91 actual = result.get(name)
92 if actual != value:
93 sys.stderr.write(f"{actual} != {value}\n")
94 unexpected += 1
95
96if unexpected:
97 sys.exit(1)
Linear equation solver (in parallel)¶
Note
Full source located at calculate_in_parallel
1import os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(
7 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
8)
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16# INTRO: These examples show how a linear flow and an unordered flow can be
17# used together to execute calculations in parallel and then use the
18# result for the next task/s. The adder task is used for all calculations
19# and argument bindings are used to set correct parameters for each task.
20
21
22# This task provides some values as a result of execution; this can be
23# useful when you want to provide values from a static set to other tasks that
24# depend on those values existing before those tasks can run.
25#
26# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
27# that provides those values on engine running by prepopulating the storage
28# backend before your tasks are ran (which accomplishes a similar goal in a
29# more uniform manner).
class Provider(task.Task):
    """Hands back the constant values it was constructed with.

    Used to seed downstream tasks; prepopulating engine storage is the
    preferred modern alternative.
    """

    def __init__(self, name, *args, **kwargs):
        super().__init__(name=name, **kwargs)
        self._values = args

    def execute(self):
        return self._values
37
38
39# This task adds two input variables and returns the result of that addition.
40#
41# Note that since this task does not have a revert() function (since addition
42# is a stateless operation) there are no side-effects that this function needs
43# to undo if some later operation fails.
class Adder(task.Task):
    """Returns the sum of its two inputs; stateless, so no revert()."""

    def execute(self, x, y):
        summed = x + y
        return summed
47
48
49flow = lf.Flow('root').add(
50 # Provide the initial values for other tasks to depend on.
51 #
52    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
53 Provider("provide-adder", 2, 3, 5, 8, provides=('x1', 'y1', 'x2', 'y2')),
54 # Note here that we define the flow that contains the 2 adders to be an
55 # unordered flow since the order in which these execute does not matter,
56 # another way to solve this would be to use a graph_flow pattern, which
57 # also can run in parallel (since they have no ordering dependencies).
58 uf.Flow('adders').add(
59 # Calculate 'z1 = x1+y1 = 5'
60 #
61 # Rebind here means that the execute() function x argument will be
62 # satisfied from a previous output named 'x1', and the y argument
63 # of execute() will be populated from the previous output named 'y1'
64 #
65 # The output (result of adding) will be mapped into a variable named
66    # 'z1' which can then be referred to and depended on by other tasks.
67 Adder(name="add", provides='z1', rebind=['x1', 'y1']),
68 # z2 = x2+y2 = 13
69 Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
70 ),
71 # r = z1+z2 = 18
72 Adder(name="sum-1", provides='r', rebind=['z1', 'z2']),
73)
74
75
76# The result here will be all results (from all tasks) which is stored in an
77# in-memory storage location that backs this engine since it is not configured
78# with persistence storage.
79result = taskflow.engines.run(flow, engine='parallel')
80print(result)
Creating a volume (in parallel)¶
Note
Full source located at create_parallel_volume
1import logging
2import os
3import random
4import sys
5import time
6
7logging.basicConfig(level=logging.ERROR)
8
9top_dir = os.path.abspath(
10 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
11)
12sys.path.insert(0, top_dir)
13
14from oslo_utils import reflection
15
16from taskflow import engines
17from taskflow.listeners import printing
18from taskflow.patterns import unordered_flow as uf
19from taskflow import task
20
21# INTRO: These examples show how unordered_flow can be used to create a large
22# number of fake volumes in parallel (or serially, depending on a constant that
23# can be easily changed).
24
25
@contextlib.contextmanager
def show_time(name):
    """Print how many seconds the wrapped block of work took."""
    started = time.time()
    yield
    elapsed = time.time() - started
    print(f" -- {name} took {elapsed:0.3f} seconds")
32
33
34# This affects how many volumes to create and how much time to *simulate*
35# passing for that volume to be created.
36MAX_CREATE_TIME = 3
37VOLUME_COUNT = 5
38
39# This will be used to determine if all the volumes are created in parallel
40# or whether the volumes are created serially (in an undefined order since
41# an unordered flow is used). Note that there is a disconnection between the
42# ordering and the concept of parallelism (since unordered items can still be
43# ran in a serial ordering). A typical use-case for offering both is to allow
44# for debugging using a serial approach, while when running at a larger scale
45# one would likely want to use the parallel approach.
46#
47# If you switch this flag from serial to parallel you can see the overall
48# time difference that this causes.
49SERIAL = False
50if SERIAL:
51 engine = 'serial'
52else:
53 engine = 'parallel'
54
55
class VolumeCreator(task.Task):
    """Pretends to create a single volume, taking a random amount of time.

    The task name embeds the volume id (instead of a random UUID) so that
    on resumption the task can be matched back to its prior stored state,
    so the create can be resumed/reverted, and so the name stays
    meaningful for audit and tracking purposes.
    """

    def __init__(self, volume_id):
        base_name = reflection.get_callable_name(self)
        super().__init__(name=f"{base_name}-{volume_id}")
        self._vol_id = volume_id

    def execute(self):
        print("Making volume %s" % self._vol_id)
        # Simulate the (variable) time a real volume creation would take.
        time.sleep(random.random() * MAX_CREATE_TIME)
        print("Finished making volume %s" % self._vol_id)
78
79
80# Assume there is no ordering dependency between volumes.
81flow = uf.Flow("volume-maker")
82for i in range(0, VOLUME_COUNT):
83 flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
84
85
86# Show how much time the overall engine loading and running takes.
87with show_time(name=flow.name.title()):
88 eng = engines.load(flow, engine=engine)
89 # This context manager automatically adds (and automatically removes) a
90 # helpful set of state transition notification printing helper utilities
91 # that show you exactly what transitions the engine is going through
92 # while running the various volume create tasks.
93 with printing.PrintingListener(eng):
94 eng.run()
Summation mapper(s) and reducer (in parallel)¶
Note
Full source located at simple_map_reduce
1import os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(
8 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
9)
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13# INTRO: These examples show a simplistic map/reduce implementation where
14# a set of mapper(s) will sum a series of input numbers (in parallel) and
15# return their individual summed result. A reducer will then use those
16# produced values and perform a final summation and this result will then be
17# printed (and verified to ensure the calculation was as expected).
18
19from taskflow import engines
20from taskflow.patterns import linear_flow
21from taskflow.patterns import unordered_flow
22from taskflow import task
23
24
class SumMapper(task.Task):
    """Sums the chunk of numbers handed to this mapper."""

    def execute(self, inputs):
        total = 0
        for value in inputs:
            total += value
        return total
29
30
class TotalReducer(task.Task):
    """Adds together every mapper output into a single grand total.

    Only keyword arguments named 'reduction_*' are mapper results; any
    other kwargs that happen to be passed in are ignored.
    """

    def execute(self, *args, **kwargs):
        return sum(
            value
            for name, value in kwargs.items()
            if name.startswith('reduction_')
        )
41
42
def chunk_iter(chunk_size, upperbound):
    """Yield lists of at most *chunk_size* ints covering 0..upperbound-1.

    The previous implementation silently dropped the final partial chunk
    whenever *upperbound* was not an exact multiple of *chunk_size*,
    losing those numbers from any downstream computation; the trailing
    (shorter) chunk is now yielded as well.
    """
    chunk = []
    for i in range(upperbound):
        chunk.append(i)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        # Emit the leftover partial chunk so no values are lost.
        yield chunk
51
52
53# Upper bound of numbers to sum for example purposes...
54UPPER_BOUND = 10000
55
56# How many mappers we want to have.
57SPLIT = 10
58
59# How big of a chunk we want to give each mapper.
60CHUNK_SIZE = UPPER_BOUND // SPLIT
61
62# This will be the workflow we will compose and run.
63w = linear_flow.Flow("root")
64
65# The mappers will run in parallel.
66store = {}
67provided = []
68mappers = unordered_flow.Flow('map')
69for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
70 mapper_name = 'mapper_%s' % i
71 # Give that mapper some information to compute.
72 store[mapper_name] = chunk
73 # The reducer uses all of the outputs of the mappers, so it needs
74 # to be recorded that it needs access to them (under a specific name).
75 provided.append("reduction_%s" % i)
76 mappers.add(
77 SumMapper(
78 name=mapper_name,
79 rebind={'inputs': mapper_name},
80 provides=provided[-1],
81 )
82 )
83w.add(mappers)
84
85# The reducer will run last (after all the mappers).
86w.add(TotalReducer('reducer', requires=provided))
87
88# Now go!
89e = engines.load(w, engine='parallel', store=store, max_workers=4)
90print("Running a parallel engine with options: %s" % e.options)
91e.run()
92
93# Now get the result the reducer created.
94total = e.storage.get('reducer')
95print("Calculated result = %s" % total)
96
97# Calculate it manually to verify that it worked...
98calc_total = sum(range(0, UPPER_BOUND))
99if calc_total != total:
100 sys.exit(1)
Storing & emitting a bill¶
Note
Full source located at fake_billing
1import logging
2import os
3import sys
4import time
5
6logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(
9 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
10)
11sys.path.insert(0, top_dir)
12
13from oslo_utils import uuidutils
14
15from taskflow import engines
16from taskflow.listeners import printing
17from taskflow.patterns import graph_flow as gf
18from taskflow.patterns import linear_flow as lf
19from taskflow import task
20from taskflow.utils import misc
21
22# INTRO: This example walks through a miniature workflow which simulates
23# the reception of an API request, creation of a database entry, driver
24# activation (which invokes a 'fake' webservice) and final completion.
25#
26# This example also shows how a function/object (in this case the url sending)
27# that occurs during driver activation can update the progress of a task
28# without being aware of the internals of how to do this by associating a
29# callback that the url sending can update as the sending progresses from 0.0%
30# complete to 100% complete.
31
32
class DB:
    """Stand-in database handle that just echoes the statements it gets."""

    def query(self, sql):
        # A real implementation would talk to an actual database here.
        print("Querying with: %s" % sql)
36
37
class UrlCaller:
    """Fake url sender that reports sending progress via a callback."""

    def __init__(self):
        self._send_time = 0.5  # total simulated send duration (seconds)
        self._chunks = 25      # granularity used to derive per-item sleep

    def send(self, url, data, status_cb=None):
        """Pretend to send *data* to *url*, sleeping a little per item.

        If *status_cb* is given it is invoked with a float in [0.0, 1.0].
        Previously the callback topped out at (n-1)/n and never reported
        completion, even though the module docs promise progress from
        0.0% up to 100%; a final 1.0 notification is now emitted.
        """
        sleep_time = float(self._send_time) / self._chunks
        for i in range(len(data)):
            time.sleep(sleep_time)
            # As we send the data, each chunk we 'fake' send will progress
            # the sending progress that much further to 100%.
            if status_cb:
                status_cb(float(i) / len(data))
        if status_cb and data:
            # Make sure completion (100%) is actually reported.
            status_cb(1.0)
51
52
53# Since engines save the output of tasks to a optional persistent storage
54# backend resources have to be dealt with in a slightly different manner since
55# resources are transient and can *not* be persisted (or serialized). For tasks
56# that require access to a set of resources it is a common pattern to provide
57# an object (in this case this object) on construction of those tasks via the
58# task constructor.
class ResourceFetcher:
    """Lazily constructs and caches transient (non-persistable) resources.

    Engines may persist task outputs, but live handles (db connections,
    http clients) cannot be serialized -- so tasks receive this fetcher
    at construction time and ask it for handles only when they run.
    """

    def __init__(self):
        self._db = None
        self._url = None

    @property
    def db_handle(self):
        # Created on first access, then reused.
        if self._db is None:
            self._db = DB()
        return self._db

    @property
    def url_handle(self):
        # Created on first access, then reused.
        if self._url is None:
            self._url = UrlCaller()
        return self._url
75
76
class ExtractInputRequest(task.Task):
    """Parses the raw api request into the dict the rest of the flow uses."""

    def __init__(self, resources):
        super().__init__(provides="parsed_request")
        self._resources = resources

    def execute(self, request):
        parsed = {
            'user': request.user,
            'user_id': misc.as_int(request.id),
            'request_id': uuidutils.generate_uuid(),
        }
        return parsed
88
89
class MakeDBEntry(task.Task):
    """Records the parsed request in the (fake) database.

    revert() removes the entry again so a failure in a later task leaves
    no partial state behind.
    """

    def __init__(self, resources):
        super().__init__()
        self._resources = resources

    def execute(self, parsed_request):
        self._resources.db_handle.query(
            "INSERT %s INTO mydb" % (parsed_request))

    def revert(self, result, parsed_request):
        self._resources.db_handle.query(
            "DELETE %s FROM mydb IF EXISTS" % (parsed_request))
102
103
class ActivateDriver(task.Task):
    """Sends the billing data to a (fake) webservice, exposing progress."""

    def __init__(self, resources):
        super().__init__(provides='sent_to')
        self._resources = resources
        self._url = "http://blahblah.com"

    def execute(self, parsed_request):
        print("Sending billing data to %s" % (self._url))
        # Hooking our own update_progress() (which the engine also binds
        # to) into the sender ties the task's progress to the actual send
        # progress -- useful for downstream systems that want to observe
        # what a task is doing at any time.
        sender = self._resources.url_handle
        sender.send(self._url, json.dumps(parsed_request),
                    status_cb=self.update_progress)
        return self._url

    def update_progress(self, progress, **kwargs):
        # Defer to the parent implementation first, then echo the status.
        super().update_progress(progress, **kwargs)
        print(f"{self.name} is {progress * 100:0.2f}% done")
129
130
class DeclareSuccess(task.Task):
    """Final task; announces where the processed data ended up."""

    def execute(self, sent_to):
        print("Done!")
        print("All data processed and sent to %s" % (sent_to))
134 print("All data processed and sent to %s" % (sent_to))
135
136
class DummyUser:
    """Minimal stand-in for the user object carried by an api request."""

    def __init__(self, user, id_):
        # 'id_' avoids shadowing the id() builtin at the call site.
        self.user = user
        self.id = id_
141
142
143# Resources (db handles and similar) of course can *not* be persisted so we
144# need to make sure that we pass this resource fetcher to the tasks constructor
145# so that the tasks have access to any needed resources (the resources are
146# lazily loaded so that they are only created when they are used).
147resources = ResourceFetcher()
148flow = lf.Flow("initialize-me")
149
150# 1. First we extract the api request into a usable format.
151# 2. Then we go ahead and make a database entry for our request.
152flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
153
154# 3. Then we activate our payment method and finally declare success.
155sub_flow = gf.Flow("after-initialize")
156sub_flow.add(ActivateDriver(resources), DeclareSuccess())
157flow.add(sub_flow)
158
159# Initially populate the storage with the following request object,
160# prepopulating this allows the tasks that dependent on the 'request' variable
161# to start processing (in this case this is the ExtractInputRequest task).
162store = {
163 'request': DummyUser(user="bob", id_="1.35"),
164}
165eng = engines.load(flow, engine='serial', store=store)
166
167# This context manager automatically adds (and automatically removes) a
168# helpful set of state transition notification printing helper utilities
169# that show you exactly what transitions the engine is going through
170# while running the various billing related tasks.
171with printing.PrintingListener(eng):
172 eng.run()
Suspending a workflow & resuming¶
Note
Full source located at resume_from_backend
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
6
7self_dir = os.path.abspath(os.path.dirname(__file__))
8top_dir = os.path.abspath(
9 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
10)
11sys.path.insert(0, top_dir)
12sys.path.insert(0, self_dir)
13
14from oslo_utils import uuidutils
15
16import taskflow.engines
17from taskflow.patterns import linear_flow as lf
18from taskflow.persistence import models
19from taskflow import task
20
21import example_utils as eu # noqa
22
23# INTRO: In this example linear_flow is used to group three tasks, one which
24# will suspend the future work the engine may do. This suspend engine is then
25# discarded and the workflow is reloaded from the persisted data and then the
26# workflow is resumed from where it was suspended. This allows you to see how
27# to start an engine, have a task stop the engine from doing future work (if
28# a multi-threaded engine is being used, then the currently active work is not
29# preempted) and then resume the work later.
30#
31# Usage:
32#
33# With a filesystem directory as backend
34#
35# python taskflow/examples/resume_from_backend.py
36#
37# With ZooKeeper as backend
38#
39# python taskflow/examples/resume_from_backend.py \
40# zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
41
42
43# UTILITY FUNCTIONS #########################################
44
45
def print_task_states(flowdetail, msg):
    """Dump the flow detail's state plus each task's state and result."""
    eu.print_wrapped(msg)
    print(f"Flow '{flowdetail.name}' state: {flowdetail.state}")
    # Sorting keeps the output stable regardless of the iteration order of
    # the flow detail's contents (so test validation isn't confused).
    entries = [(td.name, td.version, td.state, td.results)
               for td in flowdetail]
    for entry in sorted(entries):
        print(" %s==%s: %s, result=%s" % entry)
56
57
def find_flow_detail(backend, lb_id, fd_id):
    """Look up a flow detail by id inside the given logbook."""
    connection = backend.get_connection()
    logbook = connection.get_logbook(lb_id)
    return logbook.find(fd_id)
62
63
64# CREATE FLOW ###############################################
65
66
class InterruptTask(task.Task):
    """Suspends the engine from inside one of its own tasks.

    NOTE(review): relies on the module-global 'engine' created later in
    this script -- fine for a demo, not a pattern to copy.
    """

    def execute(self):
        # DO NOT TRY THIS AT HOME
        engine.suspend()
71
72
class TestTask(task.Task):
    """Trivial task that reports itself and returns a marker value."""

    def execute(self):
        print('executing %s' % self)
        return 'ok'
77
78
def flow_factory():
    """Create the three-task linear flow used by both engine loads."""
    steps = [
        TestTask(name='first'),
        InterruptTask(name='boom'),
        TestTask(name='second'),
    ]
    return lf.Flow('resume from backend example').add(*steps)
85
86
87# INITIALIZE PERSISTENCE ####################################
88
89with eu.get_backend() as backend:
90 # Create a place where the persistence information will be stored.
91 book = models.LogBook("example")
92 flow_detail = models.FlowDetail(
93 "resume from backend example", uuid=uuidutils.generate_uuid()
94 )
95 book.add(flow_detail)
96 with contextlib.closing(backend.get_connection()) as conn:
97 conn.save_logbook(book)
98
99 # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
100
101 flow = flow_factory()
102 engine = taskflow.engines.load(
103 flow, flow_detail=flow_detail, book=book, backend=backend
104 )
105
106 print_task_states(flow_detail, "At the beginning, there is no state")
107 eu.print_wrapped("Running")
108 engine.run()
109 print_task_states(flow_detail, "After running")
110
111 # RE-CREATE, RESUME, RUN ####################################
112
113 eu.print_wrapped("Resuming and running again")
114
115 # NOTE(harlowja): reload the flow detail from backend, this will allow us
116 # to resume the flow from its suspended state, but first we need to search
117 # for the right flow details in the correct logbook where things are
118 # stored.
119 #
120 # We could avoid re-loading the engine and just do engine.run() again, but
121 # this example shows how another process may unsuspend a given flow and
122 # start it again for situations where this is useful to-do (say the process
123 # running the above flow crashes).
124 flow2 = flow_factory()
125 flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
126 engine2 = taskflow.engines.load(
127 flow2, flow_detail=flow_detail_2, backend=backend, book=book
128 )
129 engine2.run()
130 print_task_states(flow_detail_2, "At the end")
Creating a virtual machine (resumable)¶
Note
Full source located at resume_vm_boot
1import hashlib
2import logging
3import os
4import random
5import sys
6import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(
12 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
13)
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17import futurist
18from oslo_utils import uuidutils
19
20from taskflow import engines
21from taskflow import exceptions as exc
22from taskflow.patterns import graph_flow as gf
23from taskflow.patterns import linear_flow as lf
24from taskflow.persistence import models
25from taskflow import task
26
27import example_utils as eu # noqa
28
29# INTRO: These examples show how a hierarchy of flows can be used to create a
30# vm in a reliable & resumable manner using taskflow + a miniature version of
31# what nova does while booting a vm.
32
33
@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Yield, then (when cli args were given) pause to invite a ctrl-c."""
    try:
        yield how_long
    finally:
        if len(sys.argv) > 1:
            # Only bother doing this when user input was provided.
            print("** Ctrl-c me please!!! **")
            time.sleep(how_long)
43
44
class PrintText(task.Task):
    """Just inserts some text print outs in a workflow."""

    def __init__(self, print_what, no_slow=False):
        # Name by content hash so identical text maps to an identical task
        # name across runs (which matters for resumption).
        digest = hashlib.md5(print_what.encode('utf-8')).hexdigest()
        super().__init__(name="Print: %s" % (digest[0:8]))
        self._text = print_what
        self._no_slow = no_slow

    def execute(self):
        if self._no_slow:
            eu.print_wrapped(self._text)
            return
        with slow_down():
            eu.print_wrapped(self._text)
60
61
class DefineVMSpec(task.Task):
    """Defines a vm specification to be."""

    def __init__(self, name):
        super().__init__(provides='vm_spec', name=name)

    def execute(self):
        spec = {
            'type': 'kvm',
            'disks': 2,
            'vcpu': 1,
            'ips': 1,
            'volumes': 3,
        }
        return spec
76
77
class LocateImages(task.Task):
    """Locates where the vm images are."""

    def __init__(self, name):
        super().__init__(provides='image_locations', name=name)

    def execute(self, vm_spec):
        # One fake source url mapped to a local target path, per disk.
        return {
            "http://www.yahoo.com/images/%s" % (d): "/tmp/%s.img" % (d)
            for d in range(vm_spec['disks'])
        }
90
91
class DownloadImages(task.Task):
    """Downloads all the vm images."""

    def __init__(self, name):
        super().__init__(provides='download_paths', name=name)

    def execute(self, image_locations):
        for source, target in image_locations.items():
            with slow_down(1):
                print(f"Downloading from {source} => {target}")
        # Sorted so the provided paths are deterministic.
        return sorted(image_locations.values())
103
104
class CreateNetworkTpl(task.Task):
    """Generates the network settings file to be placed in the images."""

    SYSCONFIG_CONTENTS = """DEVICE=eth%s
BOOTPROTO=static
IPADDR=%s
ONBOOT=yes"""

    def __init__(self, name):
        super().__init__(provides='network_settings', name=name)

    def execute(self, ips):
        # One sysconfig blob per allocated ip (eth0, eth1, ...).
        return [self.SYSCONFIG_CONTENTS % (index, address)
                for index, address in enumerate(ips)]
121
122
class AllocateIP(task.Task):
    """Allocates the ips for the given vm."""

    def __init__(self, name):
        super().__init__(provides='ips', name=name)

    def execute(self, vm_spec):
        # Hand out one random 192.168.0.x address per requested ip.
        return ["192.168.0.%s" % (random.randint(1, 254))
                for _n in range(vm_spec.get('ips', 0))]
134
135
class WriteNetworkSettings(task.Task):
    """Writes all the network settings into the downloaded images."""

    def execute(self, download_paths, network_settings):
        # Pretend-mount each image, then pretend-write each ifcfg file.
        for j, path in enumerate(download_paths):
            with slow_down(1):
                print(f"Mounting {path} to /tmp/{j}")
            for i, setting in enumerate(network_settings):
                filename = (
                    "/tmp/etc/sysconfig/network-scripts/ifcfg-eth%s" % (i))
                with slow_down(1):
                    print("Writing to %s" % (filename))
                    print(setting)
150
151
class BootVM(task.Task):
    """Fires off the vm boot operation."""

    def execute(self, vm_spec):
        print("Starting vm!")
        # Simulate the boot taking a moment before announcing the result.
        with slow_down(1):
            print("Created: %s" % (vm_spec))
159
160
class AllocateVolumes(task.Task):
    """Allocates the volumes for the vm."""

    def execute(self, vm_spec):
        volumes = []
        for n in range(vm_spec['volumes']):
            with slow_down(1):
                device = "/dev/vda%s" % (n + 1)
                volumes.append(device)
                print("Allocated volume %s" % device)
        return volumes
171
172
class FormatVolumes(task.Task):
    """Formats the volumes for the vm."""

    def execute(self, volumes):
        for device in volumes:
            print("Formatting volume %s" % device)
            # Simulate the formatting taking some time.
            with slow_down(1):
                pass
            print("Formatted volume %s" % device)
182
183
184def create_flow():
185 # Setup the set of things to do (mini-nova).
186 flow = lf.Flow("root").add(
187 PrintText("Starting vm creation.", no_slow=True),
188 lf.Flow('vm-maker').add(
189 # First create a specification for the final vm to-be.
190 DefineVMSpec("define_spec"),
191 # This does all the image stuff.
192 gf.Flow("img-maker").add(
193 LocateImages("locate_images"),
194 DownloadImages("download_images"),
195 ),
196 # This does all the network stuff.
197 gf.Flow("net-maker").add(
198 AllocateIP("get_my_ips"),
199 CreateNetworkTpl("fetch_net_settings"),
200 WriteNetworkSettings("write_net_settings"),
201 ),
202 # This does all the volume stuff.
203 gf.Flow("volume-maker").add(
204 AllocateVolumes("allocate_my_volumes", provides='volumes'),
205 FormatVolumes("volume_formatter"),
206 ),
207 # Finally boot it all.
208 BootVM("boot-it"),
209 ),
210 # Ya it worked!
211 PrintText("Finished vm create.", no_slow=True),
212 PrintText("Instance is running!", no_slow=True),
213 )
214 return flow
215
216
217eu.print_wrapped("Initializing")
218
219# Setup the persistence & resumption layer.
220with eu.get_backend() as backend:
221 # Try to find a previously passed in tracking id...
222 try:
223 book_id, flow_id = sys.argv[2].split("+", 1)
224 if not uuidutils.is_uuid_like(book_id):
225 book_id = None
226 if not uuidutils.is_uuid_like(flow_id):
227 flow_id = None
228 except (IndexError, ValueError):
229 book_id = None
230 flow_id = None
231
232 # Set up how we want our engine to run, serial, parallel...
233 try:
234 executor = futurist.GreenThreadPoolExecutor(max_workers=5)
235 except RuntimeError:
236 # No eventlet installed, just let the default be used instead.
237 executor = None
238
239 # Create/fetch a logbook that will track the workflows work.
240 book = None
241 flow_detail = None
242 if all([book_id, flow_id]):
243 # Try to find in a prior logbook and flow detail...
244 with contextlib.closing(backend.get_connection()) as conn:
245 try:
246 book = conn.get_logbook(book_id)
247 flow_detail = book.find(flow_id)
248 except exc.NotFound:
249 pass
250 if book is None and flow_detail is None:
251 book = models.LogBook("vm-boot")
252 with contextlib.closing(backend.get_connection()) as conn:
253 conn.save_logbook(book)
254 engine = engines.load_from_factory(
255 create_flow,
256 backend=backend,
257 book=book,
258 engine='parallel',
259 executor=executor,
260 )
261 print(
262 "!! Your tracking id is: '{}+{}'".format(
263 book.uuid, engine.storage.flow_uuid
264 )
265 )
266 print("!! Please submit this on later runs for tracking purposes")
267 else:
268 # Attempt to load from a previously partially completed flow.
269 engine = engines.load_from_detail(
270 flow_detail, backend=backend, engine='parallel', executor=executor
271 )
272
273 # Make me my vm please!
274 eu.print_wrapped('Running')
275 engine.run()
276
277# How to use.
278#
279# 1. $ python me.py "sqlite:////tmp/nova.db"
280# 2. ctrl-c before this finishes
281# 3. Find the tracking id (search for 'Your tracking id is')
282# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
283# 5. Watch it pick up where it left off.
284# 6. Profit!
Creating a volume (resumable)¶
Note
Full source located at resume_volume_create
1import hashlib
2import logging
3import os
4import random
5import sys
6import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(
12 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
13)
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from oslo_utils import uuidutils
18
19from taskflow import engines
20from taskflow.patterns import graph_flow as gf
21from taskflow.patterns import linear_flow as lf
22from taskflow.persistence import models
23from taskflow import task
24
25import example_utils # noqa
26
27# INTRO: These examples show how a hierarchy of flows can be used to create a
28# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
29# version of what cinder does while creating a volume (very miniature).
30
31
32@contextlib.contextmanager
33def slow_down(how_long=0.5):
34 try:
35 yield how_long
36 finally:
37 print("** Ctrl-c me please!!! **")
38 time.sleep(how_long)
39
40
41def find_flow_detail(backend, book_id, flow_id):
42 # NOTE(harlowja): this is used to attempt to find a given logbook with
43 # a given id and a given flow detail inside that logbook, we need this
44 # reference so that we can resume the correct flow (as a logbook tracks
45 # flows and a flow detail tracks an individual flow).
46 #
47 # Without a reference to the logbook and the flow details in that logbook
48 # we will not know exactly what we should resume and that would mean we
49 # can't resume what we don't know.
50 with contextlib.closing(backend.get_connection()) as conn:
51 lb = conn.get_logbook(book_id)
52 return lb.find(flow_id)
53
54
55class PrintText(task.Task):
56 def __init__(self, print_what, no_slow=False):
57 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
58 super().__init__(name="Print: %s" % (content_hash))
59 self._text = print_what
60 self._no_slow = no_slow
61
62 def execute(self):
63 if self._no_slow:
64 print("-" * (len(self._text)))
65 print(self._text)
66 print("-" * (len(self._text)))
67 else:
68 with slow_down():
69 print("-" * (len(self._text)))
70 print(self._text)
71 print("-" * (len(self._text)))
72
73
74class CreateSpecForVolumes(task.Task):
75 def execute(self):
76 volumes = []
77 for i in range(0, random.randint(1, 10)):
78 volumes.append(
79 {
80 'type': 'disk',
81 'location': "/dev/vda%s" % (i + 1),
82 }
83 )
84 return volumes
85
86
87class PrepareVolumes(task.Task):
88 def execute(self, volume_specs):
89 for v in volume_specs:
90 with slow_down():
91 print("Dusting off your hard drive %s" % (v))
92 with slow_down():
93 print("Taking a well deserved break.")
94 print("Your drive %s has been certified." % (v))
95
96
97# Setup the set of things to do (mini-cinder).
98flow = lf.Flow("root").add(
99 PrintText("Starting volume create", no_slow=True),
100 gf.Flow('maker').add(
101 CreateSpecForVolumes("volume_specs", provides='volume_specs'),
102 PrintText("I need a nap, it took me a while to build those specs."),
103 PrepareVolumes(),
104 ),
105 PrintText("Finished volume create", no_slow=True),
106)
107
108# Setup the persistence & resumption layer.
109with example_utils.get_backend() as backend:
110 try:
111 book_id, flow_id = sys.argv[2].split("+", 1)
112 except (IndexError, ValueError):
113 book_id = None
114 flow_id = None
115
116 if not all([book_id, flow_id]):
117 # If no 'tracking id' (think a fedex or ups tracking id) is provided
118 # then we create one by creating a logbook (where flow details are
119 # stored) and creating a flow detail (where flow and task state is
120 # stored). The combination of these 2 objects unique ids (uuids) allows
121 # the users of taskflow to reassociate the workflows that were
122 # potentially running (and which may have partially completed) back
123 # with taskflow so that those workflows can be resumed (or reverted)
124 # after a process/thread/engine has failed in some way.
125 book = models.LogBook('resume-volume-create')
126 flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
127 book.add(flow_detail)
128 with contextlib.closing(backend.get_connection()) as conn:
129 conn.save_logbook(book)
130 print(
131 "!! Your tracking id is: '{}+{}'".format(
132 book.uuid, flow_detail.uuid
133 )
134 )
135 print("!! Please submit this on later runs for tracking purposes")
136 else:
137 flow_detail = find_flow_detail(backend, book_id, flow_id)
138
139 # Load and run.
140 engine = engines.load(
141 flow, flow_detail=flow_detail, backend=backend, engine='serial'
142 )
143 engine.run()
144
145# How to use.
146#
147# 1. $ python me.py "sqlite:////tmp/cinder.db"
148# 2. ctrl-c before this finishes
149# 3. Find the tracking id (search for 'Your tracking id is')
150# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
151# 5. Profit!
Running engines via iteration¶
Note
Full source located at run_by_iter
1import os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(
8 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
9)
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13
14from taskflow import engines
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18
19# INTRO: This example shows how to run a set of engines at the same time, each
20# running in different engines using a single thread of control to iterate over
21 # each engine (which causes that engine to advance to its next state during
22# each iteration).
23
24
25class EchoTask(task.Task):
26 def execute(self, value):
27 print(value)
28 return chr(ord(value) + 1)
29
30
31def make_alphabet_flow(i):
32 f = lf.Flow("alphabet_%s" % (i))
33 start_value = 'A'
34 end_value = 'Z'
35 curr_value = start_value
36 while ord(curr_value) <= ord(end_value):
37 next_value = chr(ord(curr_value) + 1)
38 if curr_value != end_value:
39 f.add(
40 EchoTask(
41 name="echoer_%s" % curr_value,
42 rebind={'value': curr_value},
43 provides=next_value,
44 )
45 )
46 else:
47 f.add(
48 EchoTask(
49 name="echoer_%s" % curr_value, rebind={'value': curr_value}
50 )
51 )
52 curr_value = next_value
53 return f
54
55
56# Adjust this number to change how many engines/flows run at once.
57flow_count = 1
58flows = []
59for i in range(0, flow_count):
60 f = make_alphabet_flow(i + 1)
61 flows.append(make_alphabet_flow(i + 1))
62engine_iters = []
63for f in flows:
64 e = engines.load(f)
65 e.compile()
66 e.storage.inject({'A': 'A'})
67 e.prepare()
68 engine_iters.append(e.run_iter())
69while engine_iters:
70 for it in list(engine_iters):
71 try:
72 print(next(it))
73 except StopIteration:
74 engine_iters.remove(it)
Controlling retries using a retry controller¶
Note
Full source located at retry_flow
1import os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(
7 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
8)
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import retry
14from taskflow import task
15
16# INTRO: In this example we create a retry controller that receives a phone
17# directory and tries different phone numbers. The next task tries to call Jim
18 # using the given number. If it is not Jim's number, the task raises an
19 # exception and the retry controller takes the next number from the phone
20# directory and retries the call.
21#
22# This example shows a basic usage of retry controllers in a flow.
23 # Retry controllers allow reverting and retrying a failed subflow with new
24# parameters.
25
26
27class CallJim(task.Task):
28 def execute(self, jim_number):
29 print("Calling jim %s." % jim_number)
30 if jim_number != 555:
31 raise Exception("Wrong number!")
32 else:
33 print("Hello Jim!")
34
35 def revert(self, jim_number, **kwargs):
36 print("Wrong number, apologizing.")
37
38
39# Create your flow and associated tasks (the work to be done).
40flow = lf.Flow(
41 'retrying-linear',
42 retry=retry.ParameterizedForEach(
43 rebind=['phone_directory'], provides='jim_number'
44 ),
45).add(CallJim())
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
Distributed execution (simple)¶
Note
Full source located at wbe_simple_linear
1import logging
2import os
3import sys
4import tempfile
5
6top_dir = os.path.abspath(
7 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
8)
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.engines.worker_based import worker
13from taskflow.patterns import linear_flow as lf
14from taskflow.tests import utils
15from taskflow.utils import threading_utils
16
17import example_utils # noqa
18
19# INTRO: This example walks through a miniature workflow which shows how to
20# start up a number of workers (these workers will process task execution and
21# reversion requests using any provided input data) and then use an engine
22# that creates a set of *capable* tasks and flows (the engine can not create
23# tasks that the workers are not able to run, this will end in failure) that
24# those workers will run and then executes that workflow seamlessly using the
25# workers to perform the actual execution.
26#
27# NOTE(harlowja): this example simulates the expected larger number of workers
28# by using a set of threads (which in this example simulate the remote workers
29# that would typically be running on other external machines).
30
31# A filesystem can also be used as the queue transport (useful as simple
32# transport type that does not involve setting up a larger mq system). If this
33# is false then the memory transport is used instead, both work in standalone
34# setups.
35USE_FILESYSTEM = False
36BASE_SHARED_CONF = {
37 'exchange': 'taskflow',
38}
39
40# Until https://github.com/celery/kombu/issues/398 is resolved it is not
41# recommended to run many worker threads in this example due to the types
42# of errors mentioned in that issue.
43MEMORY_WORKERS = 2
44FILE_WORKERS = 1
45WORKER_CONF = {
46 # These are the tasks the worker can execute, they *must* be importable,
47 # typically this list is used to restrict what workers may execute to
48 # a smaller set of *allowed* tasks that are known to be safe (one would
49 # not want to allow all python code to be executed).
50 'tasks': [
51 'taskflow.tests.utils:TaskOneArgOneReturn',
52 'taskflow.tests.utils:TaskMultiArgOneReturn',
53 ],
54}
55
56
57def run(engine_options):
58 flow = lf.Flow('simple-linear').add(
59 utils.TaskOneArgOneReturn(provides='result1'),
60 utils.TaskMultiArgOneReturn(provides='result2'),
61 )
62 eng = engines.load(
63 flow,
64 store=dict(x=111, y=222, z=333),
65 engine='worker-based',
66 **engine_options,
67 )
68 eng.run()
69 return eng.storage.fetch_all()
70
71
72if __name__ == "__main__":
73 logging.basicConfig(level=logging.ERROR)
74
75 # Setup our transport configuration and merge it into the worker and
76 # engine configuration so that both of those use it correctly.
77 shared_conf = dict(BASE_SHARED_CONF)
78
79 tmp_path = None
80 if USE_FILESYSTEM:
81 worker_count = FILE_WORKERS
82 tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
83 shared_conf.update(
84 {
85 'transport': 'filesystem',
86 'transport_options': {
87 'data_folder_in': tmp_path,
88 'data_folder_out': tmp_path,
89 'polling_interval': 0.1,
90 },
91 }
92 )
93 else:
94 worker_count = MEMORY_WORKERS
95 shared_conf.update(
96 {
97 'transport': 'memory',
98 'transport_options': {
99 'polling_interval': 0.1,
100 },
101 }
102 )
103 worker_conf = dict(WORKER_CONF)
104 worker_conf.update(shared_conf)
105 engine_options = dict(shared_conf)
106 workers = []
107 worker_topics = []
108
109 try:
110 # Create a set of workers to simulate actual remote workers.
111 print('Running %s workers.' % (worker_count))
112 for i in range(0, worker_count):
113 worker_conf['topic'] = 'worker-%s' % (i + 1)
114 worker_topics.append(worker_conf['topic'])
115 w = worker.Worker(**worker_conf)
116 runner = threading_utils.daemon_thread(w.run)
117 runner.start()
118 w.wait()
119 workers.append((runner, w.stop))
120
121 # Now use those workers to do something.
122 print('Executing some work.')
123 engine_options['topics'] = worker_topics
124 result = run(engine_options)
125 print('Execution finished.')
126 # This is done so that the test examples can work correctly
127 # even when the keys change order (which will happen in various
128 # python versions).
129 print("Result = %s" % json.dumps(result, sort_keys=True))
130 finally:
131 # And cleanup.
132 print('Stopping workers.')
133 while workers:
134 r, stopper = workers.pop()
135 stopper()
136 r.join()
137 if tmp_path:
138 example_utils.rm_path(tmp_path)
Distributed notification (simple)¶
Note
Full source located at wbe_event_sender
1import os
2import string
3import sys
4import time
5
6top_dir = os.path.abspath(
7 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
8)
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.engines.worker_based import worker
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15from taskflow.types import notifier
16from taskflow.utils import threading_utils
17
18ANY = notifier.Notifier.ANY
19
20# INTRO: These examples show how to use a remote worker's event notification
21# attribute to proxy back task event notifications to the controlling process.
22#
23# In this case a simple set of events is triggered by a worker running a
24# task (simulated to be remote by using a kombu memory transport and threads).
25# Those events that the 'remote worker' produces will then be proxied back to
26# the task that the engine is running 'remotely', and then they will be emitted
27# back to the original callbacks that exist in the originating engine
28# process/thread. This creates a one-way *notification* channel that can
29# transparently be used in-process, outside-of-process using remote workers and
30 # so-on that allows tasks to signal to their controlling process some sort of
31# action that has occurred that the task may need to tell others about (for
32# example to trigger some type of response when the task reaches 50% done...).
33
34
35def event_receiver(event_type, details):
36 """This is the callback that (in this example) doesn't do much..."""
37 print("Recieved event '%s'" % event_type)
38 print("Details = %s" % details)
39
40
41class EventReporter(task.Task):
42 """This is the task that will be running 'remotely' (not really remote)."""
43
44 EVENTS = tuple(string.ascii_uppercase)
45 EVENT_DELAY = 0.1
46
47 def execute(self):
48 for i, e in enumerate(self.EVENTS):
49 details = {
50 'leftover': self.EVENTS[i:],
51 }
52 self.notifier.notify(e, details)
53 time.sleep(self.EVENT_DELAY)
54
55
56BASE_SHARED_CONF = {
57 'exchange': 'taskflow',
58 'transport': 'memory',
59 'transport_options': {
60 'polling_interval': 0.1,
61 },
62}
63
64# Until https://github.com/celery/kombu/issues/398 is resolved it is not
65# recommended to run many worker threads in this example due to the types
66# of errors mentioned in that issue.
67MEMORY_WORKERS = 1
68WORKER_CONF = {
69 'tasks': [
70 # Used to locate which tasks we can run (we don't want to allow
71 # arbitrary code/tasks to be run by any worker since that would
72 # open up a variety of vulnerabilities).
73 '%s:EventReporter' % (__name__),
74 ],
75}
76
77
78def run(engine_options):
79 reporter = EventReporter()
80 reporter.notifier.register(ANY, event_receiver)
81 flow = lf.Flow('event-reporter').add(reporter)
82 eng = engines.load(flow, engine='worker-based', **engine_options)
83 eng.run()
84
85
86if __name__ == "__main__":
87 logging.basicConfig(level=logging.ERROR)
88
89 # Setup our transport configuration and merge it into the worker and
90 # engine configuration so that both of those objects use it correctly.
91 worker_conf = dict(WORKER_CONF)
92 worker_conf.update(BASE_SHARED_CONF)
93 engine_options = dict(BASE_SHARED_CONF)
94 workers = []
95
96 # These topics will be used to request worker information on; those
97 # workers will respond with their capabilities which the executing engine
98 # will use to match pending tasks to a matched worker, this will cause
99 # the task to be sent for execution, and the engine will wait until it
100 # is finished (a response is received) and then the engine will either
101 # continue with other tasks, do some retry/failure resolution logic or
102 # stop (and potentially re-raise the remote workers failure)...
103 worker_topics = []
104
105 try:
106 # Create a set of worker threads to simulate actual remote workers...
107 print('Running %s workers.' % (MEMORY_WORKERS))
108 for i in range(0, MEMORY_WORKERS):
109 # Give each one its own unique topic name so that they can
110 # correctly communicate with the engine (they will all share the
111 # same exchange).
112 worker_conf['topic'] = 'worker-%s' % (i + 1)
113 worker_topics.append(worker_conf['topic'])
114 w = worker.Worker(**worker_conf)
115 runner = threading_utils.daemon_thread(w.run)
116 runner.start()
117 w.wait()
118 workers.append((runner, w.stop))
119
120 # Now use those workers to do something.
121 print('Executing some work.')
122 engine_options['topics'] = worker_topics
123 result = run(engine_options)
124 print('Execution finished.')
125 finally:
126 # And cleanup.
127 print('Stopping workers.')
128 while workers:
129 r, stopper = workers.pop()
130 stopper()
131 r.join()
Distributed mandelbrot (complex)¶
Note
Full source located at wbe_mandelbrot
Output¶
Code¶
1import math
2import os
3import sys
4
5top_dir = os.path.abspath(
6 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
7)
8sys.path.insert(0, top_dir)
9
10from taskflow import engines
11from taskflow.engines.worker_based import worker
12from taskflow.patterns import unordered_flow as uf
13from taskflow import task
14from taskflow.utils import threading_utils
15
16# INTRO: This example walks through a workflow that will in parallel compute
17# a mandelbrot result set (using X 'remote' workers) and then combine their
18# results together to form a final mandelbrot fractal image. It shows a usage
19# of taskflow to perform a well-known embarrassingly parallel problem that has
20# the added benefit of also being an elegant visualization.
21#
22# NOTE(harlowja): this example simulates the expected larger number of workers
23# by using a set of threads (which in this example simulate the remote workers
24# that would typically be running on other external machines).
25#
26# NOTE(harlowja): to have it produce an image run (after installing pillow):
27#
28# $ python taskflow/examples/wbe_mandelbrot.py output.png
29
30BASE_SHARED_CONF = {
31 'exchange': 'taskflow',
32}
33WORKERS = 2
34WORKER_CONF = {
35 # These are the tasks the worker can execute, they *must* be importable,
36 # typically this list is used to restrict what workers may execute to
37 # a smaller set of *allowed* tasks that are known to be safe (one would
38 # not want to allow all python code to be executed).
39 'tasks': [
40 '%s:MandelCalculator' % (__name__),
41 ],
42}
43ENGINE_CONF = {
44 'engine': 'worker-based',
45}
46
47# Mandelbrot & image settings...
48IMAGE_SIZE = (512, 512)
49CHUNK_COUNT = 8
50MAX_ITERATIONS = 25
51
52
53class MandelCalculator(task.Task):
54 def execute(self, image_config, mandelbrot_config, chunk):
55 """Returns the number of iterations before the computation "escapes".
56
57 Given the real and imaginary parts of a complex number, determine if it
58 is a candidate for membership in the mandelbrot set given a fixed
59 number of iterations.
60 """
61
62 # Parts borrowed from (credit to mark harris and benoît mandelbrot).
63 #
64 # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
65 def mandelbrot(x, y, max_iters):
66 c = complex(x, y)
67 z = 0.0j
68 for i in range(max_iters):
69 z = z * z + c
70 if (z.real * z.real + z.imag * z.imag) >= 4:
71 return i
72 return max_iters
73
74 min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
75 height, width = image_config['size']
76 pixel_size_x = (max_x - min_x) / width
77 pixel_size_y = (max_y - min_y) / height
78 block = []
79 for y in range(chunk[0], chunk[1]):
80 row = []
81 imag = min_y + y * pixel_size_y
82 for x in range(0, width):
83 real = min_x + x * pixel_size_x
84 row.append(mandelbrot(real, imag, max_iters))
85 block.append(row)
86 return block
87
88
89def calculate(engine_conf):
90 # Subdivide the work into X pieces, then request each worker to calculate
91 # one of those chunks and then later we will write these chunks out to
92 # an image bitmap file.
93
94 # An unordered flow is used here since the mandelbrot calculation is an
95 # example of an embarrassingly parallel computation that we can scatter
96 # across as many workers as possible.
97 flow = uf.Flow("mandelbrot")
98
99 # These symbols will be automatically given to tasks as input to their
100 # execute method, in this case these are constants used in the mandelbrot
101 # calculation.
102 store = {
103 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
104 'image_config': {
105 'size': IMAGE_SIZE,
106 },
107 }
108
109 # We need the task names to be in the right order so that we can extract
110 # the final results in the right order (we don't care about the order when
111 # executing).
112 task_names = []
113
114 # Compose our workflow.
115 height, _width = IMAGE_SIZE
116 chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
117 for i in range(0, CHUNK_COUNT):
118 chunk_name = 'chunk_%s' % i
119 task_name = "calculation_%s" % i
120 # Break the calculation up into chunk size pieces.
121 rows = [i * chunk_size, i * chunk_size + chunk_size]
122 flow.add(
123 MandelCalculator(
124 task_name,
125 # This ensures the storage symbol with name
126 # 'chunk_name' is sent into the tasks local
127 # symbol 'chunk'. This is how we give each
128 # calculator its own correct sequence of rows
129 # to work on.
130 rebind={'chunk': chunk_name},
131 )
132 )
133 store[chunk_name] = rows
134 task_names.append(task_name)
135
136 # Now execute it.
137 eng = engines.load(flow, store=store, engine_conf=engine_conf)
138 eng.run()
139
140 # Gather all the results and order them for further processing.
141 gather = []
142 for name in task_names:
143 gather.extend(eng.storage.get(name))
144 points = []
145 for y, row in enumerate(gather):
146 for x, color in enumerate(row):
147 points.append(((x, y), color))
148 return points
149
150
151def write_image(results, output_filename=None):
152 print(
153 "Gathered %s results that represents a mandelbrot"
154 " image (using %s chunks that are computed jointly"
155 " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS)
156 )
157 if not output_filename:
158 return
159
160 # Pillow (the PIL fork) saves us from writing our own image writer...
161 try:
162 from PIL import Image
163 except ImportError as e:
164 # To currently get this (may change in the future),
165 # $ pip install Pillow
166 raise RuntimeError("Pillow is required to write image files: %s" % e)
167
168 # Limit to 255, find the max and normalize to that...
169 color_max = 0
170 for _point, color in results:
171 color_max = max(color, color_max)
172
173 # Use gray scale since we don't really have other colors.
174 img = Image.new('L', IMAGE_SIZE, "black")
175 pixels = img.load()
176 for (x, y), color in results:
177 if color_max == 0:
178 color = 0
179 else:
180 color = int((float(color) / color_max) * 255.0)
181 pixels[x, y] = color
182 img.save(output_filename)
183
184
185def create_fractal():
186 logging.basicConfig(level=logging.ERROR)
187
188 # Setup our transport configuration and merge it into the worker and
189 # engine configuration so that both of those use it correctly.
190 shared_conf = dict(BASE_SHARED_CONF)
191 shared_conf.update(
192 {
193 'transport': 'memory',
194 'transport_options': {
195 'polling_interval': 0.1,
196 },
197 }
198 )
199
200 if len(sys.argv) >= 2:
201 output_filename = sys.argv[1]
202 else:
203 output_filename = None
204
205 worker_conf = dict(WORKER_CONF)
206 worker_conf.update(shared_conf)
207 engine_conf = dict(ENGINE_CONF)
208 engine_conf.update(shared_conf)
209 workers = []
210 worker_topics = []
211
212 print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
213 try:
214 # Create a set of workers to simulate actual remote workers.
215 print('Running %s workers.' % (WORKERS))
216 for i in range(0, WORKERS):
217 worker_conf['topic'] = 'calculator_%s' % (i + 1)
218 worker_topics.append(worker_conf['topic'])
219 w = worker.Worker(**worker_conf)
220 runner = threading_utils.daemon_thread(w.run)
221 runner.start()
222 w.wait()
223 workers.append((runner, w.stop))
224
225 # Now use those workers to do something.
226 engine_conf['topics'] = worker_topics
227 results = calculate(engine_conf)
228 print('Execution finished.')
229 finally:
230 # And cleanup.
231 print('Stopping workers.')
232 while workers:
233 r, stopper = workers.pop()
234 stopper()
235 r.join()
236 print("Writing image...")
237 write_image(results, output_filename=output_filename)
238
239
240if __name__ == "__main__":
241 create_fractal()
Jobboard producer/consumer (simple)¶
Note
Full source located at jobboard_produce_consume_colors
1import contextlib
2import logging
3import os
4import random
5import sys
6import threading
7import time
8
9logging.basicConfig(level=logging.ERROR)
10
11top_dir = os.path.abspath(
12 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
13)
14sys.path.insert(0, top_dir)
15
16from taskflow import exceptions as excp
17from taskflow.jobs import backends
18from taskflow.utils import kazoo_utils
19from taskflow.utils import threading_utils
20
21# In this example we show how a jobboard can be used to post work for other
22# entities to work on. This example creates a set of jobs using one producer
23# thread (typically this would be split across many machines) and then having
24 # other worker threads with their own jobboards select work using given
25 # filters [red/blue] and then perform that work (consuming or abandoning
26# the job after it has been completed or failed).
27
28# Things to note:
29# - No persistence layer is used (or logbook), just the job details are used
30# to determine if a job should be selected by a worker or not.
31# - This example runs in a single process (this is expected to be atypical
32# but this example shows that it can be done if needed, for testing...)
33# - The iterjobs(), claim(), consume()/abandon() worker workflow.
34# - The post() producer workflow.
35
36SHARED_CONF = {
37 'path': "/taskflow/jobs",
38 'board': 'zookeeper',
39}
40
41# How many workers and producers of work will be created (as threads).
42PRODUCERS = 3
43WORKERS = 5
44
45# How many units of work each producer will create.
46PRODUCER_UNITS = 10
47
48# How many units of work are expected to be produced (used so workers can
49 # know when to stop running and shutdown, typically this would not be
50 # a value but we have to limit this example's execution time to be less than
51# infinity).
52EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
53
54# Delay between producing/consuming more work.
55WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
56
57# To ensure threads don't trample other threads output.
58STDOUT_LOCK = threading.Lock()
59
60
61def dispatch_work(job):
62 # This is where the jobs contained work *would* be done
63 time.sleep(1.0)
64
65
66def safe_print(name, message, prefix=""):
67 with STDOUT_LOCK:
68 if prefix:
69 print(f"{prefix} {name}: {message}")
70 else:
71 print(f"{name}: {message}")
72
73
def worker(ident, client, consumed):
    # Create a personal board (using the same client so that it works in
    # the same process) and start looking for jobs on the board that we want
    # to perform.
    #
    # ``consumed`` is a deque shared by all workers; its length doubles as
    # the global "how much work is finished" counter, so every worker stops
    # once EXPECTED_UNITS jobs have been consumed (by anyone, not just us).
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = 0
    consumed_jobs = 0
    abandoned_jobs = 0
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        while len(consumed) != EXPECTED_UNITS:
            # Workers only take jobs that match a (randomly picked) color;
            # this simulates workers capable of only certain kinds of work.
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                # See if we should even bother with it...
                if job.details.get('color') != favorite_color:
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    # Another worker claimed (or removed) it first; move on.
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                else:
                    try:
                        dispatch_work(job)
                        board.consume(job, name)
                        safe_print(name, "'%s' [consumed]" % (job))
                        consumed_jobs += 1
                        consumed.append(job)
                    except Exception:
                        # The work failed: release the job back onto the
                        # board so that another worker may retry it.
                        board.abandon(job, name)
                        abandoned_jobs += 1
                        safe_print(name, "'%s' [abandoned]" % (job))
            time.sleep(WORKER_DELAY)
    safe_print(
        name,
        "finished (claimed %s jobs, consumed %s jobs,"
        " abandoned %s jobs)" % (claimed_jobs, consumed_jobs, abandoned_jobs),
        prefix=">>>",
    )
115
116
def producer(ident, client):
    """Post PRODUCER_UNITS randomly-colored jobs onto a personal jobboard.

    The board is created with the shared client so that it operates on the
    same (in-memory) state as all other boards in this process.
    """
    name = f"P-{ident}"
    safe_print(name, "started")
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        for unit in range(PRODUCER_UNITS):
            details = {'color': random.choice(['red', 'blue'])}
            posted = board.post(f"{name}-{unit}", book=None, details=details)
            safe_print(name, "'%s' [posted]" % (posted))
            time.sleep(PRODUCER_DELAY)
    safe_print(name, "finished", prefix=">>>")
133
134
def main():
    # TODO(harlowja): Hack to make eventlet work right, remove when the
    # following is fixed: https://github.com/eventlet/eventlet/issues/230
    from taskflow.utils import eventlet_utils as _eu # noqa

    try:
        import eventlet as _eventlet # noqa
    except ImportError:
        pass

    # A single (fake) zookeeper client is shared by every producer/worker so
    # that all the boards they create observe the same in-memory state.
    client = kazoo_utils.make_client({})
    with contextlib.closing(client) as c:
        created = []
        # Start the producers first so there is work for workers to find.
        for i in range(0, PRODUCERS):
            p = threading_utils.daemon_thread(producer, i + 1, c)
            created.append(p)
            p.start()
        # deque appends are thread-safe, so workers can share this freely.
        consumed = collections.deque()
        for i in range(0, WORKERS):
            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
            created.append(w)
            w.start()
        # Wait for every producer and worker thread to finish...
        while created:
            t = created.pop()
            t.join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
                # Leftover jobs (or missing consumption) means failure.
                return 1
            return 0
167
168
# Run the demo and propagate its status code to the shell.
if __name__ == "__main__":
    sys.exit(main())
Conductor simulating a CI pipeline¶
Note
Full source located at tox_conductor
import contextlib
import itertools
import logging
import os
import shutil
import socket
import sys
import tempfile
import threading
import time
10
11logging.basicConfig(level=logging.ERROR)
12
13top_dir = os.path.abspath(
14 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
15)
16sys.path.insert(0, top_dir)
17
18from oslo_utils import timeutils
19from oslo_utils import uuidutils
20
21from taskflow.conductors import backends as conductors
22from taskflow import engines
23from taskflow.jobs import backends as boards
24from taskflow.patterns import linear_flow
25from taskflow.persistence import backends as persistence
26from taskflow.persistence import models
27from taskflow import task
28from taskflow.utils import kazoo_utils
29from taskflow.utils import threading_utils
30
# INTRO: This example shows how a worker/producer can post desired work (jobs)
32# to a jobboard and a conductor can consume that work (jobs) from that jobboard
33# and execute those jobs in a reliable & async manner (for example, if the
34# conductor were to crash then the job will be released back onto the jobboard
35# and another conductor can attempt to finish it, from wherever that job last
36# left off).
37#
# In this example an in-memory jobboard (and in-memory storage) is created and
39# used that simulates how this would be done at a larger scale (it is an
40# example after all).
41
# Restrict how long this example runs for...
RUN_TIME = 5
# Delay (in seconds) between posting new fake reviews.
REVIEW_CREATION_DELAY = 0.5
# How often (in seconds) the conductor polls the jobboard for new jobs.
SCAN_DELAY = 0.1
# Unique-ish name for this host/process; namespaces boards and job names.
NAME = f"{socket.getfqdn()}_{os.getpid()}"

# This won't really use zookeeper but will use a local version of it using
# the zake library that mimics an actual zookeeper cluster using threads and
# an in-memory data structure.
JOBBOARD_CONF = {
    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
}
54
55
class RunReview(task.Task):
    """Dummy task that pretends to clone a review and then run tox on it."""

    def _clone_review(self, review, temp_dir):
        # A real implementation would check the review's code out here.
        review_id = review['id']
        print("Cloning review '{}' into {}".format(review_id, temp_dir))

    def _run_tox(self, temp_dir):
        # A real implementation would invoke tox inside the checkout here.
        print("Running tox in %s" % temp_dir)

    def execute(self, review, temp_dir):
        self._clone_review(review, temp_dir)
        self._run_tox(temp_dir)
68
69
class MakeTempDir(task.Task):
    """Create a temporary directory, destroying it again on flow failure.

    The directory's location is provided (as 'temp_dir') for other tasks
    to use as they see fit.
    """

    default_provides = 'temp_dir'

    def execute(self):
        # The returned path flows to downstream tasks via default_provides.
        return tempfile.mkdtemp()

    def revert(self, *args, **kwargs):
        # On failure, remove the directory we created (if we got that far).
        created_dir = kwargs.get(task.REVERT_RESULT)
        if created_dir:
            shutil.rmtree(created_dir)
85
86
class CleanResources(task.Task):
    """Final task that cleans up the workflow's resources (the temp dir)."""

    def execute(self, temp_dir):
        print("Removing %s" % temp_dir)
        shutil.rmtree(temp_dir)
93
94
def review_iter():
    """Makes reviews (never-ending iterator/generator)."""
    # Each review is just a dict with a monotonically increasing id.
    for review_id in itertools.count(0):
        yield {'id': review_id}
104
105
106# The reason this is at the module namespace level is important, since it must
107# be accessible from a conductor dispatching an engine, if it was a lambda
108# function for example, it would not be reimportable and the conductor would
109# be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run."""
    steps = [
        MakeTempDir(name="maker"),
        RunReview(name="runner"),
        CleanResources(name="cleaner"),
    ]
    flow = linear_flow.Flow("tester")
    flow.add(*steps)
    return flow
119
120
def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix.

    :param client: zookeeper (or zake) client the jobboard should use
    :param saver: persistence backend that logbooks are saved into
    :param name: prefix used to build the reviewer's unique name
    :returns: tuple of (unstarted daemon thread, stop callback)
    """
    real_name = "%s_reviewer" % name
    no_more = threading.Event()
    jb = boards.fetch(
        real_name, JOBBOARD_CONF, client=client, persistence=saver
    )

    def make_save_book(saver, review_id):
        # Record what we want to happen (sometime in the future).
        book = models.LogBook("book_%s" % review_id)
        detail = models.FlowDetail(
            "flow_%s" % review_id, uuidutils.generate_uuid()
        )
        book.add(detail)
        # Associate the factory method we want to be called (in the future)
        # with the book, so that the conductor will be able to call into
        # that factory to retrieve the workflow objects that represent the
        # work.
        #
        # These args and kwargs *can* be used to save any specific parameters
        # into the factory when it is being called to create the workflow
        # objects (typically used to tell a factory how to create a unique
        # workflow that represents this review).
        factory_args = ()
        factory_kwargs = {}
        engines.save_factory_details(
            detail, create_review_workflow, factory_args, factory_kwargs
        )
        with contextlib.closing(saver.get_connection()) as conn:
            conn.save_logbook(book)
        return book

    def run():
        """Periodically publishes 'fake' reviews to analyze."""
        jb.connect()
        review_generator = review_iter()
        with contextlib.closing(jb):
            while not no_more.is_set():
                review = next(review_generator)
                # The 'store' key seeds the flow's storage, injecting the
                # review for any task that declares it as a requirement.
                details = {
                    'store': {
                        'review': review,
                    },
                }
                job_name = "{}_{}".format(real_name, review['id'])
                print("Posting review '%s'" % review['id'])
                jb.post(
                    job_name,
                    book=make_save_book(saver, review['id']),
                    details=details,
                )
                time.sleep(REVIEW_CREATION_DELAY)

    # Return the unstarted thread, and a callback that can be used to
    # shut down that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), no_more.set)
178
179
def generate_conductor(client, saver, name=NAME):
    """Creates a conductor thread with the given name prefix.

    :param client: zookeeper (or zake) client the jobboard should use
    :param saver: persistence backend used to load job details and flows
    :param name: prefix used to build the conductor's unique name
    :returns: tuple of (unstarted daemon thread, stop callback)
    """
    real_name = "%s_conductor" % name
    # Fetch the jobboard under the conductor's full name so its identity is
    # consistent with generate_reviewer() above, which also registers its
    # board under the full '<name>_<role>' name.
    jb = boards.fetch(
        real_name, JOBBOARD_CONF, client=client, persistence=saver
    )
    conductor = conductors.fetch(
        "blocking", real_name, jb, engine='parallel', wait_timeout=SCAN_DELAY
    )

    def run():
        jb.connect()
        with contextlib.closing(jb):
            conductor.run()

    # Return the unstarted thread, and a callback that can be used to
    # shut down that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), conductor.stop)
196
197
def main():
    # Need to share the same backend, so that data can be shared...
    persistence_conf = {
        'connection': 'memory',
    }
    saver = persistence.fetch(persistence_conf)
    with contextlib.closing(saver.get_connection()) as conn:
        # This ensures that the needed backend setup/data directories/schema
        # upgrades and so on... exist before they are attempted to be used...
        conn.upgrade()
    # Separate (fake) zookeeper clients for the reviewer and the conductor,
    # mimicking two distinct processes talking to one cluster.
    fc1 = kazoo_utils.make_client({})
    fc2 = kazoo_utils.make_client({})
    entities = [
        generate_reviewer(fc1, saver),
        generate_conductor(fc2, saver),
    ]
    for t, stopper in entities:
        t.start()
    try:
        # Let the simulation run for a fixed amount of wall-clock time...
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        # ...then stop and join everything, in reverse creation order.
        for t, stopper in reversed(entities):
            stopper()
            t.join()
225
226
# Run the CI-pipeline simulation when invoked as a script.
if __name__ == '__main__':
    main()
Conductor running 99 bottles of beer song requests¶
Note
Full source located at 99_bottles
import contextlib
import functools
import logging
import os
import sys
import time
import traceback

from kazoo import client
9
10top_dir = os.path.abspath(
11 os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
12)
13sys.path.insert(0, top_dir)
14
15from taskflow.conductors import backends as conductor_backends
16from taskflow import engines
17from taskflow.jobs import backends as job_backends
18from taskflow import logging as taskflow_logging
19from taskflow.patterns import linear_flow as lf
20from taskflow.persistence import backends as persistence_backends
21from taskflow.persistence import models
22from taskflow import task
23
24from oslo_utils import timeutils
25from oslo_utils import uuidutils
26
# Instructions!
#
# 1. Install zookeeper (or change host listed below)
# 2. Download this example, place in file '99_bottles.py'
# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
# 4. Run `python 99_bottles.py c` a few times (in different shells)
# 5. On demand kill previously listed processes created in (4) and watch
#    the work resume on another process (and repeat)
# 6. Keep enough workers alive to eventually finish the song (if desired).

# This process's pid; used to give posters/conductors unique names.
ME = os.getpid()
# Where the (real) zookeeper server is expected to be listening.
ZK_HOST = "localhost:2181"
JB_CONF = {
    'hosts': ZK_HOST,
    'board': 'zookeeper',
    'path': '/taskflow/99-bottles-demo',
}
# Sqlite file where flow state is persisted (shared between processes).
PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
# Delays (in seconds) used to slow down the song's verses.
TAKE_DOWN_DELAY = 1.0
PASS_AROUND_DELAY = 3.0
HOW_MANY_BOTTLES = 99
48
49
class TakeABottleDown(task.Task):
    """Sings 'Take one down, ' and returns the decremented bottle count."""

    def execute(self, bottles_left):
        out = sys.stdout
        out.write('Take one down, ')
        out.flush()
        time.sleep(TAKE_DOWN_DELAY)
        return bottles_left - 1
56
57
class PassItAround(task.Task):
    """Sings 'pass it around, ' (no inputs and no outputs)."""

    def execute(self):
        out = sys.stdout
        out.write('pass it around, ')
        out.flush()
        time.sleep(PASS_AROUND_DELAY)
63
64
class Conclusion(task.Task):
    """Sings the closing line for the current number of bottles."""

    def execute(self, bottles_left):
        out = sys.stdout
        out.write('%s bottles of beer on the wall...\n' % bottles_left)
        out.flush()
69
70
def make_bottles(count):
    """Build the bottle song as a linear flow of tasks.

    This factory is called to generate the workflow initially and is also
    called to regenerate it on resumption so that work can continue from
    where it last left off.
    """
    song = lf.Flow("bottle-song")

    # Walk from the full count down to the last bottle. Only the very first
    # verse needs the starting count injected; every later verse receives
    # 'bottles_left' from the preceding TakeABottleDown's result.
    for bottle in range(count, 0, -1):
        if bottle == count:
            take_bottle = TakeABottleDown(
                "take-bottle-%s" % bottle,
                inject={'bottles_left': bottle},
                provides='bottles_left',
            )
        else:
            take_bottle = TakeABottleDown(
                "take-bottle-%s" % bottle, provides='bottles_left'
            )
        pass_it = PassItAround("pass-%s-around" % bottle)
        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
        song.add(take_bottle, pass_it, next_bottles)

    return song
96
97
def run_conductor(only_run_once=False):
    # This continuously runs consumers until its stopped via ctrl-c or other
    # kill signal...
    #
    # :param only_run_once: when true, stop the conductor after its first
    #     'running_end' event (used by the local/unit-test mode).
    event_watches = {}

    # This will be triggered by the conductor doing various activities
    # with engines, and is quite nice to be able to see the various timing
    # segments (which is useful for debugging, or watching, or figuring out
    # where to optimize).
    def on_conductor_event(cond, event, details):
        print("Event '%s' has been received..." % event)
        print("Details = %s" % details)
        if event.endswith("_start"):
            # Start timing this phase; it is matched up with the
            # corresponding '_end' event below.
            w = timeutils.StopWatch()
            w.start()
            base_event = event[0 : -len("_start")]
            event_watches[base_event] = w
        if event.endswith("_end"):
            base_event = event[0 : -len("_end")]
            try:
                w = event_watches.pop(base_event)
                w.stop()
                print(
                    "It took %0.3f seconds for event '%s' to finish"
                    % (w.elapsed(), base_event)
                )
            except KeyError:
                # No matching '_start' was observed; nothing to report.
                pass
        if event == 'running_end' and only_run_once:
            cond.stop()

    print("Starting conductor with pid: %s" % ME)
    my_name = "conductor-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            # Make sure the db schema exists before it is used.
            conn.upgrade()
        job_backend = job_backends.fetch(
            my_name, JB_CONF, persistence=persist_backend
        )
        job_backend.connect()
        with contextlib.closing(job_backend):
            cond = conductor_backends.fetch(
                'blocking', my_name, job_backend, persistence=persist_backend
            )
            # Bind the conductor into the callback so it can stop itself.
            on_conductor_event = functools.partial(on_conductor_event, cond)
            cond.notifier.register(cond.notifier.ANY, on_conductor_event)
            # Run forever, and kill -9 or ctrl-c me...
            try:
                cond.run()
            finally:
                cond.stop()
                cond.wait()
151
152
def run_poster():
    # This just posts a single job and then ends...
    print("Starting poster with pid: %s" % ME)
    my_name = "poster-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            # Make sure the db schema exists before it is used.
            conn.upgrade()
        job_backend = job_backends.fetch(
            my_name, JB_CONF, persistence=persist_backend
        )
        job_backend.connect()
        with contextlib.closing(job_backend):
            # Create information in the persistence backend about the
            # unit of work we want to complete and the factory that
            # can be called to create the tasks that the work unit needs
            # to be done.
            lb = models.LogBook("post-from-%s" % my_name)
            fd = models.FlowDetail(
                "song-from-%s" % my_name, uuidutils.generate_uuid()
            )
            lb.add(fd)
            with contextlib.closing(persist_backend.get_connection()) as conn:
                conn.save_logbook(lb)
            # Record the workflow factory so that whichever conductor claims
            # the job can rebuild the flow (with HOW_MANY_BOTTLES bottles).
            engines.save_factory_details(
                fd,
                make_bottles,
                [HOW_MANY_BOTTLES],
                {},
                backend=persist_backend,
            )
            # Post, and be done with it!
            jb = job_backend.post("song-from-%s" % my_name, book=lb)
            print("Posted: %s" % jb)
            print("Goodbye...")
188
189
def main_local():
    """Run poster then conductor in-process (the unit-testing mode)."""
    global TAKE_DOWN_DELAY
    global PASS_AROUND_DELAY
    # Make everything go much faster (so that this finishes quickly).
    TAKE_DOWN_DELAY = 0.01
    PASS_AROUND_DELAY = 0.01
    # Use a unique jobboard path so repeated local runs never collide.
    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
    run_poster()
    run_conductor(only_run_once=True)
201
202
def check_for_zookeeper(timeout=1):
    """Return True if a zookeeper server is reachable, False otherwise."""
    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
    sys.stderr.write("Please wait....\n")
    with contextlib.closing(client.KazooClient()) as probe:
        try:
            probe.start(timeout=timeout)
        except probe.handler.timeout_exception:
            # Could not connect in time; explain why the example can't run.
            sys.stderr.write("Zookeeper is needed for running this example!\n")
            traceback.print_exc()
            return False
        probe.stop()
        return True
216
217
def main():
    """Entry point: dispatch to local run, poster, or conductor mode."""
    if not check_for_zookeeper():
        return
    if len(sys.argv) == 1:
        # No arguments: exercise both roles once (unit-test friendly path).
        main_local()
        return
    mode = sys.argv[1]
    if mode not in ('p', 'c'):
        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
        return
    # A trailing 'v' argument selects very verbose (trace) logging.
    if sys.argv[-1] == "v":
        logging.basicConfig(level=taskflow_logging.TRACE)
    else:
        logging.basicConfig(level=logging.ERROR)
    if mode == 'p':
        run_poster()
    else:
        run_conductor()
234
235
# Dispatch on the command line arguments when run as a script.
if __name__ == '__main__':
    main()