luigi.util module

Using inherits and requires to ease parameter pain

Most luigi plumbers will find themselves in an awkward task parameter situation at some point or another. Consider the following “parameter explosion” problem:

import luigi

class TaskA(luigi.ExternalTask):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

class TaskB(luigi.Task):
    param_b = luigi.Parameter()
    param_a = luigi.Parameter()

    def requires(self):
        return TaskA(param_a=self.param_a)

class TaskC(luigi.Task):
    param_c = luigi.Parameter()
    param_b = luigi.Parameter()
    param_a = luigi.Parameter()

    def requires(self):
        return TaskB(param_b=self.param_b, param_a=self.param_a)

In workflows requiring many tasks to be chained together in this manner, parameter handling can spiral out of control. Each downstream task becomes more burdensome than the last, and refactoring becomes more difficult. There are several ways one might try to avoid the problem.

Approach 1: Parameters via command line or config instead of requires.

import luigi

class TaskA(luigi.ExternalTask):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

class TaskB(luigi.Task):
    param_b = luigi.Parameter()

    def requires(self):
        return TaskA()

class TaskC(luigi.Task):
    param_c = luigi.Parameter()

    def requires(self):
        return TaskB()

Then run in the shell like so:

luigi --module my_tasks TaskC --param-c foo --TaskB-param-b bar --TaskA-param-a baz

Repetitive parameters have been eliminated, but at the cost of making the job’s command line interface slightly clunkier. Often this is a reasonable trade-off.
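To make the flag naming concrete: a flag prefixed with a task family name, like `--TaskB-param-b`, targets that task, while an unprefixed flag like `--param-c` targets the task named on the command line, and dashes in a flag stand in for underscores in the parameter name. The sketch below is a hypothetical toy (`split_task_flags` is made up for illustration) that groups the flags from the command above along those lines; it is not how luigi's own parser is implemented.

```python
import shlex


def split_task_flags(argv, root_task, task_names):
    """Toy model: group '--Family-param-name value' pairs by task family.

    Hypothetical helper for illustration only; not luigi's real argument parser.
    Assumes every flag is followed by exactly one value.
    """
    params = {}
    for flag, value in zip(argv[::2], argv[1::2]):
        name = flag[2:]  # drop the leading '--'
        family, _, rest = name.partition('-')
        if family in task_names and rest:
            target, param = family, rest      # qualified, e.g. --TaskB-param-b
        else:
            target, param = root_task, name   # unqualified, e.g. --param-c
        # Dashes in flag names stand for underscores in parameter names.
        params.setdefault(target, {})[param.replace('-', '_')] = value
    return params


argv = shlex.split('--param-c foo --TaskB-param-b bar --TaskA-param-a baz')
result = split_task_flags(argv, 'TaskC', {'TaskA', 'TaskB', 'TaskC'})
print(result)
```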

But parameters can’t always be refactored out of every class. Downstream tasks might also need to use some of those parameters. For example, if TaskC needs to use param_a too, then param_a would still need to be repeated.

Approach 2: Use a common parameter class

import luigi

class Params(luigi.Config):
    param_c = luigi.Parameter()
    param_b = luigi.Parameter()
    param_a = luigi.Parameter()

class TaskA(Params, luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

class TaskB(Params):
    def requires(self):
        return TaskA()

class TaskC(Params):
    def requires(self):
        return TaskB()

This looks great at first glance, but a couple of issues lurk. Now TaskA and TaskB carry significant parameters they don’t need (TaskA, for instance, now has param_b and param_c). Significant parameters help define the identity of a task. Identical tasks are prevented from running at the same time by the central scheduler. This helps preserve the idempotent and atomic nature of luigi tasks. Unnecessary significant task parameters confuse a task’s identity. Under the right circumstances, task identity confusion could lead to that task running when it shouldn’t, or failing to run when it should.

This approach should only be used when all of the parameters of the config class are significant (or all insignificant) for all of its subclasses.

And wait a second... there’s a bug in the above code. See it?

TaskA won’t behave as an ExternalTask because the parent classes are specified in the wrong order. This contrived example is easy to fix (by swapping the ordering of the parents of TaskA), but real-world cases can be more difficult to both spot and fix. Inheriting from multiple classes derived from luigi.Task should be undertaken with caution and avoided where possible.
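The task-identity problem described for Approach 2 can be modelled in a few lines. In this toy, a task's identity is its family name plus its significant parameter values; `task_id` is a hypothetical helper, and luigi's real task IDs use a different format. Both TaskA requests below would write the same `/tmp/log-x` file, yet once the unneeded param_b counts as significant they no longer look like the same task, so the scheduler has no reason to stop both from running.

```python
def task_id(family, params, significant):
    """Toy task identity: family name plus the sorted significant parameter values."""
    return family + repr(sorted((k, v) for k, v in params.items() if k in significant))


# TaskA's output depends only on param_a.
run_1 = {'param_a': 'x', 'param_b': '1'}
run_2 = {'param_a': 'x', 'param_b': '2'}

# Only param_a significant: both requests are recognised as one task.
lean_same = task_id('TaskA', run_1, {'param_a'}) == task_id('TaskA', run_2, {'param_a'})

# param_b inherited as significant too: same output file, two different identities.
bloated_same = (task_id('TaskA', run_1, {'param_a', 'param_b'})
                == task_id('TaskA', run_2, {'param_a', 'param_b'}))

print(lean_same, bloated_same)
```

In luigi itself, a parameter can be kept out of a task's identity by declaring it with `significant=False`.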

Approach 3: Use inherits and requires

The inherits class decorator in this module copies parameters (and nothing else) from one task class to another, and avoids direct pythonic inheritance.

import luigi
from luigi.util import inherits

class TaskA(luigi.ExternalTask):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

@inherits(TaskA)
class TaskB(luigi.Task):
    param_b = luigi.Parameter()

    def requires(self):
        t = self.clone(TaskA)  # or t = self.clone_parent()

        # Wait... what does this clone thingy do?
        #
        # Pass it a task class.  It instantiates that task.  And when it does, it
        # supplies all parameters (and only those parameters) common to
        # the caller and callee!
        #
        # The call to clone is equivalent to the following (note the
        # fact that clone avoids passing param_b).
        #
        #   return TaskA(param_a=self.param_a)

        return t

@inherits(TaskB)
class TaskC(luigi.Task):
    param_c = luigi.Parameter()

    def requires(self):
        return self.clone(TaskB)

This totally eliminates the need to repeat parameters, avoids inheritance issues, and keeps the task command line interface about as simple as it can be. Refactoring task parameters is also much easier.
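The parameter-copying half of this can be modelled in plain Python: find the parameter objects on the source class and set them as attributes on the decorated class, leaving its bases untouched. `Parameter`, `Task` and `toy_inherits` below are stand-ins written for illustration, not luigi's classes; the sketch only shows that parameters come along while methods and the class hierarchy do not.

```python
class Parameter:
    """Stand-in for a task parameter declaration."""


class Task:
    """Stand-in base class for the toy tasks."""


def toy_inherits(source):
    """Toy decorator: copy the Parameter attributes of `source` onto the decorated class."""
    def decorate(target):
        for name in dir(source):
            value = getattr(source, name)
            # Copy parameters only, and keep any parameter the target already declares.
            if isinstance(value, Parameter) and not hasattr(target, name):
                setattr(target, name, value)
        return target
    return decorate


class TaskA(Task):
    param_a = Parameter()

    def output(self):
        return '/tmp/log-example'


@toy_inherits(TaskA)
class TaskB(Task):
    param_b = Parameter()


print(isinstance(TaskB.param_a, Parameter))  # True: the parameter was copied
print(TaskA in TaskB.__mro__)                # False: TaskB is not a subclass of TaskA
print(hasattr(TaskB, 'output'))              # False: methods were not copied
```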

The requires class decorator can reduce this pattern even further. It does everything inherits does, and also attaches a requires method to your task (still without Pythonic inheritance).

But how does it know how to invoke the upstream task? It uses clone behind the scenes!

import luigi
from luigi.util import requires

class TaskA(luigi.ExternalTask):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

@requires(TaskA)
class TaskB(luigi.Task):
    param_b = luigi.Parameter()

    # The class decorator does this for me!
    # def requires(self):
    #     return self.clone(TaskA)
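Roughly, then, the decorator is inherits plus one extra method whose body is a call to clone. The toy below models that; `Parameter`, `Task.clone` and `toy_requires` are hypothetical simplifications (for instance, the toy constructor rejects unknown parameters but does not enforce required ones), not luigi's actual code. Note that clone hands TaskA only param_a, since param_b is not one of TaskA's parameters.

```python
class Parameter:
    """Stand-in for a task parameter declaration."""


class Task:
    """Toy task: class-level Parameter objects declare parameters; kwargs give values."""

    @classmethod
    def param_names(cls):
        return {n for n in dir(cls) if isinstance(getattr(cls, n), Parameter)}

    def __init__(self, **kwargs):
        unexpected = set(kwargs) - self.param_names()
        if unexpected:
            raise TypeError(f'unexpected parameters: {sorted(unexpected)}')
        for name, value in kwargs.items():
            setattr(self, name, value)

    def clone(self, cls):
        # Instantiate `cls` with only the parameters both classes declare.
        common = self.param_names() & cls.param_names()
        return cls(**{n: getattr(self, n) for n in common})


def toy_requires(upstream):
    """Toy decorator: copy upstream's parameters, then attach a requires() built on clone."""
    def decorate(target):
        for name in upstream.param_names():
            if not hasattr(target, name):
                setattr(target, name, getattr(upstream, name))
        target.requires = lambda self: self.clone(upstream)
        return target
    return decorate


class TaskA(Task):
    param_a = Parameter()


@toy_requires(TaskA)
class TaskB(Task):
    param_b = Parameter()


b = TaskB(param_a='baz', param_b='bar')
upstream = b.requires()
print(type(upstream).__name__, vars(upstream))
```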

Use these helper functions effectively to avoid unnecessary repetition and dodge a few potentially nasty workflow pitfalls at the same time. Brilliant!

luigi.util.common_params(task_instance, task_cls)

Grab all the parameter values in task_instance for parameters that are also found in task_cls, returned as a dict keyed by parameter name.

class luigi.util.inherits(task_to_inherit)

Bases: object

Task inheritance.

Usage:

import luigi
from luigi.util import inherits

class AnotherTask(luigi.Task):
    n = luigi.IntParameter()
    # ...

@inherits(AnotherTask)
class MyTask(luigi.Task):
    def requires(self):
        return self.clone_parent()

    def run(self):
        print(self.n)  # this will be defined
        # ...

class luigi.util.requires(task_to_require)

Bases: object

Same as @inherits, but also auto-defines the requires method.

class luigi.util.copies(task_to_copy)

Bases: object

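Auto-copies a task: the decorated class requires task_to_copy (as with @requires) and gets a run method that copies the upstream task’s output to its own output, line by line.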

Usage:

@copies(MyTask)
class CopyOfMyTask(luigi.Task):
    # MyTask is assumed to declare a `date` parameter, which is inherited here
    def output(self):
        return luigi.LocalTarget(self.date.strftime('/var/xyz/report-%Y-%m-%d'))
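The copying itself boils down to streaming the upstream output into the new target line by line. The toy below models that with plain files in a temporary directory; `toy_copies` and `CopyOfReport` are hypothetical, and unlike the real decorator the toy does not set up requires() or any parameters. It only adds the copying run().

```python
import tempfile
from pathlib import Path


def toy_copies(cls):
    """Toy decorator: give `cls` a run() that copies its input file to its output file.

    Hypothetical simplification: no requires() or parameters are wired up here.
    """
    def run(self):
        with open(self.input()) as src, open(self.output(), 'w') as dst:
            for line in src:  # copy line by line
                dst.write(line)
    cls.run = run
    return cls


workdir = Path(tempfile.mkdtemp())
(workdir / 'report.csv').write_text('date,n\n2024-01-01,42\n')


@toy_copies
class CopyOfReport:
    def input(self):
        return workdir / 'report.csv'

    def output(self):
        return workdir / 'report-copy.csv'


CopyOfReport().run()
print((workdir / 'report-copy.csv').read_text())
```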

luigi.util.delegates(task_that_delegates)

Lets a task call methods on subtask(s).

The way this works is that the subtask is run as a part of the task, but the task itself doesn’t have to care about the requirements of the subtasks. The subtask doesn’t exist from the scheduler’s point of view, and its dependencies are instead required by the main task.

Example:

import luigi
from luigi.util import delegates

class PowersOfN(luigi.Task):
    n = luigi.IntParameter()

    def f(self, x):
        return x ** self.n

@delegates
class T(luigi.Task):
    def subtasks(self):
        return PowersOfN(5)  # n=5, passed positionally

    def run(self):
        print(self.subtasks().f(42))  # 42 ** 5
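A toy model of that arrangement, assuming dependencies can be represented as plain strings: the delegating task's dependencies are its own requires() plus each subtask's dependencies, and running it runs the subtasks first. `toy_delegates`, `LoadTable` and `Report` are hypothetical, and the toy patches the class in place rather than reproducing luigi's implementation.

```python
log = []


def toy_delegates(cls):
    """Toy decorator: fold subtasks into the delegating task.

    deps() = own requires() + each subtask's deps(); run() runs subtasks first.
    """
    if not hasattr(cls, 'subtasks'):
        raise AttributeError(f'{cls.__name__} needs to implement subtasks()')
    original_run = cls.run

    def deps(self):
        return list(self.requires()) + [d for t in self.subtasks() for d in t.deps()]

    def run(self):
        for t in self.subtasks():
            t.run()
        original_run(self)

    cls.deps = deps
    cls.run = run
    return cls


class LoadTable:
    """Hypothetical subtask: depends on 'raw_table' and records when it runs."""

    def deps(self):
        return ['raw_table']

    def run(self):
        log.append('LoadTable.run')

    def row_count(self):
        return 3


@toy_delegates
class Report:
    def requires(self):
        return ['config']

    def subtasks(self):
        return [LoadTable()]

    def run(self):
        # Call a method on the subtask directly.
        log.append(f'Report.run rows={self.subtasks()[0].row_count()}')


report = Report()
print(report.deps())  # LoadTable's own dependency shows up; LoadTable itself does not
report.run()
print(log)
```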

luigi.util.previous(task)

Return a previous Task of the same family.

By default, checks whether this task family has only one non-global parameter and whether it is a DateParameter, DateHourParameter or DateIntervalParameter; if so, it returns the task with that parameter decremented by one day, one hour or one interval, respectively.
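Here is a toy model of that rule for the day and hour cases (the interval case is left out). Parameter kinds are passed as class-name strings, and `toy_previous`, `DailyReport` and `HourlyLog` are hypothetical names; the sketch only illustrates the decrement and is not luigi's implementation.

```python
import datetime

# Step size per parameter kind, following the description above (interval kind omitted).
STEPS = {
    'DateParameter': datetime.timedelta(days=1),
    'DateHourParameter': datetime.timedelta(hours=1),
}


def toy_previous(family, params, kinds):
    """Toy model: return (family, params) for the previous instance of a task.

    `params` must hold exactly one parameter, and its kind (a class name given
    as a string) must be one of the date-like kinds in STEPS.
    """
    if len(params) != 1:
        raise NotImplementedError('expected exactly one parameter')
    name, value = next(iter(params.items()))
    if kinds[name] not in STEPS:
        raise NotImplementedError(f'{kinds[name]} is not date-like')
    return family, {name: value - STEPS[kinds[name]]}


daily = toy_previous('DailyReport', {'date': datetime.date(2024, 3, 1)},
                     {'date': 'DateParameter'})
hourly = toy_previous('HourlyLog', {'hour': datetime.datetime(2024, 3, 1, 0)},
                      {'hour': 'DateHourParameter'})
print(daily)   # 2024 is a leap year, so the previous day is Feb 29
print(hourly)
```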

luigi.util.get_previous_completed(task, max_steps=10)