[Pydra] Updates on task packaging and task sync

Peter Krenesky peter at osuosl.org
Sat Aug 15 16:16:36 UTC 2009

Yin QIU wrote:
> Hi,
> This is a (long) update on the task packaging and task
> synchronization features. I just checked in my changes into the
> task_packaging branch of my git tree. I haven't run tests against the
> new code, and it is probably not working yet. But I believe it will be
> soon.
> Generally speaking, I implemented:
> 1) a simple task packaging scheme as per description of ticket #74.
>   a) Note that now referencing a task needs a fully-qualified name,
> like 'some_pkg.SomeTask'.
>   b) There can be only ONE tasks_dir, which is "task_cache" by
> default. Otherwise, it will confuse the task synchronization process.
Is it redundant packages (ie same package in two locations) that is the

This is fine for now but eventually we'll want to support multiple
> 2) a naive task synchronization mechanism to fix ticket #22
>   When the scheduler wants a worker to run a task or a workunit, it
> also tells the worker the latest version number of that task. If the
> worker finds that the version number does not match its local version,
> it will initiate a task synchronization request to the master. A
> synchronization session will subsequently take place between the
> master and the worker. After the session, the task definition at the
> worker side will be up-to-date.
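A minimal sketch of the version check described in 2), just to make the flow concrete. All names here (needs_sync, handle_run_request, request_sync, run) are hypothetical and not taken from the branch; only the comparison of the local version against the version sent by the scheduler comes from the description.

```python
def needs_sync(local_versions, task_key, latest_version):
    """Return True if the worker's copy of task_key is missing or stale."""
    return local_versions.get(task_key) != latest_version


def handle_run_request(local_versions, task_key, latest_version,
                       request_sync, run):
    """Synchronize first when the versions differ, then run the task.

    request_sync is a stand-in for the sync session with the master and is
    assumed to return the version that was installed.
    """
    if needs_sync(local_versions, task_key, latest_version):
        local_versions[task_key] = request_sync(task_key)
    return run(task_key)
```

A task that is not present locally at all is treated the same way as a stale one, since its local version is None.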
> What I haven't finished:
> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
> TaskManager are not handled yet. Looks like it will need much effort
> to cleanly and neatly deal with these situations on the worker side.
> However, missing this feature won't be a big problem, since the worker
> will passively synchronize with the master.
Are you referring to unloading and reloading code in Python?  It can't
really be done currently.  Python just doesn't have the capability. 
This is one of the reasons to switch to a model where workers are
created specifically for a TaskInstance, and then thrown away after. 

The task manager will also change slightly to ensure the tasks it is
loading are sandboxed away from the Node.  It will have to run a
TaskLoader script in another sandboxed process that returns a JSON
string of task information.  In this mode TaskManager will only have a
dictionary of Tasks; it won't have any instances of them.  Removing or
updating a task will just be a matter of manipulating the dictionary.
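
A rough sketch of the dictionary-only mode, assuming the loader returns a JSON object that maps task keys to a small info record. The class name TaskRegistry and the JSON shape are assumptions for illustration; the real TaskLoader output has not been defined yet.

```python
import json


class TaskRegistry:
    """Holds task descriptions only, never task instances."""

    def __init__(self):
        self.tasks = {}  # task_key -> info dict parsed from the loader output

    def load(self, loader_output):
        """Merge the JSON string produced by the sandboxed loader process."""
        for key, info in json.loads(loader_output).items():
            self.tasks[key] = info  # adding and updating are the same operation

    def remove(self, key):
        """Forget a task; nothing has to be unloaded from the interpreter."""
        self.tasks.pop(key, None)
```

Because no task code is ever imported into this process, an update is just a second load() call with newer data.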
> 2) Should hold the execution of a task on the worker if the task code
> is out-dated and resume it after synchronization. I'm working on this.
> Code changes are mostly in tasks/packaging.py and
> tasks/task_manager.py. Here are some of my notes during the
> development:
> == Task Packaging ==
> I don't change the behavior that a worker runs a task. However, to
> facilitate sandboxed execution, I offered an interface in TaskManager
> (i.e., TaskManager.get_task()) to return all the additional module
> search paths of a task.
> == Task Synchronization ==
> This feature is much trickier than task packaging. I made several
> design decisions.
> 1) TaskManager is a module used by both the master and the worker.
> Though pydra is operated in a master/slave fashion, I want the
> TaskManager to rely on neither the master nor the worker. So
> TaskManager should not call any remote methods of the master and the
> worker, and should be a relatively independent module, except that it
> emits TASK_UPDATED, TASK_REMOVED, etc. signals to other modules. This
> design will even make it easy to develop peer-to-peer task
> synchronization, though we don't need this feature right now.
Great, this is exactly how the module system should be used.
> 2) Though TaskManager is independent of other modules, the master and
> the worker need access to it to manage available tasks, such as
> retrieving tasks and synchronizing tasks. The original architecture
> only allows me to use signals. But this would be a little bit clumsy.
> So I took the concept of "friend class" in C++, and added a _friends
> attribute in the Module class. A module can define its friends as
> follows:
> self._friends = {
>   'friend1' : FriendModule1,
>   'friend2' : FriendModule2,
> }
> After the initialization process, the module will have two more
> attributes, namely friend1 and friend2, as long as FriendModule1 and
> FriendModule2 are found in the same ModuleManager as this module. Note
> that this is actually contrary to the concept of friend class in C++.
> With this in hand, I made TaskManager a friend of WorkerTaskControls,
> TaskScheduler, and TaskSyncModule (a new module created by me) so that
> the latter three can access the TaskManager directly.
I'd like to avoid tightly coupling modules but you're right that it does
become clumsy to deal with signals even if they are executed
synchronously.  This is ok, provided we keep tight control over which
modules are allowed to do this.
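
For reference, the friend lookup described above could be sketched roughly like this. Module and ModuleManager here are simplified stand-ins and not the real Pydra classes; the resolution by isinstance() is an assumption about how "found in the same ModuleManager" might be decided.

```python
class Module:
    """Simplified stand-in for a Pydra module."""
    _friends = {}  # attribute name -> class of the friend module


class ModuleManager:
    """Simplified stand-in that resolves friends once all modules are known."""

    def __init__(self, modules):
        self.modules = list(modules)
        for module in self.modules:
            self._resolve_friends(module)

    def _resolve_friends(self, module):
        for attr, friend_cls in module._friends.items():
            for candidate in self.modules:
                if isinstance(candidate, friend_cls):
                    # expose the friend instance as a plain attribute
                    setattr(module, attr, candidate)
                    break


class TaskManager(Module):
    pass


class TaskScheduler(Module):
    def __init__(self):
        self._friends = {'task_manager': TaskManager}


task_manager = TaskManager()
scheduler = TaskScheduler()
ModuleManager([task_manager, scheduler])
```

After the manager is built, scheduler.task_manager is the TaskManager instance, while TaskManager itself gains no reference back to the scheduler. Keeping the _friends dictionaries small is what keeps the coupling under control.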

> 3) Synchronization between A and B may take more than one round of
> message passing, depending on the synchronization algorithm used. To
> flexibly support various synchronization algorithms, I defined two
> interfaces in the TaskPackage class:
> # TaskPackage.active_sync(self, response, phase=1)
> Generates a sync request, and performs certain actions to update the
> local task package to make it the same as a remote package. The
> "response" parameter is the sync response generated by that remote
> task package (see below). "phase" indicates the phase which the sync
> process is in. Obviously, when phase is 1, response should be passed
> as None.
> The return value of this method is a tuple containing the request data
> and a boolean flag, which indicates whether this task package expects
> a response from the remote side.
> # TaskPackage.passive_sync(self, request, phase=1)
> Generates a sync response according to the received sync request. The
> "request" and "phase" parameters have similar meanings to those in
> active_sync().
> Subclasses can derive from TaskPackage and implement their own
> active_sync() and passive_sync() methods to support other
> synchronization mechanisms.
> Take the simplistic tar.gz synchronization algorithm for example. The
> flow of a successful synchronization is:
> 1. WorkerTaskControls detects a task is out-dated, so it calls
> TaskManager.active_sync() with task_key specified.
> 2. TaskManager finds the task package which the task belongs to
> according to the task_key.
> 3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
> 4. In phase 1 of active_sync, TaskPackage simply returns its current
> version, and indicates that it expects a further response from the
> master.
> 5. WorkerTaskControls sends the sync request generated by
> active_sync() to the master
> 6. TaskSyncModule at the master side receives the sync request, and
> then invokes TaskPackage.passive_sync(request, 1).
> 7. In TaskPackage.passive_sync(), the latest content of the task
> package is compressed in tar.gz format, serialized, and returned as a
> string.
> 8. WorkerTaskControls receives the response from the master and
> subsequently calls TaskPackage.active_sync(response, 2), which will
> decompress the data and result in an up-to-date task package.
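
The package-level part of the quoted flow (steps 3, 4, 7 and 8) can be sketched as below. This is not the code from the branch: the class name TarGzTaskPackage, the constructor arguments, and the JSON/base64 wire format are assumptions chosen so that requests and responses stay plain strings. The message passing done by WorkerTaskControls and TaskSyncModule is left out.

```python
import base64
import io
import json
import os
import tarfile


class TarGzTaskPackage:
    """Hypothetical package that syncs by shipping its folder as tar.gz."""

    def __init__(self, folder, version):
        self.folder = folder
        self.version = version

    def active_sync(self, response=None, phase=1):
        """Worker side. Returns (request data, expects_response)."""
        if phase == 1:
            # phase 1: report the local version and wait for the master
            return self.version, True
        # phase 2: unpack what the master sent (stale files are not removed,
        # and a real implementation must not trust archive member paths)
        message = json.loads(response)
        data = base64.b64decode(message['data'])
        with tarfile.open(fileobj=io.BytesIO(data), mode='r:gz') as archive:
            archive.extractall(self.folder)
        self.version = message['version']
        return None, False

    def passive_sync(self, request, phase=1):
        """Master side. Returns the whole package serialized as a string."""
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode='w:gz') as archive:
            for name in sorted(os.listdir(self.folder)):
                archive.add(os.path.join(self.folder, name), arcname=name)
        payload = base64.b64encode(buf.getvalue()).decode('ascii')
        return json.dumps({'version': self.version, 'data': payload})
```

Note that in this naive form the request (the worker's version) is ignored by passive_sync(); the master always sends the full package.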
I have some issues with this workflow.

1) Why use multiple calls with a state variable "phase"?  Why not use
specific function calls?  Are there large sections of reused code that
can't be methods?

2) It should actually be synchronizing with the Node since Node is
responsible for anything that affects all of its workers.  Also once we
have refactored the Node-Worker relationship (see above) worker will not
always be around for synchronization.

Since this doesn't exist yet, you can just mock up a run_task() method on
the Node that just tries to look up the requested task from the TaskManager.

3) To play nice with the sandbox, TaskPackage can't be responsible for
sending sync messages or packaging itself.  There should be a
TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
entire workflow on both ends.
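
As a rough illustration of points 1) and 3), here is a sketch with one named method per step instead of a "phase" argument. Only the class names TaskSyncServer and TaskSyncClient come from the text above; the method names, the dictionary-shaped messages, and the send callable are hypothetical, and the payload is treated as an opaque blob because packing and unpacking would live elsewhere.

```python
class TaskSyncServer:
    """Master side: answers sync requests from nodes."""

    def __init__(self, packages):
        self.packages = packages  # task_key -> (version, packed payload)

    def handle_request(self, request):
        version, payload = self.packages[request['task_key']]
        if request['version'] == version:
            payload = None  # node is already current, send nothing
        return {'task_key': request['task_key'],
                'version': version,
                'payload': payload}


class TaskSyncClient:
    """Node side: drives one sync from request to installed payload."""

    def __init__(self, installed, send):
        self.installed = installed  # task_key -> version present on the node
        self.received = {}          # task_key -> payload awaiting unpacking
        self.send = send            # stand-in for the remote call to master

    def build_request(self, task_key):
        return {'task_key': task_key,
                'version': self.installed.get(task_key)}

    def apply_response(self, response):
        if response['payload'] is not None:
            self.received[response['task_key']] = response['payload']
            self.installed[response['task_key']] = response['version']

    def sync(self, task_key):
        self.apply_response(self.send(self.build_request(task_key)))
```

In a test the two ends can be wired together directly with TaskSyncClient(installed, server.handle_request); in Pydra the send callable would be a remote call.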

This is more what I envisioned:

1) When a task is requested from TaskManager this should be an
asynchronous call
   1a) check whether a sync is already in progress for that task, because
there might be multiple workunits fired at the same time.
   1b) if no sync is in progress, send a TASK_NEEDS_UPDATE signal
   1c) create a deferred and register it for the requested task key.
2) TaskSyncClient, or other task management modules (ie. directory
scanner) will listen for the signal and attempt to find the task. 
3) If found and/or transferred
     3a) the module calls ADD_TASK which unpacks it and loads it as an
available task.
4) The task manager fires any callbacks registered for that task (there
might be multiple workunits)
5) callbacks would cause workers to be spawned and run the task.
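
Steps 1) to 5) above could look roughly like the sketch below. It uses plain callbacks as a stand-in for deferreds so that it runs without Twisted, and it only covers the case where the task is missing locally (no version comparison). The class name AsyncTaskManager, the emit callable, and the method names are hypothetical; only the TASK_NEEDS_UPDATE signal name comes from the list.

```python
class AsyncTaskManager:
    """Hypothetical asynchronous task lookup with one sync per task key."""

    def __init__(self, emit):
        self.emit = emit    # stand-in for the signal mechanism
        self.tasks = {}     # task_key -> task info
        self.pending = {}   # task_key -> callbacks waiting on a sync

    def request_task(self, task_key, callback):
        if task_key in self.tasks:
            callback(self.tasks[task_key])
            return
        # 1a) is a sync already in progress for this task?
        first_request = task_key not in self.pending
        # 1c) register the caller before signalling, in case the signal
        #     handler answers synchronously
        self.pending.setdefault(task_key, []).append(callback)
        if first_request:
            # 1b) ask listeners (sync client, directory scanner) to find it
            self.emit('TASK_NEEDS_UPDATE', task_key)

    def add_task(self, task_key, info):
        """3a) make the task available, then 4) fire everything waiting."""
        self.tasks[task_key] = info
        for callback in self.pending.pop(task_key, []):
            callback(info)  # 5) e.g. spawn a worker and run the task
```

Two workunits asking for the same missing task produce a single TASK_NEEDS_UPDATE signal, and both callbacks fire once add_task() is called.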

TaskManager should handle packaging/unpacking functions to separate that
from synchronization.  This will allow task packages to be added to the
filesystem and unpacked by a different module.

> Now sync requests and responses are all represented in strings.
> Ideally, we will use Producers and Consumers to achieve better
> performance, as we discussed.
> Hope I expressed myself clearly. Any comments are appreciated.

