luigi.task module

The abstract Task class. It is a central concept of Luigi and represents the state of the workflow. See Tasks for an overview.

luigi.task.namespace(namespace=None, scope='')[source]

Call to set namespace of tasks declared after the call.

It is often desired to call this function with the keyword argument scope=__name__.

The scope keyword makes it so that this call is only effective for task classes with a matching __module__. The default value for scope is the empty string, which means all classes. Multiple calls with the same scope simply replace each other.

The namespace of a Task can also be changed by specifying the property task_namespace.

class Task2(luigi.Task):
    task_namespace = 'namespace2'

This explicit setting takes priority over whatever is set in the namespace() method, and it’s also inherited through normal Python inheritance.

There’s no equivalent way to set the task_family.

New since Luigi 2.6.0: scope keyword argument.

See also

The new and better scaling auto_namespace()

luigi.task.auto_namespace(scope='')[source]

Same as namespace(), but instead of a constant namespace, it will be set to the __module__ of the task class. This is desirable for these reasons:

  • Two tasks with the same name will not have conflicting task families

  • It’s more pythonic, as modules are Python’s recommended way to do namespacing.

  • It’s traceable. When you see the full name of a task, you can immediately identify where it is defined.

We recommend calling this function from your package’s outermost __init__.py file. The file contents could look like this:

import luigi

luigi.auto_namespace(scope=__name__)

To reset an auto_namespace() call, you can use namespace(scope='my_scope'). But this will not be needed (and is also discouraged) if you use the scope kwarg.

New since Luigi 2.6.0.

luigi.task.task_id_str(task_family, params)[source]

Returns a canonical string used to identify a particular task

Parameters:
  • task_family – The task family (class name) of the task

  • params – a dict mapping parameter names to their serialized values

Returns:

A unique, shortened identifier corresponding to the family and params
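
As a rough illustration of what a canonical, shortened identifier can look like, here is a minimal sketch. It is not Luigi's implementation: the function name sketch_task_id, the SHA-256 hash, and the truncation limits are all assumptions chosen for the example. The idea is to combine the task family, a readable summary of a few parameter values, and a short hash of the full parameter dict serialized with sorted keys.

```python
import hashlib
import json
import re


def sketch_task_id(task_family, params, max_params=3, max_chars=16, hash_len=10):
    """Build a short, deterministic identifier (illustrative, not Luigi's code)."""
    # Serialize with sorted keys so the same params always hash identically.
    canonical = json.dumps(params, separators=(",", ":"), sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    # Readable summary: truncated values of the first few params, ordered by name.
    summary = "_".join(params[name][:max_chars] for name in sorted(params)[:max_params])
    # Replace characters that are awkward in identifiers with underscores.
    summary = re.sub(r"[^A-Za-z0-9_]", "_", summary)

    return "{}_{}_{}".format(task_family, summary, digest[:hash_len])


task_id = sketch_task_id("MyTask", {"date": "2024-01-01", "count": "3"})
same_id = sketch_task_id("MyTask", {"count": "3", "date": "2024-01-01"})
other_id = sketch_task_id("MyTask", {"count": "4", "date": "2024-01-01"})
```

Because the parameters are serialized in sorted order, task_id and same_id are equal even though the dicts were written in a different key order, while other_id differs.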

exception luigi.task.BulkCompleteNotImplementedError[source]

Bases: NotImplementedError

This is here to trick pylint.

pylint thinks anything raising NotImplementedError needs to be implemented in any subclass. bulk_complete isn’t like that. This tricks pylint into thinking that the default implementation is a valid implementation and not an abstract method.

class luigi.task.Task(*args, **kwargs)[source]

Bases: object

This is the base class of all Luigi Tasks, the base unit of work in Luigi.

A Luigi Task describes a unit of work.

The key methods of a Task, which must be implemented in a subclass, are:

  • run() - the computation done by this task.

  • requires() - the list of Tasks that this Task depends on.

  • output() - the output Target that this Task creates.

Each Parameter of the Task should be declared as a class member:

class MyTask(luigi.Task):
    count = luigi.IntParameter()
    second_param = luigi.Parameter()

In addition to any declared properties and methods, there are a few non-declared properties, which are created by the Register metaclass:

priority = 0

Priority of the task: the scheduler should favor available tasks with higher priority values first. See Task priority

disabled = False
resources = {}

Resources used by the task. Should be formatted like {"scp": 1} to indicate that the task requires 1 unit of the scp resource.

worker_timeout = None

Number of seconds after which to time out the run function. No timeout if set to 0. Defaults to 0 or worker-timeout value in config

max_batch_size = inf

Maximum number of tasks to run together as a batch. Infinite by default

property batchable

True if this instance can be run as part of a batch. By default, True if it has any batched parameters

property retry_count

Override this positive integer to have a different retry_count at task level. Check [scheduler]

property disable_hard_timeout

Override this positive integer to have different disable_hard_timeout at task level. Check [scheduler]

property disable_window

Override this positive integer to have different disable_window at task level. Check [scheduler]

property disable_window_seconds
property owner_email

Override this to send out additional error emails to the task owner, in addition to the one defined in the global configuration. This should return a string or a list of strings, e.g. 'test@example.com' or ['test1@example.com', 'test2@example.com'].

property use_cmdline_section

Property used by core config such as --workers etc. These will be exposed without the class as prefix.

classmethod event_handler(event)[source]

Decorator for adding event handlers.

trigger_event(event, *args, **kwargs)[source]

Trigger that calls all of the specified events associated with this class.

property accepts_messages

For configuring which scheduler messages can be received. When falsy, this task does not accept any message. When True, all messages are accepted.

property task_module

Returns what Python module to import to get access to this class.

task_namespace = '__not_user_specified'

This value can be overridden to set the namespace that will be used. (See Namespaces, families and ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use get_task_namespace() to read the namespace.

Note that setting this value with @property will not work, because this is a class level value.

classmethod get_task_namespace()[source]

The task namespace for the given class.

Note: You normally don’t want to override this.

task_family = 'Task'
classmethod get_task_family()[source]

The task family for the given class.

If task_namespace is not set, then it’s simply the name of the class. Otherwise, <task_namespace>. is prefixed to the class name.

Note: You normally don’t want to override this.
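
The naming rule described above can be written out as a small sketch. The helper sketch_task_family is hypothetical and only mirrors the sentence above; it does not call into Luigi.

```python
def sketch_task_family(class_name, task_namespace=""):
    """Join namespace and class name as described in the text (illustrative only)."""
    if not task_namespace:
        # No namespace set: the family is simply the class name.
        return class_name
    # Otherwise the namespace and a dot are prefixed to the class name.
    return "{}.{}".format(task_namespace, class_name)


plain_family = sketch_task_family("Task2")
namespaced_family = sketch_task_family("Task2", "namespace2")
```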

classmethod get_params()[source]

Returns all of the Parameters for this Task.

classmethod batch_param_names()[source]
classmethod get_param_names(include_significant=False)[source]
classmethod get_param_values(params, args, kwargs)[source]

Get the values of the parameters from the args and kwargs.

Parameters:
  • params – list of (param_name, Parameter).

  • args – positional arguments

  • kwargs – keyword arguments.

Returns:

list of (name, value) tuples, one for each parameter.

property param_args
initialized()[source]

Returns True if the Task is initialized and False otherwise.

classmethod from_str_params(params_str)[source]

Creates an instance from a str->str hash.

Parameters:

params_str – dict of param name -> value as string.

to_str_params(only_significant=False, only_public=False)[source]

Convert all parameters to a str->str hash.

clone(cls=None, **kwargs)[source]

Creates a new instance from an existing instance where some of the args have changed.

There are at least two scenarios where this is useful (see test/clone_test.py):

  • remove a lot of boilerplate when you have recursive dependencies and lots of args

  • there’s task inheritance and some logic is on the base class

Parameters:
  • cls

  • kwargs

Returns:

complete()[source]

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.
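
The default rule stated above (no outputs means not complete; otherwise every output must exist) can be sketched as follows. FileTargetSketch and sketch_complete are hypothetical names used only for this illustration; the sketch assumes the outputs have already been collected into a flat list.

```python
import os
import tempfile


class FileTargetSketch:
    """Hypothetical stand-in for a Target: it only knows how to check existence."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


def sketch_complete(outputs):
    """Return True only if there is at least one output and all of them exist."""
    if not outputs:
        return False
    return all(target.exists() for target in outputs)


with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "done.txt")
    open(present, "w").close()  # create an empty file to act as a finished output
    missing = os.path.join(tmp, "missing.txt")

    no_outputs = sketch_complete([])
    all_exist = sketch_complete([FileTargetSketch(present)])
    one_missing = sketch_complete([FileTargetSketch(present), FileTargetSketch(missing)])
```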

classmethod bulk_complete(parameter_tuples)[source]

Returns those of parameter_tuples for which this Task is complete.

Override (with an efficient implementation) for efficient scheduling with range tools. Keep the logic consistent with that of complete().

output()[source]

The output that this Task produces.

The output of the Task determines if the Task needs to be run: the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note

If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

process_resources()[source]

Override in “template” tasks which provide common resource functionality but allow subclasses to specify additional resources while preserving the name for consistent end-user experience.

input()[source]

Returns the outputs of the Tasks returned by requires()

See Task.input

Returns:

a list of Target objects which are specified as outputs of all required Tasks.

deps()[source]

Internal method used by the scheduler.

Returns the flattened list of requires.

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

on_failure(exception)[source]

Override for custom error handling.

This method gets called if an exception is raised in run(). The returned value of this method is json encoded and sent to the scheduler as the expl argument. Its string representation will be used as the body of the error email sent out if any.

Default behavior is to return a string representation of the stack trace.

on_success()[source]

Override for doing custom completion handling for a larger class of tasks.

This method gets called when run() completes without raising any exceptions.

The returned value is json encoded and sent to the scheduler as the expl argument.

Default behavior is to send a None value.

no_unpicklable_properties()[source]

Remove unpicklable properties before dumping the task and restore them afterwards.

This method can be called in a subtask’s dump method to ensure that unpicklable properties won’t break the dump.

This method is a context manager, so it is meant to be used in a with statement around the dump.
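
The general pattern can be sketched without Luigi as follows. Everything here is an assumption made for illustration: the class PicklableTaskSketch, the attribute name reporter, the placeholder string, and the dump method are hypothetical, and the sketch pickles the instance's attribute dict rather than the task object itself.

```python
import pickle
import threading
from contextlib import contextmanager


class PicklableTaskSketch:
    """Hypothetical task-like object holding one attribute that cannot be pickled."""

    # Names of attributes to swap out while pickling (an assumption for this sketch).
    unpicklable_names = ("reporter",)

    def __init__(self):
        self.value = 42
        self.reporter = threading.Lock()  # lock objects cannot be pickled

    @contextmanager
    def no_unpicklable_properties(self):
        saved = {}
        for name in self.unpicklable_names:
            if hasattr(self, name):
                saved[name] = getattr(self, name)
                setattr(self, name, "placeholder_during_pickling")
        try:
            yield
        finally:
            # Put the original objects back once the dump is finished.
            for name, original in saved.items():
                setattr(self, name, original)

    def dump(self):
        with self.no_unpicklable_properties():
            # Pickle a copy of the attribute dict while placeholders are in place.
            return pickle.dumps(dict(vars(self)))


task = PicklableTaskSketch()
original_lock = task.reporter
payload = task.dump()
restored = pickle.loads(payload)
```

After dump() returns, task.reporter is the original lock again, while the pickled payload contains only the placeholder string in its place.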

class luigi.task.MixinNaiveBulkComplete[source]

Bases: object

Enables a Task to be efficiently scheduled with e.g. range tools, by providing a bulk_complete implementation which checks completeness in a loop.

Applicable to tasks whose completeness checking is cheap.

This doesn’t exploit output-location-specific APIs for a speed advantage, but it nevertheless removes redundant scheduler roundtrips.
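
A loop-based bulk_complete of the kind described above might look like the following sketch. NaiveBulkCompleteSketch and EvenDayTask are hypothetical classes that do not depend on Luigi, and the handling of tuples, dicts, and single values as constructor arguments is an assumption of this example.

```python
class NaiveBulkCompleteSketch:
    """Hypothetical mixin: checks completeness one parameter tuple at a time."""

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        completed = []
        for params in parameter_tuples:
            # Build a task instance from whatever shape the parameters come in.
            if isinstance(params, (list, tuple)):
                task = cls(*params)
            elif isinstance(params, dict):
                task = cls(**params)
            else:
                task = cls(params)
            if task.complete():
                completed.append(params)
        return completed


class EvenDayTask(NaiveBulkCompleteSketch):
    """Toy task that pretends even-numbered days are already done."""

    def __init__(self, day):
        self.day = day

    def complete(self):
        return self.day % 2 == 0


done = EvenDayTask.bulk_complete([1, 2, (3,), {"day": 4}])
```

Each entry costs one complete() call, which is why this approach suits tasks whose completeness check is cheap.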

classmethod bulk_complete(parameter_tuples)[source]
class luigi.task.DynamicRequirements(requirements, custom_complete=None)[source]

Bases: object

Wraps dynamic requirements yielded in tasks’ run methods to control how completeness checks of (e.g.) large chunks of tasks are performed. Besides the wrapped requirements, instances of this class can be passed an optional function custom_complete that might implement an optimized check for completeness. If set, the function will be called with a single argument, complete_fn, which should be used to perform the per-task check. Example:

import os

import luigi
from luigi.task import DynamicRequirements


# OtherTask is assumed to be a Task defined elsewhere with a file-like output
class SomeTaskWithDynamicRequirements(luigi.Task):
    ...

    def run(self):
        large_chunk_of_tasks = [OtherTask(i=i) for i in range(10000)]

        def custom_complete(complete_fn):
            # example: assume OtherTask always writes into the same directory, so just check
            #          if the first task is complete, and compare basenames for the rest
            if not complete_fn(large_chunk_of_tasks[0]):
                return False
            paths = [task.output().path for task in large_chunk_of_tasks]
            basenames = set(os.listdir(os.path.dirname(paths[0])))  # a single fs call
            return all(os.path.basename(path) in basenames for path in paths)

        yield DynamicRequirements(large_chunk_of_tasks, custom_complete)
requirements

The original, wrapped requirements.

flat_requirements

Flattened view of the wrapped requirements (via flatten()). Read only.

paths

Outputs of the requirements in the identical structure (via getpaths()). Read only.

custom_complete

The optional, custom function performing the completeness check of the wrapped requirements.

property flat_requirements
property paths
complete(complete_fn=None)[source]
class luigi.task.ExternalTask(*args, **kwargs)[source]

Bases: Task

Subclass for references to external dependencies.

An ExternalTask does not have a run implementation, which signifies to the framework that this Task’s output() is generated outside of Luigi.

run = None
luigi.task.externalize(taskclass_or_taskobject)[source]

Returns an externalized version of a Task. You may pass either an instantiated task object or a task class. Some examples:

class RequiringTask(luigi.Task):
    def requires(self):
        task_object = self.clone(MyTask)
        return externalize(task_object)

    ...

Here’s mostly equivalent code, but externalize is applied to a task class instead.

@luigi.util.requires(externalize(MyTask))
class RequiringTask(luigi.Task):
    pass
    ...

Of course, it may also be used directly on classes and objects (for example for reexporting or other usage).

MyTask = externalize(MyTask)
my_task_2 = externalize(MyTask2(param='foo'))

If, however, you want a task class to be external from the beginning, you’re better off inheriting from ExternalTask rather than Task.

This function tries to be side-effect free by creating a copy of the class or the object passed in and then modifying that copy. In particular, this code shouldn’t do anything:

externalize(MyTask)  # BAD: This does nothing (as after luigi 2.4.0)
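
The copy-then-modify idea can be illustrated with a small stand-alone sketch. This is not Luigi's implementation: ReportTask and sketch_externalize are hypothetical, and detecting a class with isinstance(..., type) is an assumption of this example. The point is that the original class and object keep their run method, which is why discarding the return value has no effect.

```python
import copy


class ReportTask:
    """Hypothetical task-like class with a run implementation."""

    def run(self):
        return "ran"


def sketch_externalize(task_or_class):
    """Return a copy with run set to None, leaving the argument untouched."""
    if isinstance(task_or_class, type):
        # Subclass instead of mutating: the original class keeps its run().
        return type(task_or_class.__name__, (task_or_class,), {"run": None})
    copied = copy.copy(task_or_class)
    copied.run = None  # instance attribute shadows the method on the copy only
    return copied


ExternalReport = sketch_externalize(ReportTask)
original = ReportTask()
external_obj = sketch_externalize(original)
```
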
class luigi.task.WrapperTask(*args, **kwargs)[source]

Bases: Task

Use for tasks that only wrap other tasks and that by definition are done if all their requirements exist.

complete()[source]

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.

class luigi.task.Config(*args, **kwargs)[source]

Bases: Task

Class for configuration. See Configuration classes.

luigi.task.getpaths(struct)[source]

Maps all Tasks in a structured data object to their .output().
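
To make the mapping concrete, here is a minimal sketch that walks a nested structure and replaces each task with its output while keeping the structure intact. TaskSketch and sketch_getpaths are hypothetical stand-ins, and the sketch only handles dicts, lists, and tuples.

```python
class TaskSketch:
    """Hypothetical stand-in for a Task with a single string output."""

    def __init__(self, name):
        self.name = name

    def output(self):
        return "/data/{}.txt".format(self.name)


def sketch_getpaths(struct):
    """Replace every task in a nested dict/list/tuple with its output()."""
    if isinstance(struct, TaskSketch):
        return struct.output()
    if isinstance(struct, dict):
        return {key: sketch_getpaths(value) for key, value in struct.items()}
    if isinstance(struct, (list, tuple)):
        # Rebuild the same container type around the mapped items.
        return type(struct)(sketch_getpaths(item) for item in struct)
    raise TypeError("Cannot map {!r} to Task/dict/list".format(struct))


requirements = {"raw": TaskSketch("raw"), "lookups": [TaskSketch("a"), TaskSketch("b")]}
paths = sketch_getpaths(requirements)
```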

luigi.task.flatten(struct)[source]

Creates a flat list of all items in structured output (dicts, lists, items):

>>> sorted(flatten({'a': 'foo', 'b': 'bar'}))
['bar', 'foo']
>>> sorted(flatten(['foo', ['bar', 'troll']]))
['bar', 'foo', 'troll']
>>> flatten('foo')
['foo']
>>> flatten(42)
[42]
luigi.task.flatten_output(task)[source]

Lists all output targets by recursively walking output-less (wrapper) tasks.
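
The recursive walk can be sketched as a breadth-first traversal that stops at the first task on each branch that has outputs of its own. NodeSketch and sketch_flatten_output are hypothetical and independent of Luigi; outputs are plain strings and requirements are plain lists in this example.

```python
from collections import deque


class NodeSketch:
    """Hypothetical task-like node with a list of outputs and requirements."""

    def __init__(self, outputs=(), requires=()):
        self._outputs = list(outputs)
        self._requires = list(requires)

    def output(self):
        return self._outputs

    def requires(self):
        return self._requires


def sketch_flatten_output(task):
    """Collect outputs, descending through tasks that have no output of their own."""
    found = []
    queue = deque([task])
    while queue:
        current = queue.popleft()
        outputs = current.output()
        if outputs:
            # This task has outputs: record them and do not descend further.
            found.extend(o for o in outputs if o not in found)
        else:
            # Output-less (wrapper-like) task: look at its requirements instead.
            queue.extend(current.requires())
    return found


leaf_a = NodeSketch(outputs=["a.csv"])
leaf_b = NodeSketch(outputs=["b.csv"])
inner_wrapper = NodeSketch(requires=[leaf_b])
top_wrapper = NodeSketch(requires=[leaf_a, inner_wrapper])
targets = sketch_flatten_output(top_wrapper)
```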