Source code for

#!/usr/bin/env python

# Finds all tasks and task outputs on the dependency paths from the given downstream task T
# up to the given source/upstream task S (optional). If the upstream task is not given,
# all upstream tasks on all dependency paths of T will be returned.

# Terms:
# if  the execution of Task T depends on the output of task S on a dependency graph,
#  T is called a downstream/sink task, S is called an upstream/source task.

# This is useful and practical way to find all upstream tasks of task T.
# For example suppose you have a daily computation that starts with a task named Daily.
# And suppose you have another task named Aggregate. Daily triggers a few tasks
# which eventually trigger Aggregate. Now, suppose you find a bug in Aggregate.
# You fixed the bug and now you want to rerun it, including all it's upstream deps.
# To do that you run:
#      bin/ --module daily_module Aggregate --daily-param1 xxx --upstream-family Daily
# This will output all the tasks on the dependency path between Daily and Aggregate. In
# effect, this is how you find all upstream tasks for Aggregate. Now you can delete its
# output and run Aggregate again. Daily will eventually trigget Aggregate and all tasks on
# the way.
# The same code here might be used as a CLI tool as well as a python module.
# In python, invoke find_deps(task, upstream_name) to get a set of all task instances on the
# paths between task T and upstream task S. You can then use the task instances to delete their output or
# perform other computation based on that.
# Example:
# PYTHONPATH=$PYTHONPATH:/path/to/your/luigi/tasks bin/ \
# --module my.tasks  MyDownstreamTask
# --downstream_task_param1 123456
# [--upstream-family MyUpstreamTask]

from __future__ import print_function
import luigi.interface
from luigi.contrib.ssh import RemoteTarget
from luigi.contrib.postgres import PostgresTarget
from luigi.contrib.s3 import S3Target
from import FileSystemTarget
from luigi.task import flatten
from luigi import parameter
import sys
from luigi.cmdline_parser import CmdlineParser
    from import Iterable
except ImportError:
    from collections import Iterable

[docs]def get_task_requires(task): return set(flatten(task.requires()))
[docs]def dfs_paths(start_task, goal_task_family, path=None): if path is None: path = [start_task] if start_task.task_family == goal_task_family or goal_task_family is None: for item in path: yield item for next in get_task_requires(start_task) - set(path): for t in dfs_paths(next, goal_task_family, path + [next]): yield t
[docs]class upstream(luigi.task.Config): ''' Used to provide the parameter upstream-family ''' family = parameter.OptionalParameter(default=None)
[docs]def find_deps(task, upstream_task_family): ''' Finds all dependencies that start with the given task and have a path to upstream_task_family Returns all deps on all paths between task and upstream ''' return {t for t in dfs_paths(task, upstream_task_family)}
[docs]def find_deps_cli(): ''' Finds all tasks on all paths from provided CLI task ''' cmdline_args = sys.argv[1:] with CmdlineParser.global_instance(cmdline_args) as cp: return find_deps(cp.get_task_obj(), upstream().family)
[docs]def get_task_output_description(task_output): ''' Returns a task's output as a string ''' output_description = "n/a" if isinstance(task_output, RemoteTarget): output_description = "[SSH] {0}:{1}".format(, task_output.path) elif isinstance(task_output, S3Target): output_description = "[S3] {0}".format(task_output.path) elif isinstance(task_output, FileSystemTarget): output_description = "[FileSystem] {0}".format(task_output.path) elif isinstance(task_output, PostgresTarget): output_description = "[DB] {0}:{1}".format(, task_output.table) else: output_description = "to be determined" return output_description
[docs]def main(): deps = find_deps_cli() for task in deps: task_output = task.output() if isinstance(task_output, dict): output_descriptions = [get_task_output_description(output) for label, output in task_output.items()] elif isinstance(task_output, Iterable): output_descriptions = [get_task_output_description(output) for output in task_output] else: output_descriptions = [get_task_output_description(task_output)] print(" TASK: {0}".format(task)) for desc in output_descriptions: print(" : {0}".format(desc))
if __name__ == '__main__': main()