luigi.task module

The abstract Task class. It is a central concept of Luigi and represents the state of the workflow. See Tasks for an overview.

luigi.task.namespace(namespace=None)[source]

Call to set namespace of tasks declared after the call.

It is best practice to call this function without arguments at the end of any file it has been used in. That is to ensure that subsequent tasks have the default namespace again.

The namespace of a Task can also be changed by specifying the property task_namespace. This solution has the advantage that the namespace doesn’t have to be restored.

class Task2(luigi.Task):
    task_namespace = 'namespace2'

There’s no equivalent way to set the task_family.

luigi.task.task_id_str(task_family, params)[source]

Returns a canonical string used to identify a particular task.

Parameters:
  • task_family – The task family (class name) of the task
  • params – a dict mapping parameter names to their serialized values
Returns:

A unique, shortened identifier corresponding to the family and params
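To illustrate what a canonical identifier involves, the sketch below builds one from the family name and a hash of the serialized parameters. It uses only the standard library. The name sketch_task_id, the hash function, and the output format are assumptions made for illustration; they are not the format that task_id_str actually produces.

```python
import hashlib
import json


def sketch_task_id(task_family, params):
    """Hypothetical stand-in for task_id_str; the real format may differ."""
    # Serialize with sorted keys so the order of the dict does not matter.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    # Keep only a prefix of the digest so the identifier stays short.
    return "{}_{}".format(task_family, digest[:10])


id_a = sketch_task_id("MyTask", {"count": "3", "date": "2024-01-01"})
id_b = sketch_task_id("MyTask", {"date": "2024-01-01", "count": "3"})
id_c = sketch_task_id("MyTask", {"count": "4", "date": "2024-01-01"})
```

Because the keys are sorted before hashing, id_a and id_b are equal even though the dicts were written in a different order, while id_c differs.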

exception luigi.task.BulkCompleteNotImplementedError[source]

Bases: exceptions.NotImplementedError

This is here to trick pylint.

pylint thinks anything raising NotImplementedError needs to be implemented in any subclass. bulk_complete isn’t like that. This tricks pylint into thinking that the default implementation is a valid implementation and not an abstract method.

class luigi.task.Task(*args, **kwargs)[source]

Bases: object

This is the base class of all Luigi Tasks, the base unit of work in Luigi.

A Luigi Task describes a unit of work.

The key methods of a Task, which must be implemented in a subclass, are:

  • run() - the computation done by this task.
  • requires() - the list of Tasks that this Task depends on.
  • output() - the output Target that this Task creates.

Each Parameter of the Task should be declared as a class member:

class MyTask(luigi.Task):
    count = luigi.IntParameter()
    second_param = luigi.Parameter()

In addition to any declared properties and methods, there are a few non-declared properties, which are created by the Register metaclass:

priority = 0

Priority of the task: the scheduler should favor available tasks with higher priority values first. See Task priority.

disabled = False
resources = {}

Resources used by the task. Should be formatted like {"scp": 1} to indicate that the task requires 1 unit of the scp resource.

worker_timeout = None

Number of seconds after which to time out the run function. No timeout if set to 0. Defaults to 0 or the worker-timeout value in the config file. Only works when using multiple workers.

max_batch_size = inf

Maximum number of tasks to run together as a batch. Infinite by default.

batchable

True if this instance can be run as part of a batch. By default, True if it has any batched parameters.

retry_count

Override this positive integer to have different retry_count at task level. Check [scheduler].

disable_hard_timeout

Override this positive integer to have different disable_hard_timeout at task level. Check [scheduler]

disable_window_seconds

Override this positive integer to have different disable_window_seconds at task level. Check [scheduler]

owner_email

Override this to send out additional error emails to the task owner, in addition to the one defined in the global configuration. This should return a string or a list of strings, e.g. 'test@example.com' or ['test1@example.com', 'test2@example.com'].

use_cmdline_section

Property used by core config such as --workers etc. These will be exposed without the class as prefix.

classmethod event_handler(event)[source]

Decorator for adding event handlers.

trigger_event(event, *args, **kwargs)[source]

Trigger that calls all of the specified events associated with this class.
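The relationship between event_handler and trigger_event can be pictured as a callback registry. The following is a simplified plain-Python sketch, not Luigi's code: EventfulSketch, the _callbacks attribute, and the string event names are all hypothetical, and the sketch ignores how callbacks relate to subclasses.

```python
from collections import defaultdict


class EventfulSketch:
    # Hypothetical registry: event name -> list of callbacks.
    _callbacks = defaultdict(list)

    @classmethod
    def event_handler(cls, event):
        """Return a decorator that registers a callback for `event`."""
        def wrapper(callback):
            cls._callbacks[event].append(callback)
            return callback
        return wrapper

    def trigger_event(self, event, *args, **kwargs):
        """Call every callback registered for `event`."""
        for callback in self._callbacks[event]:
            callback(*args, **kwargs)


seen = []


@EventfulSketch.event_handler("success")
def record_success(task_name):
    seen.append(task_name)


EventfulSketch().trigger_event("success", "MyTask")
EventfulSketch().trigger_event("failure", "MyTask")  # no handler registered
```

Only the "success" trigger reaches record_success, so seen ends up holding a single entry.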

task_module

Returns what Python module to import to get access to this class.

task_namespace = '__not_user_specified'

This value can be overridden to set the namespace that will be used. (See Namespaces, families and ids.) If it’s not specified and you try to read this value anyway, it will return garbage. Please use get_task_namespace() to read the namespace.

Note that setting this value with @property will not work, because this is a class level value.

classmethod get_task_namespace()[source]

The task namespace for the given class.

Note: You normally don’t want to override this.

task_family = 'Task'
classmethod get_task_family()[source]

The task family for the given class.

If task_namespace is not set, then it’s simply the name of the class. Otherwise, <task_namespace>. is prefixed to the class name.

Note: You normally don’t want to override this.
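The naming rule described above can be written out as a small function. This is a sketch of the rule as stated, assuming an empty string stands for "namespace not set"; sketch_task_family is a hypothetical helper, not part of Luigi.

```python
def sketch_task_family(class_name, task_namespace=""):
    """Hypothetical helper mirroring the rule described above."""
    if not task_namespace:
        # No namespace: the family is simply the class name.
        return class_name
    # Otherwise "<task_namespace>." is prefixed to the class name.
    return "{}.{}".format(task_namespace, class_name)


plain = sketch_task_family("Task2")
namespaced = sketch_task_family("Task2", "namespace2")
```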

classmethod get_params()[source]

Returns all of the Parameters for this Task.

classmethod batch_param_names()[source]
classmethod get_param_names(include_significant=False)[source]
classmethod get_param_values(params, args, kwargs)[source]

Get the values of the parameters from the args and kwargs.

Parameters:
  • params – list of (param_name, Parameter).
  • args – positional arguments
  • kwargs – keyword arguments.
Returns:

list of (name, value) tuples, one for each parameter.
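A rough picture of how positional and keyword arguments might be resolved against the declared parameters is sketched below. It is deliberately simplified: params here is a list of (name, default) pairs rather than (param_name, Parameter) pairs, and REQUIRED and sketch_param_values are hypothetical names. The error handling is an assumption about reasonable behavior, not a description of Luigi's.

```python
REQUIRED = object()  # hypothetical sentinel: the parameter has no default


def sketch_param_values(params, args, kwargs):
    """Resolve (name, value) pairs from positional and keyword arguments."""
    if len(args) > len(params):
        raise TypeError("too many positional arguments")
    names = [name for name, _ in params]
    result = {}
    # Positional arguments fill parameters in declaration order.
    for name, value in zip(names, args):
        result[name] = value
    # Keyword arguments fill the rest; unknown or repeated names are errors.
    for name, value in kwargs.items():
        if name not in names:
            raise TypeError("unknown parameter: " + name)
        if name in result:
            raise TypeError("duplicate parameter: " + name)
        result[name] = value
    # Anything still missing falls back to its default, if it has one.
    for name, default in params:
        if name not in result:
            if default is REQUIRED:
                raise TypeError("missing parameter: " + name)
            result[name] = default
    return [(name, result[name]) for name in names]


declared = [("count", REQUIRED), ("second_param", "x")]
values = sketch_param_values(declared, (3,), {})
```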

initialized()[source]

Returns True if the Task is initialized and False otherwise.

classmethod from_str_params(params_str)[source]

Creates an instance from a str->str hash.

Parameters: params_str – dict of param name -> value as string.
to_str_params(only_significant=False)[source]

Convert all parameters to a str->str hash.
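from_str_params and to_str_params are inverses: one serializes every parameter to a string, the other parses the strings back. The plain-Python sketch below shows the round trip for an integer and a date. StrParamsSketch is a hypothetical stand-in in which the conversions are hard-coded, whereas a real Task would delegate them to its Parameter objects.

```python
import datetime


class StrParamsSketch:
    """Hypothetical stand-in for a Task with an int and a date parameter."""

    def __init__(self, count, day):
        self.count = count
        self.day = day

    def to_str_params(self):
        # Every value is serialized to a string, giving a str -> str dict.
        return {"count": str(self.count), "day": self.day.isoformat()}

    @classmethod
    def from_str_params(cls, params_str):
        # Each string is parsed back according to the type of its parameter.
        return cls(
            count=int(params_str["count"]),
            day=datetime.date.fromisoformat(params_str["day"]),
        )


original = StrParamsSketch(count=3, day=datetime.date(2024, 1, 1))
as_strings = original.to_str_params()
restored = StrParamsSketch.from_str_params(as_strings)
```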

clone(cls=None, **kwargs)[source]

Creates a new instance from an existing instance where some of the args have changed.

There are at least two scenarios where this is useful (see test/clone_test.py):

  • removing a lot of boilerplate when you have recursive dependencies and lots of args
  • when there is task inheritance and some logic lives on the base class
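Parameters:
  • cls – the class of the new instance; defaults to the class of this task
  • kwargs – parameter values that override the ones copied from this task
Returns:

a new task instance of cls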
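The idea behind clone can be sketched in plain Python: copy every parameter the target class declares from the existing instance, unless an override is passed. CloneSketch, NextStep, and the param_names attribute are hypothetical stand-ins for a Task and its declared Parameters.

```python
class CloneSketch:
    """Hypothetical stand-in: parameters are listed in `param_names`."""

    param_names = ("date", "region")

    def __init__(self, **kwargs):
        for name in self.param_names:
            setattr(self, name, kwargs[name])

    def clone(self, cls=None, **kwargs):
        cls = cls or type(self)
        new_kwargs = {}
        for name in cls.param_names:
            if name in kwargs:
                new_kwargs[name] = kwargs[name]  # explicit override wins
            elif hasattr(self, name):
                new_kwargs[name] = getattr(self, name)  # copied from self
        return cls(**new_kwargs)


class NextStep(CloneSketch):
    """A different task class that shares the same parameters."""


task = CloneSketch(date="2024-01-02", region="eu")
previous_day = task.clone(date="2024-01-01")  # same class, one arg changed
next_step = task.clone(NextStep)              # other class, same args
```

The first call covers the recursive-dependency scenario (same task, different date); the second passes the shared arguments on to another class without repeating them.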

complete()[source]

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.
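The default rule stated above (at least one output, and all of them exist) can be sketched with file paths standing in for Targets. FileTargetSketch and sketch_complete are hypothetical names; only the standard library is used.

```python
import os
import tempfile


class FileTargetSketch:
    """Hypothetical target: it exists when the path is present on disk."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


def sketch_complete(outputs):
    """Return True only if there is at least one output and all exist."""
    if not outputs:
        return False
    return all(target.exists() for target in outputs)


with tempfile.TemporaryDirectory() as tmp:
    done = FileTargetSketch(os.path.join(tmp, "done.txt"))
    missing = FileTargetSketch(os.path.join(tmp, "missing.txt"))
    with open(done.path, "w") as handle:
        handle.write("ok")
    all_present = sketch_complete([done])
    one_missing = sketch_complete([done, missing])
    no_outputs = sketch_complete([])
```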

classmethod bulk_complete(parameter_tuples)[source]

Returns those of parameter_tuples for which this Task is complete.

Override (with an efficient implementation) for efficient scheduling with range tools. Keep the logic consistent with that of complete().

output()[source]

The output that this Task produces.

The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

process_resources()[source]

Override in “template” tasks which provide common resource functionality but allow subclasses to specify additional resources while preserving the name for consistent end-user experience.

input()[source]

Returns the outputs of the Tasks returned by requires()

See Task.input

Returns: a list of Target objects which are specified as outputs of all required Tasks.
deps()[source]

Internal method used by the scheduler.

Returns the flattened list of requires.

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

on_failure(exception)[source]

Override for custom error handling.

This method gets called if an exception is raised in run(). The returned value of this method is JSON-encoded and sent to the scheduler as the expl argument. Its string representation is used as the body of the error email, if one is sent.

Default behavior is to return a string representation of the stack trace.
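The calling pattern can be sketched without Luigi: a caller runs the task, and if run() raises, it passes the exception to on_failure and keeps the returned explanation. run_and_report and both task classes are hypothetical; the first on_failure imitates the default by returning the stack trace, the second overrides it with a shorter message.

```python
import traceback


class FailingTaskSketch:
    def run(self):
        raise ValueError("input file is empty")

    def on_failure(self, exception):
        # Default-like behavior: the stack trace as a string.
        return traceback.format_exc()


class QuietTaskSketch(FailingTaskSketch):
    def on_failure(self, exception):
        # Custom handling: a one-line explanation instead of the trace.
        return "QuietTaskSketch failed: {}".format(exception)


def run_and_report(task):
    """Hypothetical caller: returns (succeeded, explanation)."""
    try:
        task.run()
    except Exception as exc:
        return False, task.on_failure(exc)
    return True, None


ok, explanation = run_and_report(FailingTaskSketch())
quiet_ok, quiet_explanation = run_and_report(QuietTaskSketch())
```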

on_success()[source]

Override for doing custom completion handling for a larger class of tasks.

This method gets called when run() completes without raising any exceptions.

The returned value is JSON-encoded and sent to the scheduler as the expl argument.

Default behavior is to send a None value.

no_unpicklable_properties(*args, **kwds)[source]

Remove unpicklable properties before dumping the task and restore them afterwards.

This method can be called in a task subclass’s dump method to ensure that unpicklable properties do not break the dump.

This method is a context manager, so it is used in a with statement around the code that pickles the task.
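A minimal plain-Python sketch of this pattern follows. It is not Luigi's implementation: PicklableTaskSketch, its unpicklable_properties tuple, the placeholder string, and the dump method are assumptions for illustration, and the sketch pickles the attribute dict rather than the task object to stay self-contained.

```python
import contextlib
import pickle
import threading


class PicklableTaskSketch:
    # Hypothetical list of attribute names that cannot be pickled.
    unpicklable_properties = ("lock",)

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()  # lock objects cannot be pickled

    @contextlib.contextmanager
    def no_unpicklable_properties(self):
        saved = {}
        for prop in self.unpicklable_properties:
            if hasattr(self, prop):
                saved[prop] = getattr(self, prop)
                setattr(self, prop, "placeholder_during_pickling")
        try:
            yield
        finally:
            # Put the real objects back, even if pickling raised.
            for prop, value in saved.items():
                setattr(self, prop, value)

    def dump(self):
        with self.no_unpicklable_properties():
            return pickle.dumps(vars(self))


task = PicklableTaskSketch("demo")
payload = task.dump()
restored_attrs = pickle.loads(payload)
```

After dump() returns, task.lock is the original lock again; only the pickled payload contains the placeholder.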

class luigi.task.MixinNaiveBulkComplete[source]

Bases: object

Enables a Task to be efficiently scheduled with e.g. range tools, by providing a bulk_complete implementation which checks completeness in a loop.

Applicable to tasks whose completeness checking is cheap.

This does not exploit output-location-specific APIs for a speed advantage, but it still removes redundant scheduler round trips.
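"Checks completeness in a loop" can be sketched as follows. NaiveBulkCompleteSketch and DailyJobSketch are hypothetical, and the sketch assumes each entry of parameter_tuples is a single parameter value; handling of tuples or dicts of parameters is left out.

```python
class NaiveBulkCompleteSketch:
    """Hypothetical mixin: one complete() call per parameter value."""

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        # Instantiate the task for each value and keep the complete ones.
        return [p for p in parameter_tuples if cls(p).complete()]


class DailyJobSketch(NaiveBulkCompleteSketch):
    finished_days = {"2024-01-01", "2024-01-03"}  # stand-in for real outputs

    def __init__(self, day):
        self.day = day

    def complete(self):
        return self.day in self.finished_days


completed = DailyJobSketch.bulk_complete(
    ["2024-01-01", "2024-01-02", "2024-01-03"]
)
```

This is only worthwhile when complete() is cheap, since it is still called once per candidate.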

classmethod bulk_complete(parameter_tuples)[source]
class luigi.task.ExternalTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Subclass for references to external dependencies.

An ExternalTask does not have a run implementation, which signifies to the framework that this Task’s output() is generated outside of Luigi.

run = None
luigi.task.externalize(taskclass_or_taskobject)[source]

Returns an externalized version of a Task. You may pass either an instantiated task object or a task class. Some examples:

class RequiringTask(luigi.Task):
    def requires(self):
        task_object = self.clone(MyTask)
        return externalize(task_object)

    ...

Here’s mostly equivalent code, but externalize is applied to a task class instead.

@luigi.util.requires(externalize(MyTask))
class RequiringTask(luigi.Task):
    pass
    ...

Of course, it may also be used directly on classes and objects (for example for reexporting or other usage).

MyTask = externalize(MyTask)
my_task_2 = externalize(MyTask2(param='foo'))

However, if you want a task class to be external from the beginning, you’re better off inheriting from ExternalTask rather than Task.

This function tries to be side-effect free by creating a copy of the class or the object passed in and then modifying that copy. In particular, this code shouldn’t do anything:

externalize(MyTask)  # BAD: This does nothing (in versions after luigi 2.4.0)
class luigi.task.WrapperTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Use for tasks that only wrap other tasks and that by definition are done if all their requirements exist.

complete()[source]
class luigi.task.Config(*args, **kwargs)[source]

Bases: luigi.task.Task

Class for configuration. See Configuration classes.

luigi.task.getpaths(struct)[source]

Maps all Tasks in a structured data object to their .output().
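The mapping can be pictured as a recursive walk that keeps the shape of the structure and swaps each task for its output. The sketch below is a plain-Python approximation; sketch_getpaths and StubTask are hypothetical, and treating anything with an output attribute as a task is an assumption.

```python
def sketch_getpaths(struct):
    """Replace every task-like object in `struct` with its output()."""
    if hasattr(struct, "output"):
        return struct.output()
    if isinstance(struct, dict):
        return {key: sketch_getpaths(value) for key, value in struct.items()}
    if isinstance(struct, (list, tuple)):
        # Rebuild the same container type around the mapped items.
        return type(struct)(sketch_getpaths(item) for item in struct)
    raise TypeError("cannot map {!r} to outputs".format(struct))


class StubTask:
    """Hypothetical task whose output is just a path string."""

    def __init__(self, path):
        self.path = path

    def output(self):
        return self.path


paths = sketch_getpaths(
    {"raw": StubTask("raw.csv"), "parts": [StubTask("a"), StubTask("b")]}
)
```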

luigi.task.flatten(struct)[source]

Creates a flat list of all items in structured output (dicts, lists, items):

>>> sorted(flatten({'a': 'foo', 'b': 'bar'}))
['bar', 'foo']
>>> sorted(flatten(['foo', ['bar', 'troll']]))
['bar', 'foo', 'troll']
>>> flatten('foo')
['foo']
>>> flatten(42)
[42]
luigi.task.flatten_output(task)[source]

Lists all output targets by recursively walking output-less (wrapper) tasks.

FIXME order consistently.
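The recursive walk can be sketched in plain Python: take the outputs of a task, and only if it has none, descend into the tasks it requires. LeafSketch and WrapperSketch are hypothetical, and the sketch assumes output() and requires() already return flat lists.

```python
def sketch_flatten_output(task):
    """Collect outputs, descending into requirements of output-less tasks."""
    outputs = list(task.output())
    if not outputs:
        for dep in task.requires():
            outputs += sketch_flatten_output(dep)
    return outputs


class LeafSketch:
    def __init__(self, path):
        self.path = path

    def output(self):
        return [self.path]

    def requires(self):
        return []


class WrapperSketch:
    def __init__(self, deps):
        self.deps = deps

    def output(self):
        return []  # a wrapper produces nothing itself

    def requires(self):
        return self.deps


nested = WrapperSketch([LeafSketch("a"), WrapperSketch([LeafSketch("b")])])
targets = sketch_flatten_output(nested)
```

In this sketch the result follows the order of requires(), which is deterministic only because the stand-ins return lists.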