luigi.tools.range

Produces contiguous completed ranges of recurring tasks.

See RangeDaily and RangeHourly for basic usage.

Caveat: if gaps accumulate and their causes (e.g. missing dependencies) go unmonitored or unmitigated, this will eventually keep retrying the same gaps over and over and make no progress toward more recent times. (See the task_limit and reverse parameters.) TODO: foolproof against that kind of misuse?
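The caveat can be illustrated with a minimal sketch of per-invocation gap selection. The helper select_missing is hypothetical, and the rule it encodes (the oldest task_limit gaps by default, the newest task_limit gaps when reverse is set) is an assumption about how the two parameters interact, not Luigi's own code.

```python
from datetime import datetime, timedelta


def select_missing(missing, task_limit, reverse=False):
    """Pick which missing datetimes to schedule in one invocation.

    Hypothetical rule: oldest task_limit gaps by default,
    newest task_limit gaps when reverse is True.
    """
    ordered = sorted(missing)
    if reverse:
        # Guard: ordered[-0:] would return the whole list.
        return ordered[-task_limit:] if task_limit > 0 else []
    return ordered[:task_limit]


# Five permanently failing old days plus one fresh day that is also missing.
base = datetime(2014, 1, 1)
stuck = [base + timedelta(days=i) for i in range(5)]
fresh = [datetime(2014, 3, 1)]
missing = stuck + fresh

# Default order: every invocation retries the stuck days and never reaches March.
print(select_missing(missing, task_limit=5))
# reverse=True: the newest gaps are scheduled first instead.
print(select_missing(missing, task_limit=5, reverse=True))
```

With five permanently failing days at the head of the range, the default order never reaches the March gap; switching reverse on lets the newest gaps through.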

Functions

infer_bulk_complete_from_fs(datetimes, ...)

Efficiently determines missing datetimes by filesystem listing.

most_common(items)

Classes

RangeBase(*args, **kwargs)

Produces a contiguous completed range of a recurring task.

RangeByMinutes(*args, **kwargs)

Efficiently produces a contiguous completed range of a recurring task that runs every minutes_interval minutes and takes a single DateMinuteParameter.

RangeByMinutesBase(*args, **kwargs)

Produces a contiguous completed range of a recurring task whose instances are separated by a specified number of minutes.

RangeDaily(*args, **kwargs)

Efficiently produces a contiguous completed range of a daily recurring task that takes a single DateParameter.

RangeDailyBase(*args, **kwargs)

Produces a contiguous completed range of a daily recurring task.

RangeEvent()

Events communicating useful metrics.

RangeHourly(*args, **kwargs)

Efficiently produces a contiguous completed range of an hourly recurring task that takes a single DateHourParameter.

RangeHourlyBase(*args, **kwargs)

Produces a contiguous completed range of an hourly recurring task.

RangeMonthly(*args, **kwargs)

Produces a contiguous completed range of a monthly recurring task.

class luigi.tools.range.RangeEvent

Events communicating useful metrics.

COMPLETE_COUNT would normally be nondecreasing, and its derivative would describe performance (how many instances complete invocation-over-invocation).

COMPLETE_FRACTION reaching 1 would be a telling event in the case of a backfill with defined start and stop. It is not especially useful for a typical recurring task with no stop defined, where it fluctuates close to 1.

DELAY is measured from the first missing datehour found until (current time + hours_forward), or until stop if it is defined. For hourly ranges it is expressed in hours. TBD: different units for other frequencies? TODO: any different for reverse mode? From first missing till last missing? From last gap till stop?

COMPLETE_COUNT = 'event.tools.range.complete.count'
COMPLETE_FRACTION = 'event.tools.range.complete.fraction'
DELAY = 'event.tools.range.delay'
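A minimal sketch of how the three RangeEvent values could be derived for one invocation follows. The function range_metrics is hypothetical; it assumes DELAY is counted in instances of the recurring task (hence hours for an hourly range) and that an empty range counts as fully complete.

```python
from datetime import datetime, timedelta


def range_metrics(datetimes, missing):
    """Compute sketch values for COMPLETE_COUNT, COMPLETE_FRACTION and DELAY.

    Assumptions: `datetimes` is the sorted list of expected instances up to
    (current time + *_forward) or stop; DELAY counts instances from the first
    missing one to the end of that list.
    """
    expected = len(datetimes)
    complete_count = expected - len(missing)
    # Treat an empty range as fully complete.
    complete_fraction = complete_count / expected if expected else 1.0
    if missing:
        delay = expected - datetimes.index(min(missing))
    else:
        delay = 0
    return {
        "event.tools.range.complete.count": complete_count,
        "event.tools.range.complete.fraction": complete_fraction,
        "event.tools.range.delay": delay,
    }


hours = [datetime(2014, 1, 1) + timedelta(hours=h) for h in range(24)]
missing = hours[20:]  # the last four hours are not done yet
print(range_metrics(hours, missing))
```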

class luigi.tools.range.RangeBase(*args, **kwargs)

Produces a contiguous completed range of a recurring task.

Made for the common use case where a task is parameterized by e.g. DateParameter, and assurance is needed that any gaps arising from downtime are eventually filled.

Emits events that one can use to monitor gaps and delays.

At least one of start and stop needs to be specified.

(This is quite an abstract base class for subclasses with different datetime parameter classes, e.g. DateParameter, DateHourParameter, …, and different parameter naming, e.g. days_back/forward, hours_back/forward, …, as well as different documentation wording, to improve user experience.)

Subclasses will need to use the of parameter when overriding methods.

of

A parameter that takes another luigi task class.

When used programmatically, the parameter should be specified directly with the luigi.task.Task (sub)class, for example MyMetaTask(my_task_param=my_tasks.MyTask). On the command line, you specify the luigi.task.Task.get_task_family(), for example:

$ luigi --module my_tasks MyMetaTask --my_task_param my_namespace.MyTask

Where my_namespace.MyTask is defined in the my_tasks python module.

When the task is instantiated, the value of this parameter will always be a task class (not a string).

of_params

Parameter whose value is a dict.

In the task definition, use

import logging

import luigi

class MyTask(luigi.Task):
    tags = luigi.DictParameter()

    def run(self):
        logging.info("Find server with role: %s", self.tags['role'])
        server = aws.ec2.find_my_resource(self.tags)  # aws.ec2: placeholder for your own lookup code

At the command line, use

$ luigi --module my_tasks MyTask --tags <JSON string>

Simple example with two tags:

$ luigi --module my_tasks MyTask --tags '{"role": "web", "env": "staging"}'

It can be used to define dynamic parameters, when you do not know the exact list of your parameters (e.g. list of tags, that are dynamically constructed outside Luigi), or you have a complex parameter containing logically related values (like a database connection config).

It is possible to provide a JSON schema against which the given value is validated:

class MyTask(luigi.Task):
  tags = luigi.DictParameter(
    schema={
      "type": "object",
      "patternProperties": {
        ".*": {"type": "string", "enum": ["web", "staging"]},
      }
    }
  )

  def run(self):
    logging.info("Find server with role: %s", self.tags['role'])
    server = aws.ec2.find_my_resource(self.tags)

Using this schema, the following command will work:

$ luigi --module my_tasks MyTask --tags '{"role": "web", "env": "staging"}'

while this command will fail because the parameter is not valid:

$ luigi --module my_tasks MyTask --tags '{"role": "UNKNOWN_VALUE", "env": "staging"}'

Finally, the provided schema can be a custom validator:

import jsonschema

custom_validator = jsonschema.Draft4Validator(
  schema={
    "type": "object",
    "patternProperties": {
      ".*": {"type": "string", "enum": ["web", "staging"]},
    }
  }
)

class MyTask(luigi.Task):
  tags = luigi.DictParameter(schema=custom_validator)

  def run(self):
    logging.info("Find server with role: %s", self.tags['role'])
    server = aws.ec2.find_my_resource(self.tags)
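The effect of the example schema can be reproduced with the standard library alone, which may help when reasoning about which --tags values pass. The helper parse_tags is hypothetical and hand-codes the schema's single rule (every value must be the string "web" or "staging"); the DictParameter above relies on a JSON schema validated with jsonschema instead.

```python
import json

# Mirrors the example schema: every value must be the string "web" or "staging".
ALLOWED_VALUES = {"web", "staging"}


def parse_tags(cli_value):
    """Parse a --tags JSON string and apply the example schema's rule by hand."""
    tags = json.loads(cli_value)
    if not isinstance(tags, dict):
        raise ValueError("tags must be a JSON object")
    for key, value in tags.items():
        # isinstance check first, so unhashable values never reach the set lookup
        if not isinstance(value, str) or value not in ALLOWED_VALUES:
            raise ValueError(f"invalid value for {key!r}: {value!r}")
    return tags


print(parse_tags('{"role": "web", "env": "staging"}'))
try:
    parse_tags('{"role": "UNKNOWN_VALUE", "env": "staging"}')
except ValueError as exc:
    print("rejected:", exc)
```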

start

Parameter whose value is a str, and a base class for other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.

When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:

  • Any value provided on the command line:

    • To the root task (e.g. --param xyz)

    • Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • Any value set in the configuration with [TASK_NAME]>PARAM_NAME: <serialized value> syntax (that is, PARAM_NAME: <serialized value> in the [TASK_NAME] section). See Parameters from config Ingestion.

  • Any default value set using the default flag.

Parameter objects may be reused, but you must then set the positional=False flag.
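The priority order above can be summarized as a small lookup function. This is a hypothetical model: resolve_param and its dictionary arguments stand in for the constructor arguments, the command line, and the configuration file, and Luigi's real resolution code is structured differently.

```python
_MISSING = object()


def resolve_param(name, task_name, explicit, cli_root, cli_qualified, config,
                  default=_MISSING):
    """Return the value a parameter would take under the documented priority.

    All sources are plain dicts (hypothetical stand-ins). cli_root should only
    be non-empty when the task is the root task.
    """
    if name in explicit:                    # TaskA(x=44)
        return explicit[name]
    if name in cli_root:                    # --param xyz (root task)
        return cli_root[name]
    qualified = f"{task_name}-{name}"
    if qualified in cli_qualified:          # --TaskA-param xyz
        return cli_qualified[qualified]
    section = config.get(task_name, {})
    if name in section:                     # [TaskA] section, key param
        return section[name]
    if default is not _MISSING:             # luigi.Parameter(default=...)
        return default
    raise LookupError(f"no value for {task_name}.{name}")


config = {"TaskA": {"x": "from-config"}}
print(resolve_param("x", "TaskA", {}, {}, {"TaskA-x": "from-cli"}, config))
print(resolve_param("x", "TaskA", {}, {}, {}, config, default="d"))
```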

stop

Parameter whose value is a str. The generic luigi.Parameter description given under start above applies here as well.

reverse

A Parameter whose value is a bool. This parameter has an implicit default value of False. For the command line interface, this means that the value is False unless you add "--the-bool-parameter" to your command without giving a parameter value. This is called implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to be True. This is called explicit parsing. When the parameter value is omitted, it is still considered True, but to avoid ambiguities during argument parsing, make sure to always place bool parameters after the task family on the command line when using explicit parsing.

You can toggle between the two parsing modes on a per-parameter basis via

class MyTask(luigi.Task):
    implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING)
    explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)

or globally by

luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING

for all bool parameters instantiated after this line.
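Luigi's command line is built on Python's argparse, and the two parsing modes can be approximated with plain argparse as below. The option names and argparse settings are assumptions chosen to mimic the described behavior, not Luigi's actual parser configuration.

```python
import argparse

parser = argparse.ArgumentParser()
# Implicit parsing: the flag takes no value; present means True, absent means False.
parser.add_argument("--implicit-bool", action="store_true")
# Explicit parsing: an optional "true"/"false" value; a bare flag still means True.
parser.add_argument(
    "--explicit-bool",
    nargs="?",
    const="true",
    default="false",
    type=str.lower,
    choices=["true", "false"],
)


def parse(argv):
    """Return (implicit_bool, explicit_bool) as Python bools."""
    ns = parser.parse_args(argv)
    return ns.implicit_bool, ns.explicit_bool == "true"


print(parse([]))                            # (False, False)
print(parse(["--implicit-bool"]))           # (True, False)
print(parse(["--explicit-bool", "false"]))  # (False, False)
print(parse(["--explicit-bool"]))           # (False, True)
```

Because an explicit-mode flag accepts an optional value, a token placed right after it, such as a task family name, would be read as that value (and here rejected as an invalid choice); this is why the text recommends putting bool parameters after the task family.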

task_limit

Parameter whose value is an int.

now

Parameter whose value is an int.

param_name

Parameter whose value is a str. The generic luigi.Parameter description given under start above applies here as well.

property of_cls

DON'T USE. Will be deleted soon. Use self.of instead.

datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a datetime, produces a dictionary of the of_params combined with the ranged task parameter.

parameters_to_datetime(p)

Given a dictionary of parameters, extracts the ranged task parameter value.
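A sketch of the round trip between a datetime and the task parameters follows. Both helpers are hypothetical; in particular, the assumption that the ranged value is stored under param_name alongside the of_params entries (overriding a same-named entry) is illustrative only.

```python
from datetime import datetime


def datetime_to_parameters(dt, of_params, param_name, to_param=lambda d: d.date()):
    """Combine the fixed of_params with the ranged parameter for one instance.

    Hypothetical: to_param converts the datetime (here to a date, as for a
    daily range); the ranged value wins over a same-named of_params key.
    """
    params = dict(of_params)
    params[param_name] = to_param(dt)
    return params


def parameters_to_datetime(params, param_name):
    """Extract the ranged value and normalize it back to a datetime."""
    value = params[param_name]
    return datetime(value.year, value.month, value.day)


p = datetime_to_parameters(datetime(2014, 1, 5), {"region": "eu"}, "date")
print(p)
print(parameters_to_datetime(p, "date"))
```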

moving_start(now)

Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.

moving_stop(now)

Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.

finite_datetimes(finite_start, finite_stop)

Returns the individual datetimes in interval [finite_start, finite_stop) for which task completeness should be required, as a sorted list.

requires()

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

missing_datetimes(finite_datetimes)

Override in subclasses to do bulk checks.

Returns a sorted list.

This is a conservative base implementation that checks completeness by brute force, instance by instance.

Inadvisable, as it may be slow.
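The brute-force strategy amounts to one completeness check per instance, as in this sketch. The is_complete callable is a hypothetical stand-in for instantiating the of task and calling its complete() method, which may touch the filesystem or a database on every call.

```python
from datetime import datetime, timedelta


def missing_datetimes(finite_datetimes, is_complete):
    """Brute force: call is_complete once per instance and keep the failures.

    is_complete is a hypothetical stand-in for Task.complete(); with one
    call per instance, long ranges can be slow to check.
    """
    return sorted(dt for dt in finite_datetimes if not is_complete(dt))


days = [datetime(2014, 1, 1) + timedelta(days=i) for i in range(7)]
done = set(days[:3]) | {days[5]}
print(missing_datetimes(days, lambda dt: dt in done))
```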

class luigi.tools.range.RangeDailyBase(*args, **kwargs)

Produces a contiguous completed range of a daily recurring task.

start

Parameter whose value is a date.

A DateParameter is a Date string formatted YYYY-MM-DD. For example, 2013-07-10 specifies July 10, 2013.

About 90% of the time, DateParameters are interpolated into file system paths or the like. Here is a gentle reminder of how to interpolate date parameters into strings:

class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        templated_path = "/my/path/to/my/dataset/{date:%Y/%m/%d}/"
        instantiated_path = templated_path.format(date=self.date)
        # print(instantiated_path) --> /my/path/to/my/dataset/2016/06/09/
        # ... use instantiated_path ...

To make this parameter default to the current day, you can write code like this:

import datetime

class MyTask(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())

stop

Parameter whose value is a date. The DateParameter description given under start above applies here as well.

days_back

Parameter whose value is an int.

days_forward

Parameter whose value is an int.

datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a datetime, produces a dictionary of the of_params combined with the ranged task parameter.

parameters_to_datetime(p)

Given a dictionary of parameters, extracts the ranged task parameter value.

moving_start(now)

Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.

moving_stop(now)

Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.
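Combining days_back and days_forward with optional start and stop might look like the following sketch. The helper finite_bounds is hypothetical and assumes the moving window runs from now minus days_back to now plus days_forward, with an explicit start or stop used only when it falls inside that window.

```python
from datetime import datetime, timedelta


def finite_bounds(now, days_back, days_forward, start=None, stop=None):
    """Clamp a daily range to the moving window around `now` (sketch)."""
    moving_start = now - timedelta(days=days_back)
    moving_stop = now + timedelta(days=days_forward)
    # A missing or unfeasibly old start is replaced by the moving start.
    finite_start = moving_start if start is None else max(start, moving_start)
    # A missing or unfeasibly distant stop is replaced by the moving stop.
    finite_stop = moving_stop if stop is None else min(stop, moving_stop)
    return finite_start, finite_stop


now = datetime(2014, 3, 1)
print(finite_bounds(now, days_back=10, days_forward=1, start=datetime(2010, 1, 1)))
```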

finite_datetimes(finite_start, finite_stop)

Simply returns the points in time that correspond to the turn of each day.
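For a daily range, the turn-of-day points can be enumerated as below. This is a sketch under the assumption that instances sit at midnight and that the interval is half-open, [finite_start, finite_stop), as stated for RangeBase.finite_datetimes; the function name is hypothetical.

```python
from datetime import datetime, timedelta


def daily_finite_datetimes(finite_start, finite_stop):
    """Return the midnights t with finite_start <= t < finite_stop (sketch)."""
    t = datetime(finite_start.year, finite_start.month, finite_start.day)
    if t < finite_start:  # start falls mid-day: the first turn of day is tomorrow
        t += timedelta(days=1)
    result = []
    while t < finite_stop:
        result.append(t)
        t += timedelta(days=1)
    return result


print(daily_finite_datetimes(datetime(2014, 1, 1, 6), datetime(2014, 1, 4)))
```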

class luigi.tools.range.RangeHourlyBase(*args, **kwargs)

Produces a contiguous completed range of an hourly recurring task.

start

Parameter whose value is a datetime specified to the hour.

A DateHourParameter is an ISO 8601 formatted date and time specified to the hour. For example, 2013-07-10T19 specifies July 10, 2013 at 19:00.

stop

Parameter whose value is a datetime specified to the hour.

A DateHourParameter is an ISO 8601 formatted date and time specified to the hour. For example, 2013-07-10T19 specifies July 10, 2013 at 19:00.
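Both the hour format and the minute format used by DateMinuteParameter (described under RangeByMinutesBase) map onto standard strptime/strftime patterns. The pattern strings below are inferred from the documented examples (2013-07-10T19 and 2013-07-10T1907) and are not taken from Luigi's source.

```python
from datetime import datetime

HOUR_FORMAT = "%Y-%m-%dT%H"      # assumed DateHourParameter format, e.g. 2013-07-10T19
MINUTE_FORMAT = "%Y-%m-%dT%H%M"  # assumed DateMinuteParameter format, e.g. 2013-07-10T1907

hour = datetime.strptime("2013-07-10T19", HOUR_FORMAT)
minute = datetime.strptime("2013-07-10T1907", MINUTE_FORMAT)
print(hour)                        # 2013-07-10 19:00:00
print(minute)                      # 2013-07-10 19:07:00
print(hour.strftime(HOUR_FORMAT))  # 2013-07-10T19
```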

hours_back

Parameter whose value is an int.

hours_forward

Parameter whose value is an int.

datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a datetime, produces a dictionary of the of_params combined with the ranged task parameter.

parameters_to_datetime(p)

Given a dictionary of parameters, extracts the ranged task parameter value.

moving_start(now)

Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.

moving_stop(now)

Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.

finite_datetimes(finite_start, finite_stop)

Simply returns the points in time that correspond to whole hours.

class luigi.tools.range.RangeByMinutesBase(*args, **kwargs)

Produces a contiguous completed range of a recurring task whose instances are separated by a specified number of minutes.

start

Parameter whose value is a datetime specified to the minute.

A DateMinuteParameter is an ISO 8601 formatted date and time specified to the minute. For example, 2013-07-10T1907 specifies July 10, 2013 at 19:07.

The interval parameter can be used to clamp this parameter to every N minutes, instead of every minute.

stop

Parameter whose value is a datetime specified to the minute.

A DateMinuteParameter is an ISO 8601 formatted date and time specified to the minute. For example, 2013-07-10T1907 specifies July 10, 2013 at 19:07.

The interval parameter can be used to clamp this parameter to every N minutes, instead of every minute.

minutes_back

Parameter whose value is an int.

minutes_forward

Parameter whose value is an int.

minutes_interval

Parameter whose value is an int.

datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a datetime, produces a dictionary of the of_params combined with the ranged task parameter.

parameters_to_datetime(p)

Given a dictionary of parameters, extracts the ranged task parameter value.

moving_start(now)

Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.

moving_stop(now)

Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.

finite_datetimes(finite_start, finite_stop)

Simply returns the points in time that fall on a whole number of minutes_interval intervals.
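The minute grid can be sketched as follows. The function is hypothetical; it assumes minutes_interval must evenly divide 60 so the grid lines up with every whole hour, and that the start is rounded down to the beginning of its interval before enumerating the half-open range.

```python
from datetime import datetime, timedelta


def minute_finite_datetimes(finite_start, finite_stop, minutes_interval):
    """Return grid times within [finite_start, finite_stop) (sketch)."""
    if not 0 < minutes_interval < 60 or 60 % minutes_interval:
        raise ValueError("minutes_interval must be in 1..59 and evenly divide 60")
    # Round the start down to the beginning of its interval, e.g. 20:13 -> 20:10 for 5.
    aligned_minute = finite_start.minute // minutes_interval * minutes_interval
    t = finite_start.replace(minute=aligned_minute, second=0, microsecond=0)
    result = []
    while t < finite_stop:
        if t >= finite_start:  # skip the aligned point if it precedes the start
            result.append(t)
        t += timedelta(minutes=minutes_interval)
    return result


print(minute_finite_datetimes(datetime(2014, 1, 1, 20, 13), datetime(2014, 1, 1, 20, 30), 5))
```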

luigi.tools.range.most_common(items)
luigi.tools.range.infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re)

Efficiently determines missing datetimes by filesystem listing.

The current implementation works for the common case of a task writing output to a FileSystemTarget whose path is built using strftime with format like ‘…%Y…%m…%d…%H…’, without custom complete() or exists().

(Eventually Luigi could have ranges of completion as first-class citizens. Then this listing business could be factored away/be provided for explicitly in target API or some kind of a history server.)
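The idea behind the listing approach is to replace one exists() call per datetime with a single pass over a directory listing. The sketch below assumes a hypothetical /data/events/YYYY/MM/DD/ layout and a hard-coded regular expression; the real infer_bulk_complete_from_fs takes datetime_to_task and datetime_to_re callables and works differently internally.

```python
import re
from datetime import datetime, timedelta

# Hypothetical output layout: /data/events/YYYY/MM/DD/part-0 (one directory per day).
DAY_PATH_RE = re.compile(r"/(\d{4})/(\d{2})/(\d{2})/")


def infer_missing_from_listing(datetimes, listed_paths):
    """Return the datetimes with no matching path in one filesystem listing."""
    found = set()
    for path in listed_paths:  # one pass over the listing, no per-datetime exists()
        m = DAY_PATH_RE.search(path)
        if m:
            year, month, day = map(int, m.groups())
            found.add(datetime(year, month, day))
    return [dt for dt in datetimes if dt not in found]


days = [datetime(2014, 1, 1) + timedelta(days=i) for i in range(4)]
listing = [
    "/data/events/2014/01/01/part-0",
    "/data/events/2014/01/03/part-0",
]
print(infer_missing_from_listing(days, listing))
```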

class luigi.tools.range.RangeMonthly(*args, **kwargs)

Produces a contiguous completed range of a monthly recurring task.

Unlike the Range* classes with shorter intervals, this class does not perform bulk optimisation. It is assumed that the number of months is low enough not to motivate the increased complexity. Hence, there is no class RangeMonthlyBase.

start

Parameter whose value is a date, specified to the month (day of date is “rounded” to first of the month).

A MonthParameter is a Date string formatted YYYY-MM. For example, 2013-07 specifies July of 2013. Task objects constructed from code accept date (ignoring the day value) or Month.

stop

Parameter whose value is a date, specified to the month (day of date is “rounded” to first of the month).

A MonthParameter is a Date string formatted YYYY-MM. For example, 2013-07 specifies July of 2013. Task objects constructed from code accept date (ignoring the day value) or Month.

months_back

Parameter whose value is an int.

months_forward

Parameter whose value is an int.

datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a datetime, produces a dictionary of the of_params combined with the ranged task parameter.

parameters_to_datetime(p)

Given a dictionary of parameters, extracts the ranged task parameter value.

moving_start(now)

Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.

moving_stop(now)

Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.

finite_datetimes(finite_start, finite_stop)

Simply returns the points in time that correspond to the turn of each month.
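Month boundaries are not a fixed timedelta apart, so enumerating the turn of each month needs calendar arithmetic. The sketch below uses a small hypothetical add_months helper built on the standard library instead of a third-party date library, and assumes the same half-open [finite_start, finite_stop) interval as the other ranges.

```python
from datetime import datetime


def add_months(dt, months):
    """Shift a first-of-month datetime by a whole number of months."""
    index = dt.year * 12 + (dt.month - 1) + months
    return dt.replace(year=index // 12, month=index % 12 + 1)


def monthly_finite_datetimes(finite_start, finite_stop):
    """Return first-of-month datetimes t with finite_start <= t < finite_stop."""
    t = datetime(finite_start.year, finite_start.month, 1)
    if t < finite_start:  # start is mid-month: the first turn of month is next month
        t = add_months(t, 1)
    result = []
    while t < finite_stop:
        result.append(t)
        t = add_months(t, 1)
    return result


print(monthly_finite_datetimes(datetime(2013, 11, 15), datetime(2014, 3, 1)))
```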

class luigi.tools.range.RangeDaily(*args, **kwargs)

Efficiently produces a contiguous completed range of a daily recurring task that takes a single DateParameter.

Benefits from bulk_complete information to efficiently cover gaps.

Falls back to inferring it from an output filesystem listing to facilitate the common-case usage.

Convenient to use even from the command line, for example:

luigi --module your.module RangeDaily --of YourActualTask --start 2014-01-01

missing_datetimes(finite_datetimes)

Overrides the brute-force base implementation with a bulk check: completeness is determined via the of task's bulk_complete information and, failing that, inferred from an output filesystem listing (see infer_bulk_complete_from_fs).

class luigi.tools.range.RangeHourly(*args, **kwargs)

Efficiently produces a contiguous completed range of an hourly recurring task that takes a single DateHourParameter.

Benefits from bulk_complete information to efficiently cover gaps.

Falls back to inferring it from an output filesystem listing to facilitate the common-case usage.

Convenient to use even from the command line, for example:

luigi --module your.module RangeHourly --of YourActualTask --start 2014-01-01T00

missing_datetimes(finite_datetimes)

Overrides the brute-force base implementation with a bulk check: completeness is determined via the of task's bulk_complete information and, failing that, inferred from an output filesystem listing (see infer_bulk_complete_from_fs).

class luigi.tools.range.RangeByMinutes(*args, **kwargs)

Efficiently produces a contiguous completed range of a recurring task that runs every minutes_interval minutes and takes a single DateMinuteParameter.

Benefits from bulk_complete information to efficiently cover gaps.

Falls back to inferring it from an output filesystem listing to facilitate the common-case usage.

Convenient to use even from the command line, for example:

luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123

missing_datetimes(finite_datetimes)

Overrides the brute-force base implementation with a bulk check: completeness is determined via the of task's bulk_complete information and, failing that, inferred from an output filesystem listing (see infer_bulk_complete_from_fs).