[Pydra] Updates on task packaging and task sync

Yin QIU allenchue at gmail.com
Sat Aug 15 18:07:37 UTC 2009


On Sun, Aug 16, 2009 at 12:16 AM, Peter Krenesky<peter at osuosl.org> wrote:
> Yin QIU wrote:
>> Hi,
>>
>> This is a (long) update on the task packaging and task
>> synchronization features. I just checked my changes into the
>> task_packaging branch of my git tree. I haven't run tests against the
>> new code, and it is probably not working yet, but I believe it will
>> be soon.
>>
>> Generally speaking, I implemented:
>>
>> 1) a simple task packaging scheme as per description of ticket #74.
>>   a) Note that referencing a task now requires a fully-qualified
>> name, like 'some_pkg.SomeTask'.
>>   b) There can be only ONE tasks_dir, which is "task_cache" by
>> default. Otherwise, it would confuse the task synchronization process.
>>
>>
> Are redundant packages (i.e., the same package in two locations) the
> problem?
>
> This is fine for now but eventually we'll want to support multiple
> directories.

Partially. The main problem is: how do we know where to put a specific
task package if multiple tasks_dirs are allowed?

Suppose the master has two tasks_dirs, /folder/one/ and /folder/two/,
and there is a task package called "demo" in /folder/one/. Does that
mean there should be two folders on each worker, with their names
exactly matching /folder/one/ and /folder/two/? If so, I think this is
quite inflexible. And if not, we have to map /folder/one/ to a local
directory on each worker. Please correct me if I misunderstood you.
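To make the second option concrete, here is a tiny sketch of the
mapping I have in mind. All the paths and names below are hypothetical,
just for illustration:

```python
# Hypothetical worker-side configuration: each of the master's
# tasks_dirs is mapped to a local directory chosen by the worker,
# instead of mirroring the master's absolute paths.

MASTER_TASKS_DIRS = ['/folder/one/', '/folder/two/']

WORKER_DIR_MAP = {
    '/folder/one/': '/var/pydra/task_cache_a/',
    '/folder/two/': '/var/pydra/task_cache_b/',
}

def local_dir_for(master_dir):
    """Resolve where a package synced from master_dir should live locally."""
    return WORKER_DIR_MAP[master_dir]
```

The cost, of course, is that every worker has to maintain such a map,
which is exactly the bookkeeping a single tasks_dir avoids.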


>> 2) a naive task synchronization mechanism to fix ticket #22
>>   When the scheduler wants a worker to run a task or a workunit, it
>> also tells the worker the latest version number of that task. If the
>> worker finds that the version number does not match its local version,
>> it will initiate a task synchronization request to the master. A
>> synchronization session will subsequently take place between the
>> master and the worker. After the session, the task definition at the
>> worker side will be up-to-date.
>>
>> What I haven't finished:
>>
>> 1) Signals TASK_UPDATED, TASK_REMOVED, etc. emitted by the master's
>> TaskManager are not handled yet. It looks like it will take a lot of
>> effort to deal with these situations cleanly on the worker side.
>> However, missing this feature won't be a big problem, since the worker
>> will passively synchronize with the master.
>>
>>
> Are you referring to unloading and reloading code from python?  It can't
> really be done currently.  Python just doesn't have the capability.
> This is one of the reasons to switch to a model where workers are
> created specifically for a TaskInstance, and then thrown away after.
>

There are two things missing. One is to inform the worker of updates so
that synchronization can be done actively rather than passively.
However, I don't know whether that is desirable yet.

The other is to stop running tasks and re-run them. As you said, we
would change the way that workers are created and disposed of. I guess
this feature can be postponed.

> The task manager will also work slightly differently to ensure the
> tasks it is loading are sandboxed away from the Node.  It will have
> to run a TaskLoader script in another sandboxed process that returns
> a json string of task information.  TaskManager in this mode will only
> have a dictionary of Tasks, it won't have any instances of them.
> Removing or updating a task will just be manipulating the dictionary.
>> 2) The worker should hold the execution of a task if the task code
>> is outdated and resume it after synchronization. I'm working on this.
>>
>>
>> Code changes are mostly in tasks/packaging.py and
>> tasks/task_manager.py. Here are some of my notes during the
>> development:
>>
>> == Task Packaging ==
>> I didn't change the way a worker runs a task. However, to facilitate
>> sandboxed execution, I added an interface to TaskManager (i.e.,
>> TaskManager.get_task()) that returns all the additional module search
>> paths of a task.
>>
>>
>> == Task Synchronization ==
>> This feature is much trickier than task packaging. I made several
>> design decisions.
>>
>> 1) TaskManager is a module used by both the master and the worker.
>> Though pydra operates in a master/slave fashion, I want the
>> TaskManager to depend on neither the master nor the worker. So
>> TaskManager should not call any remote methods of the master or the
>> worker, and should be a relatively independent module, except that it
>> emits TASK_UPDATED, TASK_REMOVED, etc. signals to other modules. This
>> design will even make it easy to develop peer-to-peer task
>> synchronization, though we don't need that feature right now.
>>
>>
> Great, this is exactly how the module system should be used
>> 2) Though TaskManager is independent of other modules, the master and
>> the worker need access to it to manage available tasks, e.g., for
>> retrieving and synchronizing tasks. The original architecture only
>> allowed me to use signals, but that would be a little bit clumsy. So
>> I borrowed the concept of a "friend class" from C++ and added a
>> _friends attribute to the Module class. A module can define its
>> friends as follows:
>>
>> self._friends = {
>>   'friend1' : FriendModule1,
>>   'friend2' : FriendModule2,
>> }
>>
>> After the initialization process, the module will have two more
>> attributes, namely friend1 and friend2, as long as FriendModule1 and
>> FriendModule2 are found in the same ModuleManager as this module. Note
>> that this is actually the reverse of the C++ concept: here the
>> declaring module gains access to its friends, rather than granting
>> them access to itself.
>>
>> With this in hand, I made TaskManager a friend of WorkerTaskControls,
>> TaskScheduler, and TaskSyncModule (a new module I created) so that
>> the latter three can access the TaskManager directly.
>>
>>
> I'd like to avoid tightly coupling modules but you're right that it does
> become clumsy to deal with signals even if they are executed
> synchronously.  This is ok, provided we keep tight control over which
> modules are allowed to do this.
>

Yeah, tight coupling was my concern about this change too, but I didn't
find a better solution. The main reason, I guess, is that TaskManager
naturally has a tight relationship with the master and workers, and a
single shared "registry" attribute is not enough to reflect that
relationship.
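For reference, here is a minimal sketch of how the _friends wiring
could work. The Module and ModuleManager names follow the discussion
above, but the wiring logic itself is my assumption, not the actual
pydra code:

```python
class Module(object):
    """Base module; subclasses may declare friends by class."""
    def __init__(self):
        self._friends = {}

class ModuleManager(object):
    """Holds modules and, during init, resolves each module's declared
    friends into direct attributes (e.g. self.task_manager)."""
    def __init__(self, modules):
        self.modules = modules
        self._wire_friends()

    def _wire_friends(self):
        # index registered modules by their class
        by_class = dict((type(m), m) for m in self.modules)
        for module in self.modules:
            for attr, friend_class in module._friends.items():
                # only wire the friend if it lives in the same manager
                if friend_class in by_class:
                    setattr(module, attr, by_class[friend_class])

# usage sketch
class TaskManager(Module):
    pass

class WorkerTaskControls(Module):
    def __init__(self):
        Module.__init__(self)
        self._friends = {'task_manager': TaskManager}

tm = TaskManager()
wtc = WorkerTaskControls()
ModuleManager([tm, wtc])
assert wtc.task_manager is tm
```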

>> 3) Synchronization between A and B may take more than one round of
>> message passing, depending on the synchronization algorithm used. To
>> flexibly support various synchronization algorithms, I defined two
>> interfaces in the TaskPackage class:
>>
>> # TaskPackage.active_sync(self, response, phase=1)
>> Generates a sync request and performs certain actions to update the
>> local task package so that it matches a remote package. The
>> "response" parameter is the sync response generated by that remote
>> task package (see below). "phase" indicates the phase the sync
>> process is in. Obviously, when phase is 1, response should be passed
>> as None.
>>
>> The return value of this method is a tuple containing the request data
>> and a boolean flag, which indicates whether this task package expects
>> a response from the remote side.
>>
>> # TaskPackage.passive_sync(self, request, phase=1)
>> Generates a sync response according to the received sync request. The
>> "request" and "phase" parameters have similar meanings to those in
>> active_sync().
>>
>> Subclasses of TaskPackage can implement their own active_sync() and
>> passive_sync() methods to support other synchronization mechanisms.
>>
>> Take the simplistic tar.gz synchronization algorithm as an example.
>> The flow of a successful synchronization is:
>>
>> 1. WorkerTaskControls detects that a task is outdated, so it calls
>> TaskManager.active_sync() with the task_key specified.
>> 2. TaskManager finds the task package to which the task belongs
>> according to the task_key.
>> 3. TaskManager calls TaskPackage.active_sync(response=None, phase=1).
>> 4. In phase 1 of active_sync(), TaskPackage simply returns its
>> current version and indicates that it expects a further response from
>> the master.
>> 5. WorkerTaskControls sends the sync request generated by
>> active_sync() to the master.
>> 6. TaskSyncModule at the master side receives the sync request and
>> then invokes TaskPackage.passive_sync(request, 1).
>> 7. In TaskPackage.passive_sync(), the latest content of the task
>> package is compressed in tar.gz format, serialized, and returned as a
>> string.
>> 8. WorkerTaskControls receives the response from the master and
>> subsequently calls TaskPackage.active_sync(response, 2), which will
>> decompress the data and result in an up-to-date task package.
>>
>>
>>
> I have some issues with this workflow.
>
> 1) why use multiple calls with a state variable "phase"?  Why not use
> specific function calls?  Are there large sections of reused code that
> can't be methods?

In my simplistic "tar.gz" scheme, there are 2 phases at the sync
requester side and 1 phase at the sync server side. That is:

phase 1 at the worker side: worker ---sends local version---> master
phase 1 at the master side: master ---sends compressed data---> worker
phase 2 at the worker side: worker decompresses the data

But other synchronization schemes, which may be based on delta
encoding, could require more phases to do a synchronization, and I
cannot define a fixed set of specific methods that would cover them
all.
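To make the phase idea concrete, here is a stripped-down sketch of the
two methods for the tar.gz scheme. The method names and the phase
convention follow the interfaces described earlier; the in-memory file
dict and the "always resend the whole package" behavior are
simplifications of mine:

```python
import io
import tarfile

class TarGzTaskPackage(object):
    def __init__(self, name, version, files):
        self.name = name
        self.version = version
        self.files = files  # dict: relative path -> bytes

    def active_sync(self, response, phase=1):
        """Run on the sync requester (worker side)."""
        if phase == 1:
            # phase 1: request = our current version; True means we
            # expect a response from the remote side
            return self.version, True
        # phase 2: unpack the tar.gz data returned by passive_sync()
        with tarfile.open(fileobj=io.BytesIO(response), mode='r:gz') as tar:
            for member in tar.getmembers():
                self.files[member.name] = tar.extractfile(member).read()
        return None, False  # done, no further response expected

    def passive_sync(self, request, phase=1):
        """Run on the sync server (master side)."""
        # 'request' carries the requester's version; this sketch always
        # resends the full package (a real scheme might skip if equal)
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode='w:gz') as tar:
            for path, data in self.files.items():
                info = tarfile.TarInfo(name=path)
                info.size = len(data)
                tar.addfile(info, io.BytesIO(data))
        return buf.getvalue()
```

A delta-encoded scheme would simply extend the same two methods with
more phases (e.g. exchange file checksums in phase 2, deltas in phase
3), which is why I kept the phase variable instead of fixed method
names.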

>
> 2) It should actually be synchronizing with the Node since Node is
> responsible for anything that affects all of its workers.  Also once we
> have refactored the Node-Worker relationship (see above) worker will not
> always be around for synchronization.
>

Oh, I had doubts about this when I first started working on task sync.
Sorry that I later forgot about it.

> Since this doesn't exist yet you can just mock up a run_task() method on
> the Node that just tries to lookup the requested task from the TaskManager.
>

Simply mocking up the run_task() method is not enough. Right now the
scheduler directly calls the remote interfaces of workers. It seems we
would have to change the way the scheduler works and maintain node
references on the master, which would result in lots of changes. Is
that acceptable for now?

> 3) To play nice with the sandbox TaskPackage can't be responsible for
> sending sync messages or packaging itself.  There should be a
> TaskSyncServer (master) and TaskSyncClient (node) that encapsulates the
> entire workflow on both ends.

I totally agree. Actually, that is almost my current design: a
TaskSyncModule at the master side handles sync requests. I put the
sync-requesting functionality in WorkerTaskControls without defining a
new module.

>
> this is more what I envisioned:
>
> 1) When a task is requested from TaskManager this should be an
> asynchronous call
>   1a) checks whether a sync is in progress for that task, because
> there might be multiple workunits fired at the same time.

Good point! I did not consider this :(

>   1b) if no sync in progress, send a TASK_NEEDS_UPDATE signal
>   1c) deferred created and registered for the requested task key.
> 2) TaskSyncClient, or other task management modules (ie. directory
> scanner) will listen for the signal and attempt to find the task.
> 3) If found and/or transferred
>     3a) the module calls ADD_TASK which unpacks it and loads it as an
> available task.
> 4) The task manager fires any callbacks registered for that task (there
> might be multiple workunits)
> 5) callbacks would cause workers to be spawned and run the task.

In fact, I'm planning to adopt this asynchronous design. Two reasons
drive me to do this: 1) task synchronization may be time-consuming,
especially in the case of a "cold start"; 2) twisted's async nature :-)
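Here is a rough sketch of the asynchronous lookup you described, using
plain callbacks instead of Twisted deferreds to keep it self-contained.
The names request_task and add_task are my assumptions, and the
emit_signal hook stands in for the module signal system:

```python
class AsyncTaskManager(object):
    """Sketch of an async TaskManager: if a task is missing, emit
    TASK_NEEDS_UPDATE once and queue callbacks; when the task arrives,
    fire every callback registered for its key."""

    def __init__(self, emit_signal):
        self.tasks = {}       # task_key -> task record
        self.pending = {}     # task_key -> list of waiting callbacks
        self.emit_signal = emit_signal

    def request_task(self, task_key, callback):
        if task_key in self.tasks:
            callback(self.tasks[task_key])
            return
        if task_key not in self.pending:
            # 1b) no sync in progress yet: signal that we need the task
            self.pending[task_key] = []
            self.emit_signal('TASK_NEEDS_UPDATE', task_key)
        # 1a/1c) a sync is (now) in progress: just queue the callback,
        # so multiple workunits for the same task trigger one sync
        self.pending[task_key].append(callback)

    def add_task(self, task_key, task):
        # 3a/4) task arrived (e.g. via TaskSyncClient): fire callbacks
        self.tasks[task_key] = task
        for callback in self.pending.pop(task_key, []):
            callback(task)

# usage sketch: two workunits request the same missing task
signals = []
mgr = AsyncTaskManager(lambda sig, key: signals.append((sig, key)))
results = []
mgr.request_task('demo.SomeTask', results.append)
mgr.request_task('demo.SomeTask', results.append)
mgr.add_task('demo.SomeTask', 'task-object')
```

In the real code the queued callbacks would be Twisted deferreds, and
firing them would spawn workers to run the task.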

There is, however, an embarrassing question: since the pencils-down
date is approaching, I guess I won't be able to finish this before that
date. Can I continue working on it after GSoC?

>
> TaskManager should handle packaging/unpacking functions to separate that
> from synchronization.  This will allow task packages to be added to the
> filesystem and unpacked by a different module.
>

Could you explain more on this? I'm sorry I didn't understand it quite well.

>> Now sync requests and responses are all represented as strings.
>> Ideally, we will use Producers and Consumers to achieve better
>> performance, as we discussed.
>>
>>
>> Hope I expressed myself clearly. Any comments are appreciated.
>>
>>
>>
>
> -Peter
> _______________________________________________
> Pydra mailing list
> Pydra at osuosl.org
> http://lists.osuosl.org/mailman/listinfo/pydra
>



-- 
Yin Qiu
Nanjing University, China

