[Pydra] Updates on task packaging and task sync

Peter Krenesky peter at osuosl.org
Sat Aug 15 18:51:13 UTC 2009

Yin QIU wrote:
> On Sun, Aug 16, 2009 at 12:16 AM, Peter Krenesky<peter at osuosl.org> wrote:
>> Yin QIU wrote:
>>> Hi,
>>> This is a (long) update on the task packaging and task
>>> synchronization features. I just checked my changes into the
>>> task_packaging branch of my git tree. I haven't run tests against the
>>> new code, and it is probably not working yet, but I believe it will be
>>> shortly.
>>> Generally speaking, I implemented:
>>> 1) a simple task packaging scheme as per description of ticket #74.
>>>   a) Note that now referencing a task needs a fully-qualified name,
>>> like 'some_pkg.SomeTask'.
>>>   b) There can be only ONE tasks_dir, which is "task_cache" by
>>> default. Otherwise, it will confuse the task synchronization process.
>> Are redundant packages (i.e., the same package in two locations) the
>> problem?
>> This is fine for now but eventually we'll want to support multiple
>> directories.
> Partially. The main problem is: how do we know where to put a specific
> task package if multiple tasks_dir are allowed?
> Suppose the master has 2 tasks_dir, /folder/one/ and /folder/two/.
> There is a task package called "demo" in /folder/one. Does that mean
> there should be two folders on each worker, with their names exactly
> being /folder/one/ and /folder/two/? If that is the case, I think this
> is far too inflexible. And if it is not, we have to map /folder/one/ to a
> local directory on each worker. Please correct me if I misunderstood
> you.
Oh, you're right.  That is a big problem.  One directory is fine for
now.  We'll have to revisit this later for a better solution.
>>> 2) a naive task synchronization mechanism to fix ticket #22
>>>   When the scheduler wants a worker to run a task or a workunit, it
>>> also tells the worker the latest version number of that task. If the
>>> worker finds that the version number does not match its local version,
>>> it will initiate a task synchronization request to the master. A
>>> synchronization session will subsequently take place between the
>>> master and the worker. After the session, the task definition at the
>>> worker side will be up-to-date.
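The version check described above might look roughly like this (a minimal sketch; `Worker`, `run_task`, and `request_sync` are illustrative stand-ins, not Pydra's actual API):

```python
class Worker:
    """Sketch of the worker-side version check. All names are
    illustrative, not Pydra's real classes or remotes."""

    def __init__(self):
        self.local_versions = {}   # task_key -> locally cached version
        self.sync_requests = []    # record of sync sessions started

    def run_task(self, task_key, version, workunit):
        """Called by the scheduler; `version` is the master's latest."""
        if self.local_versions.get(task_key) != version:
            # Local code is missing or stale: synchronize before running.
            self.request_sync(task_key, version)
        return 'running %s v%s on %s' % (task_key, version, workunit)

    def request_sync(self, task_key, version):
        self.sync_requests.append(task_key)
        # A sync session with the master would happen here; after it
        # completes, the local copy matches the master's version.
        self.local_versions[task_key] = version
```

After one sync, subsequent workunits for the same task version run without contacting the master again.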
>>> What I haven't finished:
>>> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
>>> TaskManager are not handled yet. It looks like it will take a lot of
>>> effort to deal with these situations cleanly on the worker side.
>>> However, missing this feature won't be a big problem, since the worker
>>> will passively synchronize with the master.
>> Are you referring to unloading and reloading code from python?  It can't
>> really be done currently.  Python just doesn't have the capability.
>> This is one of the reasons to switch to a model where workers are
>> created specifically for a TaskInstance, and then thrown away after.
> There are two things missing. One is to inform the worker of updates
> so that synchronization can be done actively rather than
> passively. However, I don't know if that is desirable yet.
> The other is to stop task execution and to re-run them. As you said,
> we would change the way that workers are created and disposed. I guess
> this feature can be postponed.
Workers should not be updated if they are already running a task.  There
is no way to ensure that the newer version will be compatible.
This is a big problem.  We could have two instances of the same task
scheduled and the task updated before the first task completes.

I think we need to include versioning in the package location and
instance tracking so that tasks that are currently running can finish
using the same set of code they started with.  This would mean some
extra calls to ensure that unused versions are removed after they are no
longer needed.

I think this could be most problematic with dependencies.  We'd have to
know which version of a dependency a task started with.
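One way to sketch the versioned-location idea: keep each version of a package in its own directory and reference-count running instances, so a version's code is only removed once nothing is using it. All names and the directory layout here are hypothetical:

```python
import os

class VersionStore:
    """Sketch of version-stamped package locations with instance
    tracking. Layout and names are illustrative, not Pydra's scheme."""

    def __init__(self, tasks_dir):
        self.tasks_dir = tasks_dir
        self.refcounts = {}   # (package, version) -> running instances

    def path(self, package, version):
        # e.g. task_cache/demo/v3/
        return os.path.join(self.tasks_dir, package, 'v%s' % version)

    def acquire(self, package, version):
        """A task instance starts: pin this version of the code."""
        key = (package, version)
        self.refcounts[key] = self.refcounts.get(key, 0) + 1
        return self.path(package, version)

    def release(self, package, version):
        """A task instance finished; True means the version's
        directory is no longer in use and may be cleaned up."""
        key = (package, version)
        self.refcounts[key] -= 1
        if self.refcounts[key] == 0:
            del self.refcounts[key]
            return True
        return False
```

Dependencies could be pinned the same way, by recording the (package, version) pairs a task acquired when it started.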

>> The task manager will also work slightly differently to ensure the tasks
>> it is loading are sandboxed away from the Node.  It will have to run a
>> TaskLoader script in another sandboxed process that returns a JSON
>> string of task information.  TaskManager in this mode will only have a
>> dictionary of Tasks, it won't have any instances of them.  Removing or
>> updating a task will just be manipulating the dictionary.
>>> 2) Should hold the execution of a task on the worker if the task code
>>> is out-dated and resume it after synchronization. I'm working on this.
>>> Code changes are mostly in tasks/packaging.py and
>>> tasks/task_manager.py. Here are some of my notes during the
>>> development:
>>> == Task Packaging ==
>>> I didn't change how a worker runs a task. However, to
>>> facilitate sandboxed execution, I added an interface to TaskManager
>>> (i.e., TaskManager.get_task()) to return all the additional module
>>> search paths of a task.
>>> == Task Synchronization ==
>>> This feature is much trickier than task packaging. I made several
>>> design decisions.
>>> 1) TaskManager is a module used by both the master and the worker.
>>> Though pydra operates in a master/slave fashion, I want the
>>> TaskManager to rely on neither the master nor the worker. So
>>> TaskManager should not call any remote methods of the master or the
>>> worker, and should be a relatively independent module, except that it
>>> emits TASK_UPDATE, TASK_REMOVED, etc. signals to other modules. This
>>> design will even make it easy to develop peer-to-peer task
>>> synchronization, though we don't need this feature right now.
>> Great, this is exactly how the module system should be used
>>> 2) Though TaskManager is independent of other modules, the master and
>>> the worker need access to it to manage available tasks, such as
>>> retrieving tasks and synchronizing tasks. The original architecture
>>> only allowed me to use signals, but that would have been a bit clumsy.
>>> So I took the concept of "friend class" in C++, and added a _friend
>>> attribute in the Module class. A module can define its friends as
>>> follows:
>>> self._friends = {
>>>   'friend1' : FriendModule1,
>>>   'friend2' : FriendModule2,
>>> }
>>> After the initialization process, the module will have two more
>>> attributes, namely friend1 and friend2, as long as FriendModule1 and
>>> FriendModule2 are found in the same ModuleManager as this module. Note
>>> that this is actually contrary to the concept of friend class in C++.
>>> With this in hand, I made TaskManager a friend of WorkerTaskControls,
>>> TaskScheduler, and TaskSyncModule (a new module I created) so that
>>> the latter three can access the TaskManager directly.
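The friend wiring described might work roughly like this (a minimal sketch; the ModuleManager internals here are guessed for illustration, not Pydra's actual code):

```python
class Module:
    # Subclasses declare friends as {attribute_name: friend_class}.
    _friends = {}

class TaskManager(Module):
    pass

class WorkerTaskControls(Module):
    _friends = {'task_manager': TaskManager}

class ModuleManager:
    """After all modules register, each module's _friends mapping is
    resolved to attributes pointing at the live module instances."""

    def __init__(self, modules):
        self.modules = modules
        self._resolve_friends()

    def _resolve_friends(self):
        by_class = {type(m): m for m in self.modules}
        for module in self.modules:
            for attr, friend_class in module._friends.items():
                if friend_class in by_class:
                    # e.g. worker_task_controls.task_manager = <TaskManager>
                    setattr(module, attr, by_class[friend_class])
```

So after initialization, `WorkerTaskControls` instances get a `task_manager` attribute pointing directly at the shared TaskManager, bypassing signals.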
>> I'd like to avoid tightly coupling modules but you're right that it does
>> become clumsy to deal with signals even if they are executed
>> synchronously.  This is ok, provided we keep tight control over which
>> modules are allowed to do this.
> Yeah, tight coupling was my concern about this change too, but I
> didn't find a better solution. The main reason is, I guess, that
> TaskManager naturally has a tight relationship with the master and
> workers, and a single shared "registry" attribute is not enough to
> reflect this relationship.
>>> 3) Synchronization between A and B may take more than one round of
>>> message passing, depending on the synchronization algorithm used. To
>>> flexibly support various synchronization algorithms, I defined two
>>> interfaces in the TaskPackage class:
>>> # TaskPackage.active_sync(self, response, phase=1)
>>> Generates a sync request and performs actions to update the
>>> local task package to make it the same as a remote package. The
>>> "response" parameter is the sync response generated by that remote
>>> task package (see below). "phase" indicates the phase which the sync
>>> process is in. Obviously, when phase is 1, response should be passed
>>> as None.
>>> The return value of this method is a tuple containing the request data
>>> and a boolean flag, which indicates whether this task package expects
>>> a response from the remote side.
>>> # TaskPackage.passive_sync(self, request, phase=1)
>>> Generates a sync response according to the received sync request. The
>>> "request" and "phase" parameters have similar meanings to those in
>>> active_sync().
>>> Subclasses of TaskPackage can implement their own
>>> active_sync() and passive_sync() methods to support other
>>> synchronization mechanisms.
>>> Take the simplistic tar.gz synchronization algorithm as an example. The
>>> flow of a successful synchronization is:
>>> 1. WorkerTaskControls detects a task is out-dated, so it calls
>>> TaskManager.active_sync() with task_key specified.
>>> 2. TaskManager finds the task package which the task belongs to
>>> according to the task_key.
>>> 3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
>>> 4. In phase 1 of active_sync, TaskPackage simply returns its current
>>> version, and indicates that it expects a further response from the
>>> master.
>>> 5. WorkerTaskControls sends the sync request generated by
>>> active_sync() to the master.
>>> 6. TaskSyncModule at the master side receives the sync request, and
>>> then invokes TaskPackage.passive_sync(request, 1).
>>> 7. In TaskPackage.passive_sync(), the latest content of the task
>>> package is compressed in tar.gz format, serialized, and returned as a
>>> string.
>>> 8. WorkerTaskControls receives the response from the master and
>>> subsequently calls TaskPackage.active_sync(response, 2), which will
>>> decompress the data and result in an up-to-date task package.
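The eight steps could be condensed into a sketch like this (the compressed payload is faked as a plain tuple so the phase protocol is visible; real code would use tarfile and the modules' remote calls, and all names here are illustrative):

```python
class TaskPackage:
    """Sketch of the active_sync/passive_sync phase protocol described
    above. The tar.gz payload is stubbed out for brevity."""

    def __init__(self, version, content):
        self.version = version
        self.content = content

    def active_sync(self, response, phase=1):
        """Returns (request_data, expects_response)."""
        if phase == 1:
            # Phase 1 (worker): send our version, expect a response.
            return self.version, True
        # Phase 2 (worker): unpack the master's payload. A real
        # implementation would extract a tar.gz archive here.
        self.version, self.content = response
        return None, False

    def passive_sync(self, request, phase=1):
        # Phase 1 (master): ship the latest package. A real
        # implementation would tar.gz the package directory here.
        return (self.version, self.content)

def synchronize(worker_pkg, master_pkg):
    """One full round trip of the simplistic tar.gz scheme."""
    request, expects_more = worker_pkg.active_sync(None, phase=1)
    response = master_pkg.passive_sync(request, phase=1)
    worker_pkg.active_sync(response, phase=2)
```

After `synchronize()`, the worker's package carries the master's version and content.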
>> I have some issues with this workflow.
>> 1) why use multiple calls with a state variable "phase"?  Why not use
>> specific function calls?  Are there large sections of reused code that
>> can't be methods?
> In my simplistic "tar.gz" scheme, there are 2 phases at the sync
> requester side, and 1 phase at the sync server side. That is:
> phase 1 at the worker side: worker ---sends local version--> master
> phase 1 at the master side: master (phase 1)---sends compressed data--> worker
> phase 2 at the worker side: worker decompresses the data
> but other synchronization schemes, which may be based on delta
> encoding, could require more phases to complete a synchronization. I
> could not come up with a set of interfaces versatile enough for that.
I was just saying that phases 1 and 2 seem like distinct calls:
   * node.request_sync() calls  master.request_sync()
   * master.request_sync() calls node.receive_sync() with data

it shouldn't matter whether node.receive_sync() is receiving a whole
tarball, deltas, etc.  But I don't see the code for this in your
repository so I couldn't tell for sure if there was more to it.
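The two-call shape in those bullet points might look like this (hypothetical names mirroring the bullets; the remote calls are simulated as direct method calls):

```python
class Node:
    """Sync requester. In real code request_sync would be a remote
    call to the master and receive_sync a remote the master invokes."""

    def __init__(self, master, packages):
        self.master = master
        self.packages = packages   # task_key -> (version, payload)

    def request_sync(self, task_key):
        version = self.packages.get(task_key, (None, None))[0]
        self.master.request_sync(self, task_key, version)

    def receive_sync(self, task_key, payload):
        # payload could be a whole tarball, deltas, etc.; the node
        # doesn't care, it just installs whatever the master sent.
        self.packages[task_key] = payload

class Master:
    def __init__(self, packages):
        self.packages = packages

    def request_sync(self, node, task_key, version):
        latest = self.packages[task_key]
        if latest[0] != version:
            node.receive_sync(task_key, latest)
```

The phase variable disappears: each step of the exchange is its own named method.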

>> 2) It should actually be synchronizing with the Node since Node is
>> responsible for anything that affects all of its workers.  Also once we
>> have refactored the Node-Worker relationship (see above) worker will not
>> always be around for synchronization.
> Oh, I had doubts about this when I first started working on task sync.
> Sorry that I later forgot it.
>> Since this doesn't exist yet you can just mock up a run_task() method on
>> the Node that just tries to lookup the requested task from the TaskManager.
> Simply mocking up the run_task() method is not enough. Right now the
> scheduler directly calls the remote interfaces of workers. It seems like
> we would have to change the way the scheduler works and maintain node
> references on the master, which would result in lots of changes. Is
> that acceptable for now?
Correct.  Refactoring the node-worker relationship will mean that the
scheduler interacts with the node.  For many things the node will act as
a proxy to the worker.  The node will have all the remotes (run_task,
send_results, etc) that worker currently has, but they will be mostly
proxies to the worker.

Perhaps this is a better solution:
    write the module TaskSyncClient and include it as part of the
Worker.  When we move these functions over to the Node we can move the
TaskSyncClient as well.  Node will eventually have a NodeTaskControls
module that has the same set of listeners, signals, and remotes that
WorkerTaskControls has.  It should be fairly transparent and an easy switch.
>> 3) To play nice with the sandbox TaskPackage can't be responsible for
>> sending sync messages or packaging itself.  There should be a
>> TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
>> entire workflow on both ends.
> I totally agree. Actually it is almost my current design. A
> TaskSyncModule at the master side handles sync requests. I put the
> sync requesting functionality in WorkerTaskControls without defining a
> new module.
>> this is more what i envisioned:
>> 1) When a task is requested from TaskManager this should be an
>> asynchronous call
>>   1a) checks for if sync is in progress for that task because there
>> might be multiple workunits fired at the same time.
> Good point! I did not consider this :(
>>   1b) if no sync in progress, send a TASK_NEEDS_UPDATE signal
>>   1c) deferred created and registered for the requested task key.
>> 2) TaskSyncClient, or other task management modules (ie. directory
>> scanner) will listen for the signal and attempt to find the task.
>> 3) If found and/or transferred
>>     3a) the module calls ADD_TASK which unpacks it and loads it as an
>> available task.
>> 4) The task manager fires any callbacks registered for that task (there
>> might be multiple workunits)
>> 5) callbacks would cause workers to be spawned and run the task.
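The numbered steps above can be sketched with plain callbacks standing in for Twisted deferreds (all names here are illustrative, not Pydra's actual API):

```python
class TaskManager:
    """Sketch of the asynchronous get_task flow: callbacks queue while
    a sync is in progress and fire once the task becomes available."""

    def __init__(self):
        self.tasks = {}     # task_key -> loaded task
        self.pending = {}   # task_key -> callbacks awaiting sync
        self.signals = []   # emitted signals, e.g. TASK_NEEDS_UPDATE

    def get_task(self, task_key, callback):
        if task_key in self.tasks:
            callback(self.tasks[task_key])
        elif task_key in self.pending:
            # Step 1a: sync already in progress; just queue the callback
            # so multiple workunits don't trigger duplicate syncs.
            self.pending[task_key].append(callback)
        else:
            # Steps 1b/1c: signal that a sync is needed and register
            # the callback for when the task arrives.
            self.pending[task_key] = [callback]
            self.signals.append(('TASK_NEEDS_UPDATE', task_key))

    def add_task(self, task_key, task):
        # Steps 3a/4: task was found or transferred; load it and fire
        # every callback waiting on it.
        self.tasks[task_key] = task
        for cb in self.pending.pop(task_key, []):
            cb(task)
```

Two concurrent workunits for the same missing task produce one sync signal, and both callbacks fire once the task is added.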
> In fact I'm planning to adopt this asynchronous design. There are two
> reasons driving me to do this: 1) task synchronization may be
> time-consuming, especially in the case of a "cold start"; 2) twisted's
> async nature :-)
> There is however an embarrassing question: since the pencil-down date
> is approaching, I guess I won't be able to finish this before that
> date. Can I continue working on it after gsoc?
Absolutely.  I'd really like for you to remain a part of the community
after GSOC ends.  Even though you haven't completed all the code in your
proposal, you've added a lot of value to the project.  We've worked
through the logic of some very tough problems that on my own I probably
wouldn't have gotten right.
>> TaskManager should handle packaging/unpacking functions to separate that
>> from synchronization.  This will allow task packages to be added to the
>> filesystem and unpacked by a different module.
> Could you explain more on this? I'm sorry I didn't understand it quite well.
We'll also want other components to be able to unpack tasks from other
sources:

    1) tarballs or source dropped directly into task_cache
    2) tarballs uploaded to the website

That means the code for unpacking tasks must live in a general place.
TaskManager makes the most sense as it handles loading tasks and
maintaining dictionaries of available tasks.
>>> Right now sync requests and responses are all represented as strings.
>>> Ideally, we would use Producers and Consumers to achieve better
>>> performance, as we discussed.
>>> Hope I expressed myself clearly. Any comments are appreciated.
>> -Peter
>> _______________________________________________
>> Pydra mailing list
>> Pydra at osuosl.org
>> http://lists.osuosl.org/mailman/listinfo/pydra
