[Pydra] Updates on task packaging and task sync

Yin QIU allenchue at gmail.com
Sat Aug 15 14:08:16 UTC 2009


This is a (long) update on the task packaging and task
synchronization features. I just checked my changes into the
task_packaging branch of my git tree. I haven't run tests against the
new code, and it is probably not working yet. But I believe it will be

Generally speaking, I implemented:

1) a simple task packaging scheme as per the description in ticket #74.
  a) Note that referencing a task now requires a fully-qualified name,
like 'some_pkg.SomeTask'.
  b) There can be only ONE tasks_dir, which is "task_cache" by
default. Having more than one would confuse the task synchronization
process.

2) a naive task synchronization mechanism to fix ticket #22
  When the scheduler wants a worker to run a task or a workunit, it
also tells the worker the latest version number of that task. If the
worker finds that the version number does not match its local version,
it will initiate a task synchronization request to the master. A
synchronization session will subsequently take place between the
master and the worker. After the session, the task definition at the
worker side will be up-to-date.
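
A minimal sketch of the worker-side version check described in 2). The
names needs_sync, handle_run_request, request_sync and execute are
hypothetical and not taken from the branch; the sketch also assumes the
task runs only after the sync has finished, which is the part listed
as unfinished below.

```python
def needs_sync(local_versions, task_key, master_version):
    """Return True when the local copy of the task is missing or stale."""
    return local_versions.get(task_key) != master_version


def handle_run_request(local_versions, task_key, master_version,
                       request_sync, execute):
    """Synchronize first if the versions differ, then run the task.

    request_sync and execute are hypothetical callbacks: the first asks
    the master for the latest task definition and returns its version,
    the second runs the task.
    """
    if needs_sync(local_versions, task_key, master_version):
        local_versions[task_key] = request_sync(task_key)
    return execute(task_key)
```

The version is compared for equality rather than ordering, so a worker
holding a newer copy than the master would also be resynchronized.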

What I haven't finished:

1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
TaskManager are not handled yet. Looks like it will need much effort
to cleanly and neatly deal with these situations on the worker side.
However, missing this feature won't be a big problem, since the worker
will passively synchronize with the master.

2) Should hold the execution of a task on the worker if the task code
is out-dated and resume it after synchronization. I'm working on this.

Code changes are mostly in tasks/packaging.py and
tasks/task_manager.py. Here are some of my notes during the

== Task Packaging ==
I didn't change how a worker runs a task. However, to facilitate
sandboxed execution, I added an interface in TaskManager
(i.e., TaskManager.get_task()) to return all the additional module
search paths of a task.
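
To illustrate the packaging rules (fully-qualified task names, a single
tasks_dir), here is a small sketch of how a task key could be mapped to
a package directory. The helpers split_task_key and package_dir and the
one-directory-per-package layout are assumptions for illustration, not
the actual TaskManager code.

```python
import os

TASKS_DIR = "task_cache"  # the single tasks_dir; default name from this mail


def split_task_key(task_key):
    """Split 'some_pkg.SomeTask' into ('some_pkg', 'SomeTask')."""
    pkg_name, sep, task_name = task_key.rpartition(".")
    if not sep or not pkg_name or not task_name:
        raise ValueError("task key must be fully qualified: %r" % task_key)
    return pkg_name, task_name


def package_dir(task_key, tasks_dir=TASKS_DIR):
    """Assumed layout: each package lives in its own directory under tasks_dir."""
    pkg_name, _ = split_task_key(task_key)
    return os.path.join(tasks_dir, *pkg_name.split("."))
```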

== Task Synchronization ==
This feature is much trickier than task packaging. I made several
design decisions.

1) TaskManager is a module used by both the master and the worker.
Though pydra is operated in a master/slave fashion, I want the
TaskManager to rely on neither the master nor the worker. So
TaskManager should not call any remote methods of the master or the
worker, and should be a relatively independent module, except that it
emits TASK_UPDATED, TASK_REMOVED, etc. signals to other modules. This
design will even make it easy to develop peer-to-peer task
synchronization, though we don't need this feature right now.

2) Though TaskManager is independent of other modules, the master and
the worker need access to it to manage available tasks, e.g. to
retrieve and synchronize them. The original architecture only allowed
modules to communicate through signals, which would be a bit clumsy
here. So I borrowed the concept of a "friend class" from C++ and added
a _friends attribute to the Module class. A module can define its
friends as

self._friends = {
  'friend1' : FriendModule1,
  'friend2' : FriendModule2,
}

After the initialization process, the module will have two more
attributes, namely friend1 and friend2, as long as FriendModule1 and
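FriendModule2 are found in the same ModuleManager as this module. Note
that this is actually the reverse of a friend class in C++: there, a
class names friends that may access it, whereas here a module names
the friends that it wants to access.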

With this in hand, I made TaskManager a friend of WorkerTaskControls,
TaskScheduler, and TaskSyncModule (a new module created by me) so that
the latter three can access the TaskManager directly.
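
The friend mechanism can be sketched roughly as follows. This is a
simplified stand-in, not the real Module/ModuleManager code: the
register() and resolve_friends() methods and the task_manager
attribute name are assumptions made for the example.

```python
class Module(object):
    """Simplified stand-in for a pydra module."""
    _friends = {}


class ModuleManager(object):
    """Simplified stand-in that only knows how to resolve friends."""

    def __init__(self):
        self.modules = []

    def register(self, module):
        self.modules.append(module)

    def resolve_friends(self):
        # For each declared friend, look for an instance of that class in
        # this manager and attach it under the declared attribute name.
        for module in self.modules:
            for attr_name, friend_cls in module._friends.items():
                for candidate in self.modules:
                    if isinstance(candidate, friend_cls):
                        setattr(module, attr_name, candidate)
                        break


class TaskManager(Module):
    pass


class TaskScheduler(Module):
    def __init__(self):
        # The scheduler asks for direct access to the TaskManager.
        self._friends = {'task_manager': TaskManager}
```

After resolve_friends() runs, a TaskScheduler registered alongside a
TaskManager has a task_manager attribute pointing at it. If no
TaskManager is registered, the attribute is simply not set.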

3) Synchronization between A and B may take more than one round of
message passing, depending on the synchronization algorithm used. To
flexibly support various synchronization algorithms, I defined two
interfaces in the TaskPackage class:

# TaskPackage.active_sync(self, response, phase=1)
Generates a sync request and performs whatever actions are needed to
make the local task package the same as a remote package. The
"response" parameter is the sync response generated by that remote
task package (see below). "phase" indicates which phase the sync
process is in. When phase is 1 there is no response yet, so response
should be passed as None.

The return value of this method is a tuple containing the request data
and a boolean flag, which indicates whether this task package expects
a response from the remote side.

# TaskPackage.passive_sync(self, request, phase=1)
Generates a sync response according to the received sync request. The
"request" and "phase" parameters have similar meanings to those in
active_sync().

Subclasses can derive from TaskPackage and implement their own
active_sync() and passive_sync() methods to support other
synchronization mechanisms.
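
As an illustration of the two interfaces, here is a sketch of a base
class, a toy subclass, and a loop that alternates between the two
sides until the active side no longer expects a response. DictPackage
and run_sync are hypothetical, and both packages live in one process
here, whereas in pydra the request and response travel between worker
and master.

```python
class TaskPackage(object):
    """Stand-in for the TaskPackage interface described above."""

    def active_sync(self, response, phase=1):
        """Return a tuple (request_data, expects_response)."""
        raise NotImplementedError

    def passive_sync(self, request, phase=1):
        """Return the sync response for a received request."""
        raise NotImplementedError


class DictPackage(TaskPackage):
    """Toy package: a version number plus a dict of file contents."""

    def __init__(self, version, files):
        self.version = version
        self.files = dict(files)

    def active_sync(self, response, phase=1):
        if phase == 1:
            # Announce the local version and wait for the remote reply.
            return self.version, True
        # Phase 2: adopt the remote version and content.
        self.version, self.files = response[0], dict(response[1])
        return None, False

    def passive_sync(self, request, phase=1):
        return self.version, dict(self.files)


def run_sync(local_pkg, remote_pkg):
    """Drive the exchange; returns the phase in which it finished."""
    response, phase = None, 1
    while True:
        request, expects_response = local_pkg.active_sync(response, phase)
        if not expects_response:
            return phase
        response = remote_pkg.passive_sync(request, phase)
        phase += 1
```

An algorithm that needs more rounds only has to keep returning True as
the second element of the tuple; the loop itself does not change.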

Take the simplistic tar.gz synchronization algorithm for example. The
flow of a successful synchronization is:

1. WorkerTaskControls detects a task is out-dated, so it calls
TaskManager.active_sync() with task_key specified.
2. TaskManager finds the task package which the task belongs to
according to the task_key.
3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
4. In phase 1 of active_sync, TaskPackage simply returns its current
version, and indicates that it expects a further response from the
remote side.
5. WorkerTaskControls sends the sync request generated by
active_sync() to the master.
6. TaskSyncModule at the master side receives the sync request, and
then invokes TaskPackage.passive_sync(request, 1).
7. In TaskPackage.passive_sync(), the latest content of the task
package is compressed in tar.gz format, serialized, and returned as a
sync response.
8. WorkerTaskControls receives the response from the master and
subsequently calls TaskPackage.active_sync(response, 2), which will
decompress the data and result in an up-to-date task package.
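
The flow above can be sketched with the standard tarfile module. This
is only an illustration under assumptions: the TarGzPackage class, the
"version:payload" string format and the base64 encoding are made up
for the example and are not what the branch necessarily does. It also
assumes the master is trusted, since extractall() should not be used
on archives from untrusted sources.

```python
import base64
import io
import os
import shutil
import tarfile


def pack_dir(path):
    """Compress a directory to tar.gz and return it as a base64 string."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.add(path, arcname=".")
    return base64.b64encode(buf.getvalue()).decode("ascii")


def unpack_dir(payload, path):
    """Replace the directory at path with the decoded tar.gz content."""
    data = base64.b64decode(payload)
    if os.path.isdir(path):
        shutil.rmtree(path)
    os.makedirs(path)
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
        tar.extractall(path)


class TarGzPackage(object):
    """Hypothetical package that syncs by shipping a whole tar.gz."""

    def __init__(self, path, version):
        self.path, self.version = path, version

    def active_sync(self, response, phase=1):
        if phase == 1:
            # Steps 3-4: send the current version, expect a reply.
            return str(self.version), True
        # Step 8: unpack the master's content and adopt its version.
        version, payload = response.split(":", 1)
        unpack_dir(payload, self.path)
        self.version = int(version)
        return None, False

    def passive_sync(self, request, phase=1):
        # Step 7: ship the latest content regardless of the requested version.
        return "%d:%s" % (self.version, pack_dir(self.path))
```

Both the request and the response are plain strings here, which matches
the current string-based representation but means the whole archive is
held in memory at once.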

For now, sync requests and responses are all represented as strings.
Ideally, we will use Producers and Consumers to achieve better
performance, as we discussed.

Hope I expressed myself clearly. Any comments are appreciated.

Yin Qiu
Nanjing University, China
