d6tflow package
Submodules
Module contents
class d6tflow.FlowExport(tasks=None, flows=None, save=False, path_export='tasks_d6tpipe.py')
Bases: object
Auto-generate task files to quickly share workflows with others using d6tpipe.
Parameters:
- tasks (obj) – task or list of tasks to share
- flows (obj) – flow or list of flows to get tasks from
- save (bool) – save to tasks file
- path_export (str) – filename for tasks to export
class d6tflow.Workflow(task=None, params=None, path=None, env=None)
Bases: object
Orchestrates tasks and defines a task pipeline.

get_task(task=None)
Get a task with the workflow parameters.
Parameters: task (class) – task class
Returns: an instance of the task class with the workflow parameters

outputLoad(task=None, keys=None, as_dict=False, cached=False)
Load output from a task with the workflow parameters.
Parameters:
- task (class) – task class
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output

outputLoadAll(task=None, keys=None, as_dict=False, cached=False)
Load all output from a task with the workflow parameters.
Parameters:
- task (class) – task class
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output

outputPath(task=None)
Output the path for a given task.
Parameters: task (class) – task class
Returns: list or dict of all task paths

preview(tasks=None, indent='', last=True, show_params=True, clip_params=False)
Preview task flows with the workflow parameters.
Parameters: tasks (class, list) – task class or list of task classes

reset_downstream(task, task_downstream=None, confirm=False)
Invalidate all downstream tasks in a flow.
For example, suppose you have 3 dependent tasks. Normally you run Task3, but you have changed parameters for Task1. Invalidating Task3 checks the full DAG, recognizes that Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.
Parameters:
- task (obj) – task to invalidate; a downstream task for which you want to check downstream dependencies for invalidation conditions
- task_downstream (obj) – downstream task target
- confirm (bool) – confirm operation

run(tasks=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1, abort=True, execution_summary=None, **kwargs)
Run tasks with the workflow parameters. See luigi.build for additional details.
Parameters:
- tasks (class, list) – task class or list of task classes
- forced (list) – list of forced tasks
- forced_all (bool) – force all tasks
- forced_all_upstream (bool) – force all tasks including upstream
- confirm (bool) – confirm invalidating tasks
- workers (int) – number of workers
- abort (bool) – raise an exception on errors
- execution_summary (bool) – print execution summary
- kwargs – keywords to pass to luigi.build
class d6tflow.WorkflowMulti(task=None, params=None, path=None, env=None)
Bases: object
A multi-experiment workflow can be defined with multiple flows, separate parameters for each flow, and a default task. Defining the flows and the parameters for each flow is mandatory.

get_flow(flow)
Get a flow by name.
Parameters: flow (string) – name of the experiment
Returns: an instance of Workflow

get_task(task=None, flow=None)
Get a task with the workflow parameters for a flow.
Parameters:
- flow (string) – name of the experiment for which the flow is to be run; if nothing is passed, all the flows are run
- task (class) – task class
Returns: an instance of the task class with the workflow parameters

outputLoad(task=None, flow=None, keys=None, as_dict=False, cached=False)
Load output from a task with the workflow parameters for a flow.
Parameters:
- flow (string) – name of the experiment for which the flow is to be run; if nothing is passed, all the flows are run
- task (class) – task class
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output

outputLoadAll(task=None, flow=None, keys=None, as_dict=False, cached=False)
Load all output from a task with the workflow parameters for a flow.
Parameters:
- flow (string) – name of the experiment for which the flow is to be run; if nothing is passed, all the flows are run
- task (class) – task class
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output

outputPath(task=None, flow=None)
Output the path for a given task.
Parameters:
- task (class) – task class
- flow (string) – name of the experiment for which the flow is to be run; if nothing is passed, all the flows are run
Returns: list or dict of all task paths

preview(tasks=None, flow=None, indent='', last=True, show_params=True, clip_params=False)
Preview task flows with the workflow parameters for a flow.
Parameters:
- flow (string) – name of the experiment for which the flow is to be run; if nothing is passed, all the flows are run
- tasks (class, list) – task class or list of task classes

run(tasks=None, flow=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1, abort=True, execution_summary=None, **kwargs)
Run tasks with the workflow parameters for a flow. See luigi.build for additional details.
Parameters:
- flow (string) – name of the experiment for which the flow is to be run; if nothing is passed, all the flows are run
- tasks (class, list) – task class or list of task classes
- forced (list) – list of forced tasks
- forced_all (bool) – force all tasks
- forced_all_upstream (bool) – force all tasks including upstream
- confirm (bool) – confirm invalidating tasks
- workers (int) – number of workers
- abort (bool) – raise an exception on errors
- execution_summary (bool) – print execution summary
- kwargs – keywords to pass to luigi.build
d6tflow.invalidate_all(confirm=False)
Invalidate all tasks by deleting all files in the data directory.
Parameters: confirm (bool) – confirm operation
d6tflow.invalidate_downstream(task, task_downstream, confirm=False)
Invalidate all downstream tasks in a flow.
For example, suppose you have 3 dependent tasks. Normally you run Task3, but you have changed parameters for Task1. Invalidating Task3 checks the full DAG, recognizes that Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.
Parameters:
- task (obj) – task to invalidate; a downstream task for which you want to check downstream dependencies for invalidation conditions
- task_downstream (obj) – downstream task target
- confirm (bool) – confirm operation
d6tflow.invalidate_orphans(confirm=False)
Invalidate all unused task outputs.
Parameters: confirm (bool) – confirm operation
d6tflow.invalidate_upstream(task, confirm=False)
Invalidate all upstream tasks in a flow.
For example, suppose you have 3 dependent tasks. Normally you run Task3, but you have changed parameters for Task1. Invalidating Task3 checks the full DAG, recognizes that Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.
Parameters:
- task (obj) – task to invalidate; an upstream task for which you want to check upstream dependencies for invalidation conditions
- confirm (bool) – confirm operation
d6tflow.show(task)
Show task execution status.
Parameters: task (obj, list) – task or list of tasks
d6tflow.tasks module
class d6tflow.tasks.TaskAggregator(*args, **kwargs)
Bases: luigi.task.Task
Task which yields other tasks.
NB: use this class by implementing run(), which should do nothing but yield other tasks.
Example:
    class TaskCollector(d6tflow.tasks.TaskAggregator):
        def run(self):
            yield Task1()
            yield Task2()

complete(cascade=True)
If the task has any outputs, return True if all outputs exist; otherwise return False. However, you may freely override this method with custom logic.

output()
The output that this Task produces.
The output of the Task determines if the Task needs to be run: the task is considered finished iff all outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.
Implementation note: if running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or a database. Otherwise, workers might compute the same output since they don't see the work done by other workers.
See Task.output
class d6tflow.tasks.TaskCSVGZPandas(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to gzip-compressed CSV.
target_class: alias of d6tflow.targets.CSVGZPandasTarget
target_ext = 'csv.gz'
class d6tflow.tasks.TaskCSVPandas(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to CSV.
target_class: alias of d6tflow.targets.CSVPandasTarget
target_ext = 'csv'
class d6tflow.tasks.TaskCache(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to cache.
target_class: alias of d6tflow.targets.CacheTarget
target_ext = 'cache'
class d6tflow.tasks.TaskCachePandas(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves pandas dataframes to cache.
target_class: alias of d6tflow.targets.PdCacheTarget
target_ext = 'cache'
class d6tflow.tasks.TaskData(*args, path=None, flows=None, **kwargs)
Bases: luigi.task.Task
Task which has data as input and output.
Parameters:
- target_class (obj) – target data format
- target_ext (str) – file extension
- persist (list) – list of strings to identify data
- data (dict) – data container for all outputs

complete(*args, **kwargs)

classmethod get_param_values(params, args, kwargs)
Get the values of the parameters from the args and kwargs.
Parameters:
- params – list of (param_name, Parameter)
- args – positional arguments
- kwargs – keyword arguments
Returns: list of (name, value) tuples, one for each parameter

get_pipename(*args, **kwargs)

inputLoad(keys=None, task=None, cached=False, as_dict=False)
Load all or several outputs from the required tasks.
Parameters:
- keys (list) – list of data to load
- task (str) – if the task requires multiple tasks, the name of the input to load, e.g. 'input1' for def requires(self): return {'input1': Task1(), 'input2': Task2()}
- cached (bool) – cache data in memory
- as_dict (bool) – if the inputs were saved as a dictionary, use this to return them as a dictionary
Returns: list or dict of all task output
metadata = None

outputLoad(keys=None, as_dict=False, cached=False)
Load all or several outputs from the task.
Parameters:
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output

persist = ['data']

save(data, **kwargs)
Persist data to target.
Parameters: data (dict) – data to save; keys are the self.persist keys and values are the data

target_class: alias of d6tflow.targets.DataTarget
target_ext = 'ext'
class d6tflow.tasks.TaskExcelPandas(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to Excel.
target_class: alias of d6tflow.targets.ExcelPandasTarget
target_ext = 'xlsx'
class d6tflow.tasks.TaskJson(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to JSON.
target_class: alias of d6tflow.targets.JsonTarget
target_ext = 'json'
class d6tflow.tasks.TaskPickle(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to pickle.
target_class: alias of d6tflow.targets.PickleTarget
target_ext = 'pkl'
class d6tflow.tasks.TaskPqPandas(*args, path=None, flows=None, **kwargs)
Bases: d6tflow.tasks.TaskData
Task which saves to parquet.
target_class: alias of d6tflow.targets.PqPandasTarget
target_ext = 'parquet'
d6tflow.targets module
class d6tflow.targets.CSVGZPandasTarget(path=None)
Bases: d6tflow.targets.CSVPandasTarget
Saves to gzip-compressed CSV, loads to pandas dataframe.

save(*args, **kwargs)
class
d6tflow.targets.
CSVPandasTarget
(path=None)[source]¶ Bases:
d6tflow.targets.DataTarget
Saves to CSV, loads to pandas dataframe
-
load
(cached=False, **kwargs)[source]¶ Load from csv to pandas dataframe
Parameters: - cached (bool) – keep data cached in memory
- **kwargs – arguments to pass to pd.read_csv
Returns: pandas dataframe
-
save
(*args, **kwargs)¶
-
-
class d6tflow.targets.CacheTarget(path=None, format=None, is_tmp=False)
Bases: luigi.local_target.LocalTarget
Saves to in-memory cache, loads to python object.

exists()
Returns True if the path for this FileSystemTarget exists; False otherwise.
This method is implemented by using fs.

save(*args, **kwargs)
class d6tflow.targets.DataTarget(path=None)
Bases: d6tflow.targets._LocalPathTarget
Local target which saves in-memory data (e.g. dataframes) to persistent storage (e.g. files) and loads from storage into memory.
This is an abstract class that you should extend.
class d6tflow.targets.ExcelPandasTarget(path=None)
Bases: d6tflow.targets.DataTarget
Saves to Excel, loads to pandas dataframe.

load(cached=False, **kwargs)
Load from Excel to pandas dataframe.
Parameters:
- cached (bool) – keep data cached in memory
- **kwargs – arguments to pass to pd.read_excel
Returns: pandas dataframe

save(*args, **kwargs)
class d6tflow.targets.JsonTarget(path=None)
Bases: d6tflow.targets.DataTarget
Saves to JSON, loads to dict.

load(cached=False, **kwargs)
Load from JSON to dict.
Parameters:
- cached (bool) – keep data cached in memory
- **kwargs – arguments to pass to json.load
Returns: dict

save(*args, **kwargs)
class d6tflow.targets.PdCacheTarget(path=None, format=None, is_tmp=False)
Bases: d6tflow.targets.CacheTarget
class d6tflow.targets.PickleTarget(path=None)
Bases: d6tflow.targets.DataTarget
Saves to pickle, loads to python object.

load(cached=False, **kwargs)
Load from pickle to python object.
Parameters:
- cached (bool) – keep data cached in memory
- **kwargs – arguments to pass to pickle.load
Returns: python object

save(*args, **kwargs)
class d6tflow.targets.PqPandasTarget(path=None)
Bases: d6tflow.targets.DataTarget
Saves to parquet, loads to pandas dataframe.

load(cached=False, **kwargs)
Load from parquet to pandas dataframe.
Parameters:
- cached (bool) – keep data cached in memory
- **kwargs – arguments to pass to pd.read_parquet
Returns: pandas dataframe

save(*args, **kwargs)
d6tflow.pipes module
d6tflow.pipes.all_pull_preview(task, **kwargs)
Pull preview for all upstream tasks in a flow.

d6tflow.pipes.all_push_preview(task, **kwargs)
Push preview for all upstream tasks in a flow.
d6tflow.pipes.get_dirpath(name=None)
Get a pipe directory as a pathlib.Path.
Parameters: name (str) – name of pipe

d6tflow.pipes.get_pipe(name=None)
Get a pipe.
Parameters: name (str) – name of pipe
Returns: pipe object
Return type: obj
d6tflow.pipes.init(default_pipe_name, profile=None, local_pipe=False, local_api=False, reset=False, api=None, set_dir=True, api_args=None, pipe_args=None)
Initialize d6tpipe.
Parameters:
- default_pipe_name (str) – name of pipe to store results; override by setting the Task.pipe attribute
- profile (str) – name of d6tpipe profile to get api if api not provided
- local_pipe (bool) – use PipeLocal()
- local_api (bool) – use APILocal()
- reset (bool) – reset api and pipe connection
- api (obj) – d6tpipe api object; if not provided, it will be loaded
- set_dir (bool) – if True, set the d6tflow directory to the default pipe directory
- api_args (dict) – arguments to pass to api
- pipe_args (dict) – arguments to pass to pipe
d6tflow.functional module
class d6tflow.functional.Workflow
Bases: object
Functional flow class that acts as a manager of all flow steps. Defines all the decorators that can be used on flow functions.
add_global_params(**params)
Declares params for flow functions for further use.
Parameters: params (dict) – dictionary of param name and param type
Example:
    flow.add_global_params(multiplier=d6tflow.IntParameter(default=0))
delete(func_to_reset, *args, **kwargs)
Possibly dangerous! delete(func) will delete all files in the data/func directory of the given func. Useful if you want to delete all function-related outputs. Consider using reset(func, params) to reset a specific func.

deleteAll(*args, **kwargs)
Possibly dangerous! Will delete all files in the data/ directory of the functions attached to the workflow object. Useful if you want to delete all outputs, even those from previous runs. Consider using resetAll() if you only want to reset the functions with the params you have run thus far.
outputLoad(func_to_run, *args, **kwargs)
Loads all or several outputs from a flow step.
Parameters:
- func_to_run – flow step function
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output

outputLoadAll(func_to_run, *args, **kwargs)
Loads all output from a flow step and its parents.
Parameters:
- func_to_run – flow step function
- keys (list) – list of data to load
- as_dict (bool) – return output as a dict
- cached (bool) – cache data in memory
Returns: list or dict of all task output
params(*args, **kwargs)

persists(*args, **kwargs)

requires(*args, **kwargs)

reset(func_to_reset, params=None, *args, **kwargs)
Resets a particular function. Use with params to reset the function with the given parameters. If params is not given, reset(func) will reset the function with all the parameters run thus far.

resetAll(*args, **kwargs)
Resets all functions attached to the workflow object that have run at least once.

run(*args, **kwargs)

task(*args, **kwargs)