Flow Manager

This module defines the FlowManager class.

class admit.FlowManager.FlowManager(connmap={}, bdpmap={}, depsmap={}, tasklevs={}, tasks={})

Manages the flow of data products between ADMIT tasks.

The flow manager maintains the tree of ADMIT tasks (ATs) constituting a data flow. Tasks communicate through basic data products (BDPs) that flow between tasks, which combine and transform them into new BDPs according to their function. This class implements the bookkeeping to track task connections and inter-dependencies. A central concept is the task connection 4-tuple, (si, sp, di, dp), connecting a source task (ID si) output BDP (port sp) to a destination task (ID di) input BDP (port dp). The primary concern of the flow manager is to organize and maintain this connection information to permit efficient execution and modification of data flows.

Parameters:

connmap : triple-nested dictionary of 4-tuples, optional

Initial value of the _connmap attribute (defaults to an empty dictionary).

bdpmap : dictionary of list of 2-tuples, optional

Initial value of the _bdpmap attribute (defaults to an empty dictionary).

depsmap : dictionary of sets, optional

Initial value of the _depsmap attribute (defaults to an empty dictionary).

tasklevs : dictionary of int, optional

Initial value of the _tasklevs attribute (defaults to an empty dictionary).

tasks : dictionary of task references, optional

Initial value of the _tasks attribute (defaults to an empty dictionary).

Notes

All internal state is kept up to date by add() and remove() so that it is valid at all times. Hence _connmap, _depsmap, etc. should never be modified by any other methods, either inside or outside of FlowManager (the leading underscore emphasizes this intent).

Attributes

_connmap (triple-nested dictionary of 4-tuples) Organizes connection 4-tuples in a 3-level dictionary hierarchy; _connmap[si][di][dp] is a connection 4-tuple (si, sp, di, dp) connecting source task ID si, BDP output port sp, to destination task ID di, BDP input port dp.
_bdpmap (dictionary of list of 2-tuples) Relates BDP outputs from source tasks with the BDP inputs to connected destination tasks; _bdpmap[di] is a list of 2-tuples (si, sp), one for each BDP input port, describing the source of the corresponding BDP.
_depsmap (dictionary of sets) Maintains the task dependency levels needed by the run() method; _depsmap[level] is the set of task IDs residing at the dependency level. Lower level tasks must execute prior to higher level tasks to satisfy dependency requirements; tasks at the same level may be executed in any order (even concurrently).
_tasklevs (dictionary of int) Maintains task dependency information in a complementary (inverse) manner to _depsmap; _tasklevs[id] is the dependency level for task id.
_tasks (dictionary of task references) Holds references to all ADMIT tasks in the flow, keyed by task id; _tasks[id] is an ADMIT task reference.
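
The relationship between these attributes can be illustrated with plain dictionaries. The following is a minimal sketch, not the FlowManager implementation: the connection list is hypothetical, input ports are assumed to be numbered contiguously from 0, and the level rule (a root sits at level 0, any other task one level above its deepest source) is an assumption consistent with the description of _depsmap above.

```python
# Hypothetical connection 4-tuples (si, sp, di, dp); task 0 is the root.
connections = [
    (0, 0, 1, 0),  # task 0, output 0 -> task 1, input 0
    (0, 0, 2, 0),  # task 0, output 0 -> task 2, input 0
    (1, 0, 2, 1),  # task 1, output 0 -> task 2, input 1
]

# connmap[si][di][dp] -> (si, sp, di, dp)
connmap = {}
for si, sp, di, dp in connections:
    connmap.setdefault(si, {}).setdefault(di, {})[dp] = (si, sp, di, dp)

# bdpmap[di] -> list of (si, sp), one entry per input port, in port order.
bdpmap = {0: []}  # the root task has no inputs
for si, sp, di, dp in sorted(connections, key=lambda c: (c[2], c[3])):
    bdpmap.setdefault(di, []).append((si, sp))


def level(tid):
    """Assumed rule: roots are level 0, others sit one above their deepest source."""
    sources = bdpmap.get(tid, [])
    return 0 if not sources else 1 + max(level(si) for si, _sp in sources)


# tasklevs[id] -> level, and its inverse depsmap[level] -> set of task IDs.
tasklevs = {tid: level(tid) for tid in bdpmap}
depsmap = {}
for tid, lev in tasklevs.items():
    depsmap.setdefault(lev, set()).add(tid)
```

In this sketch task 2 lands at level 2 because one of its inputs comes from task 1, which itself depends on the root.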

Methods

add(a[, stuples, dtuples]) Appends or inserts an AT into the task flow.
clone(id) Clones the flow emanating from a given root task (included).
downstream(id[, leaf]) Determines the ATs downstream of a task (includes itself).
dryrun() Performs a dry run.
has_task(id) Checks whether a task exists in the flow.
remove(id) Removes an AT and its downstream tasks.
replace(id, a[, stuples]) Replaces one task with another, removing the original.
run([dryrun]) Executes the flow, but only tasks that are out of date.
show() Displays formatted internal FlowManager state.
stale(id[, direct]) Sets the stale flag of an AT and its downstream ATs.
verify() Verifies the internal state of the FlowManager.

add(a, stuples=[], dtuples=[])

Appends or inserts an AT into the task flow.

Appending a task creates a new leaf, that is, a task whose BDP outputs (if any) are not connected to any tasks; in this case, dtuples can be omitted. Insertion implies that one or more of the task’s outputs feeds back into the flow, in which case dtuples will be non-empty.

Parameters:

a : AT

ADMIT task to append/insert into the flow.

stuples : list of 2-tuples, optional

List of source connection 2-tuples, one per BDP input port. For example, [(si0,sp0), (si1,sp1)] if the task requires two BDP inputs, the first (input port 0) from task si0, BDP output sp0 and the second (input port 1) from task si1, BDP output sp1. Defaults to an empty list (no upstream tasks).

dtuples : list of 4-tuples, optional

List of destination connection 4-tuples (si, sp, di, dp). The number of connections varies but will be zero for leaf (i.e., appended) tasks. The source task ID in these tuples must match the input task ID a.id, as it is the source for all dtuples connections. For example, [(1, 0, 2, 1), (1, 0, 3, 2)] if this task (id #1) disseminates its BDP output (port 0) to two downstream task inputs (#2 port 1 and #3 port 2). Defaults to an empty list (no downstream tasks).

Returns:

int

Input task ID on success, else -1 (error detected).

See also

remove, replace

Notes

Usually all but the root task(s) will have a non-empty stuples list.
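
The two tuple conventions can be related with a short sketch. The helper names below (stuples_to_connections, dtuples_match_source) are hypothetical and not part of FlowManager; they only illustrate that the position of each 2-tuple in stuples is the destination input port, and that every dtuples entry must name the inserted task as its source.

```python
def stuples_to_connections(task_id, stuples):
    """Expand source 2-tuples (si, sp) into full 4-tuples (si, sp, di, dp).

    The list index of each 2-tuple is taken as the input port dp of task_id.
    """
    return [(si, sp, task_id, dp) for dp, (si, sp) in enumerate(stuples)]


def dtuples_match_source(task_id, dtuples):
    """Return True if every destination 4-tuple names task_id as its source."""
    return all(si == task_id for si, _sp, _di, _dp in dtuples)


# Task 5 takes input 0 from task 1 (output 0) and input 1 from task 3 (output 2).
incoming = stuples_to_connections(5, [(1, 0), (3, 2)])

# The dtuples example from the text: task 1 feeds tasks 2 and 3.
valid = dtuples_match_source(1, [(1, 0, 2, 1), (1, 0, 3, 2)])
```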

clone(id)

Clones the flow emanating from a given root task (included).

Creates an independent, parallel sub-flow duplicating the action of the original.

Parameters:

id : int

Task ID of the existing sub-flow root task.

Returns:

int

Task ID of the new (clone) root task.

downstream(id, leaf=set([]))

Determines the ATs downstream of a task (includes itself).

The downstream tasks constitute the sub-flow emanating from the specified root task (also considered part of the sub-flow).

Parameters:

id : int

Root task ID.

leaf : set of int, optional

Initial set of leaf task IDs.

Returns:

list

List of AT IDs (including the root, id) on success, else an empty list if recursion is detected.

Notes

Recursion through task id is detected. This is used by add() to prevent the creation of cyclic flows.
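
A traversal of this kind might look like the following. This is a simplified sketch under stated assumptions, not the actual method: it takes a connection map laid out like _connmap as an explicit argument, omits the leaf parameter, and treats any path that leads back to the root as recursion.

```python
def downstream(connmap, root):
    """Return the task IDs in the sub-flow rooted at root (root included).

    Returns an empty list if some downstream task feeds back into root.
    """
    found = [root]
    seen = {root}
    stack = [root]
    while stack:
        tid = stack.pop()
        # The keys of connmap[tid] are the destination task IDs fed by tid.
        for di in connmap.get(tid, {}):
            if di == root:
                return []  # the flow loops back to the root
            if di not in seen:
                seen.add(di)
                found.append(di)
                stack.append(di)
    return found


# Hypothetical flow: 0 -> 1, 0 -> 2, 1 -> 2.
flow = {
    0: {1: {0: (0, 0, 1, 0)}, 2: {0: (0, 0, 2, 0)}},
    1: {2: {1: (1, 0, 2, 1)}},
}
# Same flow with an extra connection 2 -> 0, which makes it cyclic.
cyclic = dict(flow)
cyclic[2] = {0: {0: (2, 0, 0, 0)}}
```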

dryrun()

Performs a dry run.

Dry runs provide a summary of which tasks are in the flow, which are stale (out-of-date), and the order in which they will be executed in a live run.

Returns:

None

has_task(id)

Checks whether a task exists in the flow.

This method does not differentiate between current, stale or disabled tasks, merely whether a task with the specified ID is present in the flow.

Parameters:

id : int

Task ID.

Returns:

bool

True if the task is in the flow, else False.

remove(id)

Removes an AT and its downstream tasks.

Deletes an entire sub-flow starting from the specified root task.

Parameters:

id : int

Task ID of root AT to be removed.

Returns:

None

See also

add, replace

replace(id, a, stuples=None)

Replaces one task with another, removing the original.

The replacement task must have the same output signature (i.e., produce the same types and number of output BDPs), since otherwise the existing task could not be removed; stuples may be specified if the inputs differ.

Parameters:

id : int

Task ID of AT to be removed.

a : AT

Task to insert into the flow.

stuples : list of 2-tuples, optional

Source connection 2-tuples (si, sp) for a. The special default value None indicates that the existing task’s sources should be reused verbatim.

Returns:

None

See also

add, remove

run(dryrun=False)

Executes the flow, but only tasks that are out of date.

Runs all enabled tasks in the correct order, accounting for their inter-dependencies. This will reduce to a no-op for tasks whose outputs are up-to-date (not dependent on stale tasks).

Parameters:

dryrun : bool, optional

Whether to perform a dry run (else a live run); defaults to False.

Returns:

None
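
The ordering rule can be sketched with a level map shaped like _depsmap. The function name run_order and the stale set are hypothetical, and the sketch ignores the enabled/disabled state of tasks; it only shows that levels are visited in ascending order and that up-to-date tasks are skipped.

```python
def run_order(depsmap, stale):
    """List the stale task IDs in an order that respects dependency levels."""
    order = []
    for lev in sorted(depsmap):           # lower levels must run first
        for tid in sorted(depsmap[lev]):  # any order is valid within a level
            if tid in stale:
                order.append(tid)
    return order


# Hypothetical flow: root 0, tasks 1 and 2 at level 1, task 3 at level 2.
levels = {0: {0}, 1: {1, 2}, 2: {3}}
partial = run_order(levels, stale={2, 3})
```

Sorting within a level is only for reproducible output here; the text above permits any order, including concurrent execution.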

show()

Displays formatted internal FlowManager state.

Pretty-prints the current contents of the instance to the screen (standard output).

Returns:

None

stale(id, direct=True)

Sets the stale flag of an AT and its downstream ATs.

Stale tasks will be re-run upon execution of the flow, updating their output BDPs in the process; tasks not so marked are skipped to minimize running time.

Parameters:

id : int

Root task ID.

direct : bool, optional

Whether to mark only direct descendants stale (else the entire sub-flow); defaults to True.

Returns:

None

verify()

Verifies the internal state of the FlowManager.

Verification searches for internal inconsistencies in the flow bookkeeping, which is partially redundant in order to improve runtime efficiency.

Returns:

bool

True if the FlowManager state is valid, else False.
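
Because the bookkeeping is redundant, pairs of maps can be cross-checked against each other. The two checks below are an illustrative sketch, not the checks verify() actually performs: the function names are hypothetical, and the maps are passed in explicitly using the layouts described for _depsmap, _tasklevs, _connmap and _bdpmap.

```python
def levels_consistent(depsmap, tasklevs):
    """Check that depsmap (level -> IDs) and tasklevs (ID -> level) are inverses."""
    inverted = {}
    for lev, tids in depsmap.items():
        for tid in tids:
            if tid in inverted:  # a task may sit at only one level
                return False
            inverted[tid] = lev
    return inverted == tasklevs


def connections_consistent(connmap, bdpmap):
    """Check that every connmap entry agrees with the matching bdpmap entry."""
    for si, dests in connmap.items():
        for di, ports in dests.items():
            for dp, (csi, sp, cdi, cdp) in ports.items():
                if (csi, cdi, cdp) != (si, di, dp):
                    return False  # tuple disagrees with its own keys
                sources = bdpmap.get(di, [])
                if dp >= len(sources) or sources[dp] != (si, sp):
                    return False
    return True


# Hypothetical two-task flow: 0 -> 1.
good_conn = {0: {1: {0: (0, 0, 1, 0)}}}
good_bdp = {0: [], 1: [(0, 0)]}
```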
