luigi.tools.range module

Produces contiguous completed ranges of recurring tasks.

See RangeDaily and RangeHourly for basic usage.

Caveat: if gaps accumulate and their causes (e.g. missing dependencies) go unmonitored or unmitigated, this will eventually keep retrying the same gaps over and over and make no progress towards more recent times. (See the ‘task_limit’ and ‘reverse’ parameters.) TODO: foolproof against that kind of misuse?
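The retry trap described in the caveat can be illustrated with a minimal sketch of how a per-invocation cap and a catch-up order might pick which gaps to schedule. The helper `select_required` is hypothetical and only mirrors the described semantics of `task_limit` and `reverse`. It is not luigi's code.

```python
from datetime import date, timedelta


def select_required(missing, task_limit, reverse=False):
    """Pick which missing dates to schedule in one invocation (hypothetical helper).

    reverse=False takes the oldest gaps first, reverse=True the newest;
    at most task_limit gaps are returned, in ascending order.
    """
    missing = sorted(missing)
    if task_limit <= 0:
        return []  # guard: missing[-0:] would otherwise return everything
    if reverse:
        return missing[-task_limit:]
    return missing[:task_limit]


# Three old gaps that never get filled (e.g. an upstream dependency is gone)
# plus one fresh gap.
stuck = [date(2014, 1, 1) + timedelta(days=i) for i in range(3)]
fresh = [date(2014, 3, 1)]
missing = stuck + fresh

# Oldest-first with task_limit=3: every invocation retries the same stuck gaps.
print(select_required(missing, task_limit=3))
# Newest-first: the fresh gap gets scheduled despite the stuck ones.
print(select_required(missing, task_limit=3, reverse=True))
```

With the default order, the fresh gap is never reached while the three stuck gaps persist. This is why monitoring the gaps, or switching to reverse mode, matters.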

class luigi.tools.range.RangeEvent

Bases: luigi.event.Event

Events communicating useful metrics.

COMPLETE_COUNT would normally be nondecreasing, and its derivative would describe performance (how many instances complete invocation-over-invocation).

COMPLETE_FRACTION reaching 1 would be a telling event in the case of a backfill with a defined start and stop. It would not be particularly useful for a typical recurring task without a defined stop, where it fluctuates close to 1.
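The relationship between the two completeness metrics can be sketched as follows. This assumes the finite range is given as a list of expected datetimes plus the subset still missing. The helper `completeness_metrics` is hypothetical, and an empty range is treated as fully complete.

```python
from datetime import datetime, timedelta


def completeness_metrics(expected, missing):
    """Return (complete_count, complete_fraction) for a finite range (hypothetical helper)."""
    expected_count = len(expected)
    complete_count = expected_count - len(missing)
    # An empty range has nothing left to do, so report it as complete.
    fraction = complete_count / expected_count if expected_count else 1.0
    return complete_count, fraction


# A 10-day backfill with explicit start and stop; the last 4 days are missing.
start = datetime(2014, 1, 1)
expected = [start + timedelta(days=i) for i in range(10)]
missing = expected[6:]
print(completeness_metrics(expected, missing))  # (6, 0.6)
```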

DELAY is measured from the first missing datehour found until (current time + hours_forward), or until stop if it is defined. For Hourly ranges it is expressed in hours. TBD: different units for other frequencies? TODO: should it differ in reverse mode? From first missing to last missing? From last gap to stop?
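DELAY can be sketched by counting range slots. This assumes the finite range is available as a sorted list of datetimes ending just before (current time + hours_forward), or before stop. For an hourly range each slot is one hour, so the slot count equals the delay in hours. The helper `delay_in_slots` is hypothetical.

```python
from datetime import datetime, timedelta


def delay_in_slots(finite_datetimes, missing):
    """Slots from the first missing datetime to the end of the range (hypothetical helper).

    finite_datetimes must be sorted; returns 0 when nothing is missing.
    """
    if not finite_datetimes or not missing:
        return 0
    first_missing = min(missing)
    return len(finite_datetimes) - finite_datetimes.index(first_missing)


# Hourly range 2014-01-01T00 .. 2014-01-01T23 (24 slots); 20:00 onward is missing.
start = datetime(2014, 1, 1)
hours = [start + timedelta(hours=h) for h in range(24)]
missing = hours[20:]
print(delay_in_slots(hours, missing))  # 4
```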

COMPLETE_COUNT = 'event.tools.range.complete.count'
COMPLETE_FRACTION = 'event.tools.range.complete.fraction'
DELAY = 'event.tools.range.delay'

class luigi.tools.range.RangeBase(*args, **kwargs)

Bases: luigi.task.WrapperTask

Produces a contiguous completed range of a recurring task.

Made for the common use case where a task is parameterized by e.g. DateParameter, and assurance is needed that any gaps arising from downtime are eventually filled.

Emits events that one can use to monitor gaps and delays.

At least one of start and stop needs to be specified.

(This is quite an abstract base class for subclasses with different datetime parameter classes, e.g. DateParameter, DateHourParameter, ..., and different parameter naming, e.g. days_back/forward, hours_back/forward, ..., as well as different documentation wording, for a good user experience.)

Subclasses will need to use the 'of' parameter when overriding methods.

of = TaskParameter: task name to be completed. The task must take a single datetime parameter
of_params = DictParameter (defaults to {}): Arguments to be provided to the 'of' class when instantiating
start = Parameter
stop = Parameter
reverse = BoolParameter (defaults to False): specifies the preferred order for catching up. False - work from the oldest missing outputs onward; True - from the newest backward
task_limit = IntParameter (defaults to 50): how many of 'of' tasks to require. Guards against scheduling insane amounts of tasks in one go
now = IntParameter (defaults to None): set to override current time. In seconds since epoch
param_name = Parameter (defaults to None): parameter name used to pass in parameterized value. Defaults to None, meaning use first positional parameter
of_cls

DON'T USE. Will be deleted soon. Use self.of instead.

datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter

parameters_to_datetime(p)

Given a dictionary of parameters, will extract the ranged task parameter value
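The two dictionary conversions can be sketched as plain functions. This assumes an explicit parameter name and a value that has already been converted from a datetime. The real methods use param_name, or fall back to the first positional parameter. The function signatures, the parameter name "date" and the of_params key "region" below are hypothetical.

```python
from datetime import date


def datetime_to_parameters(of_params, param_name, value):
    """Combine the fixed of_params with the ranged parameter value (hypothetical signature)."""
    params = dict(of_params)  # copy so the caller's of_params stay untouched
    params[param_name] = value
    return params


def parameters_to_datetime(params, param_name):
    """Extract the ranged parameter value from a parameter dict (hypothetical signature)."""
    return params[param_name]


of_params = {"region": "eu"}
params = datetime_to_parameters(of_params, "date", date(2014, 1, 1))
print(params)  # {'region': 'eu', 'date': datetime.date(2014, 1, 1)}
print(parameters_to_datetime(params, "date"))
```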

moving_start(now)

Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.

moving_stop(now)

Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.

finite_datetimes(finite_start, finite_stop)

Returns the individual datetimes in interval [finite_start, finite_stop) for which task completeness should be required, as a sorted list.

requires()
missing_datetimes(finite_datetimes)

Override in subclasses to do bulk checks.

Returns a sorted list.

This is a conservative base implementation that brutally checks completeness, instance by instance.

This is inadvisable, as it may be slow.
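The conservative instance-by-instance check amounts to one completeness test per datetime. In the sketch below, `is_complete` is a hypothetical callable standing in for instantiating the 'of' task and calling its complete() method.

```python
from datetime import datetime, timedelta


def missing_datetimes(finite_datetimes, is_complete):
    """Return the incomplete datetimes as a sorted list, checking one at a time.

    is_complete is a hypothetical stand-in for building the task for a
    datetime and calling complete() on it.
    """
    return sorted(dt for dt in finite_datetimes if not is_complete(dt))


start = datetime(2014, 1, 1)
days = [start + timedelta(days=i) for i in range(5)]
done = {days[0], days[1], days[3]}  # pretend these outputs exist
print(missing_datetimes(days, lambda dt: dt in done))
```

Each call to is_complete is a separate check. Against a remote filesystem this typically means one round-trip per instance, which is why subclasses are encouraged to override this with bulk checks.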

class luigi.tools.range.RangeDailyBase(*args, **kwargs)

Bases: luigi.tools.range.RangeBase

Produces a contiguous completed range of a daily recurring task.

start = DateParameter (defaults to None): beginning date, inclusive. Default: None - work backward forever (requires reverse=True)
stop = DateParameter (defaults to None): ending date, exclusive. Default: None - work forward forever
days_back = IntParameter (defaults to 100): extent to which contiguousness is to be assured into the past, in days from current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period, to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit
days_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in days from current time. Prevents an infinite loop when stop is None
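The way days_back and days_forward bound an open-ended range can be sketched as clamping [start, stop) to a moving window around the current time. This assumes moving_start = now - days_back and moving_stop = now + days_forward, and treats the now parameter (seconds since epoch) as UTC. The function `finite_window` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def finite_window(now_epoch, start=None, stop=None, days_back=100, days_forward=0):
    """Clamp [start, stop) to the moving window around now (hypothetical helper).

    start and stop are naive datetimes assumed to be in UTC.
    """
    now = datetime.fromtimestamp(now_epoch, tz=timezone.utc).replace(tzinfo=None)
    moving_start = now - timedelta(days=days_back)
    moving_stop = now + timedelta(days=days_forward)
    finite_start = moving_start if start is None else max(start, moving_start)
    finite_stop = moving_stop if stop is None else min(stop, moving_stop)
    return finite_start, finite_stop


# Pin "now" to 2014-04-11T00:00 UTC, as the now parameter allows.
now_epoch = datetime(2014, 4, 11, tzinfo=timezone.utc).timestamp()
# A start far in the past is clamped to 100 days back, i.e. 2014-01-01.
print(finite_window(now_epoch, start=datetime(2013, 6, 1)))
```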
datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter

parameters_to_datetime(p)

Given a dictionary of parameters, will extract the ranged task parameter value

moving_start(now)
moving_stop(now)
finite_datetimes(finite_start, finite_stop)

Simply returns the points in time that correspond to turn of day.
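Enumerating the turns of day in [finite_start, finite_stop) can be sketched by rounding the start down to midnight and stepping one day at a time. Only midnights at or after finite_start are kept. The function name `daily_finite_datetimes` is hypothetical.

```python
from datetime import datetime, timedelta


def daily_finite_datetimes(finite_start, finite_stop):
    """Midnights t with finite_start <= t < finite_stop, in ascending order."""
    # Round down to the turn of the day, then step forward one day at a time.
    t = datetime(finite_start.year, finite_start.month, finite_start.day)
    result = []
    while t < finite_stop:
        if t >= finite_start:
            result.append(t)
        t += timedelta(days=1)
    return result


# 2014-01-01 00:00 precedes the 06:00 start, so only Jan 2 and Jan 3 are returned.
print(daily_finite_datetimes(datetime(2014, 1, 1, 6), datetime(2014, 1, 4)))
```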

class luigi.tools.range.RangeHourlyBase(*args, **kwargs)

Bases: luigi.tools.range.RangeBase

Produces a contiguous completed range of an hourly recurring task.

start = DateHourParameter (defaults to None): beginning datehour, inclusive. Default: None - work backward forever (requires reverse=True)
stop = DateHourParameter (defaults to None): ending datehour, exclusive. Default: None - work forward forever
hours_back = IntParameter (defaults to 2400): extent to which contiguousness is to be assured into the past, in hours from current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period, to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit
hours_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in hours from current time. Prevents an infinite loop when stop is None
datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter

parameters_to_datetime(p)

Given a dictionary of parameters, will extract the ranged task parameter value

moving_start(now)
moving_stop(now)
finite_datetimes(finite_start, finite_stop)

Simply returns the points in time that correspond to whole hours.

class luigi.tools.range.RangeByMinutesBase(*args, **kwargs)

Bases: luigi.tools.range.RangeBase

Produces a contiguous completed range of a recurring task whose instances are separated by a specified number of minutes.

start = DateMinuteParameter (defaults to None): beginning date-hour-minute, inclusive. Default: None - work backward forever (requires reverse=True)
stop = DateMinuteParameter (defaults to None): ending date-hour-minute, exclusive. Default: None - work forward forever
minutes_back = IntParameter (defaults to 1440): extent to which contiguousness is to be assured into the past, in minutes from current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period, to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit
minutes_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in minutes from current time. Prevents an infinite loop when stop is None
minutes_interval = IntParameter (defaults to 1): separation between events in minutes. It must evenly divide 60
datetime_to_parameter(dt)
parameter_to_datetime(p)
datetime_to_parameters(dt)

Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter

parameters_to_datetime(p)

Given a dictionary of parameters, will extract the ranged task parameter value

moving_start(now)
moving_stop(now)
finite_datetimes(finite_start, finite_stop)

Simply returns the points in time whose minute is a whole multiple of minutes_interval.
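Because minutes_interval must evenly divide 60, the slots can be generated by rounding the start down to the hour and stepping by the interval. Every generated point then lands on a multiple of the interval. This is a sketch under that assumption. The function name `minute_finite_datetimes` and its ValueError are hypothetical, not the library's own validation.

```python
from datetime import datetime, timedelta


def minute_finite_datetimes(finite_start, finite_stop, minutes_interval=1):
    """Datetimes in [finite_start, finite_stop) whose minute is a multiple of minutes_interval."""
    if minutes_interval <= 0 or 60 % minutes_interval != 0:
        raise ValueError("minutes_interval must evenly divide 60")
    # Round down to the start of the hour; since the interval divides 60,
    # stepping from there hits only multiples of the interval.
    t = finite_start.replace(minute=0, second=0, microsecond=0)
    step = timedelta(minutes=minutes_interval)
    result = []
    while t < finite_stop:
        if t >= finite_start:
            result.append(t)
        t += step
    return result


slots = minute_finite_datetimes(datetime(2014, 1, 1, 1, 7), datetime(2014, 1, 1, 1, 50), 15)
print([t.strftime("%H%M") for t in slots])  # ['0115', '0130', '0145']
```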

luigi.tools.range.most_common(items)

Backport of the collections.Counter functionality (new in Python 2.7) for finding the most common item.
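The following is a minimal sketch of such a helper. It assumes the result should be the most frequent item together with its count, the same shape as `collections.Counter(items).most_common(1)[0]`. It uses only a plain dict, so it would also work on interpreters older than 2.7.

```python
from collections import Counter


def most_common(items):
    """Return (item, count) for the most frequent item in a non-empty iterable."""
    counts = {}
    for item in items:
        counts[item] = counts.get(item, 0) + 1
    return max(counts.items(), key=lambda pair: pair[1])


hours = [3, 7, 3, 3, 7]
print(most_common(hours))                # (3, 3)
print(Counter(hours).most_common(1)[0])  # (3, 3)
```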

luigi.tools.range.infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re)

Efficiently determines missing datetimes by filesystem listing.

The current implementation works for the common case of a task writing output to a FileSystemTarget whose path is built using strftime with a format like ‘...%Y...%m...%d...%H...’, without custom complete() or exists().

(Eventually Luigi could have ranges of completion as first-class citizens. Then this listing business could be factored away/be provided for explicitly in target API or some kind of a history server.)
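The idea behind the listing approach is to replace many per-task existence checks with a single listing. That listing is then mapped back to datetimes. The sketch below assumes a strftime-style path format and a listing that is already available as a list of strings. It parses paths with `datetime.strptime` rather than the regular expression that infer_bulk_complete_from_fs takes. The helper names and the path format are hypothetical.

```python
from datetime import datetime, timedelta


def existing_datetimes(listed_paths, path_format):
    """Parse one directory listing back into the datetimes it covers."""
    found = set()
    for path in listed_paths:
        try:
            found.add(datetime.strptime(path, path_format))
        except ValueError:
            continue  # not an output path of this task; ignore it
    return found


def missing_from_listing(datetimes, path_format, listed_paths):
    """Datetimes whose output path does not appear in the listing, in input order."""
    existing = existing_datetimes(listed_paths, path_format)
    return [dt for dt in datetimes if dt not in existing]


fmt = "data/%Y/%m/%d/%H/part.tsv"  # hypothetical output path format
start = datetime(2014, 1, 1)
hours = [start + timedelta(hours=h) for h in range(4)]
listing = [
    "data/2014/01/01/00/part.tsv",
    "data/2014/01/01/02/part.tsv",
    "data/2014/01/01/02/_SUCCESS",  # unrelated file, ignored
]
# The 01:00 and 03:00 hours are reported as missing.
print(missing_from_listing(hours, fmt, listing))
```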

class luigi.tools.range.RangeDaily(*args, **kwargs)

Bases: luigi.tools.range.RangeDailyBase

Efficiently produces a contiguous completed range of a daily recurring task that takes a single DateParameter.

Falls back to inferring it from the output filesystem listing to facilitate the common use case.

Convenient to use even from command line, like:

luigi --module your.module RangeDaily --of YourActualTask --start 2014-01-01

missing_datetimes(finite_datetimes)

class luigi.tools.range.RangeHourly(*args, **kwargs)

Bases: luigi.tools.range.RangeHourlyBase

Efficiently produces a contiguous completed range of an hourly recurring task that takes a single DateHourParameter.

Benefits from bulk_complete information to efficiently cover gaps.

Falls back to inferring it from the output filesystem listing to facilitate the common use case.

Convenient to use even from command line, like:

luigi --module your.module RangeHourly --of YourActualTask --start 2014-01-01T00

missing_datetimes(finite_datetimes)

class luigi.tools.range.RangeByMinutes(*args, **kwargs)

Bases: luigi.tools.range.RangeByMinutesBase

Efficiently produces a contiguous completed range of a recurring task that runs every minutes_interval minutes and takes a single DateMinuteParameter.

Benefits from bulk_complete information to efficiently cover gaps.

Falls back to inferring it from the output filesystem listing to facilitate the common use case.

Convenient to use even from command line, like:

luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123

missing_datetimes(finite_datetimes)