luigi.tools.range module¶
Produces contiguous completed ranges of recurring tasks.
See RangeDaily and RangeHourly for basic usage.
Caveat: if gaps accumulate while their causes (e.g. missing dependencies) go
unmonitored or unmitigated, the range task will eventually keep retrying the same
gaps over and over and make no progress toward more recent times. (See the
task_limit and reverse parameters.)
TODO foolproof against that kind of misuse?
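The caveat can be illustrated with a minimal sketch in plain Python. It is not Luigi's implementation: `select_required` is a hypothetical helper that assumes the range task requires at most task_limit of the missing datetimes per invocation, taken from the oldest end by default or from the newest end when reverse is set.

```python
from datetime import datetime, timedelta

def select_required(missing, task_limit, reverse=False):
    """Hypothetical helper: pick the missing datetimes to require in one invocation."""
    missing = sorted(missing)
    if task_limit <= 0:
        return []
    # Oldest first by default; newest first when reverse is set.
    return missing[-task_limit:] if reverse else missing[:task_limit]

start = datetime(2014, 1, 1)
# Ten missing days; assume the three oldest can never complete
# (e.g. because of a missing dependency).
missing = [start + timedelta(days=i) for i in range(10)]

forward = select_required(missing, task_limit=3)
backward = select_required(missing, task_limit=3, reverse=True)
```

Under these assumptions, `forward` selects the same three stuck days on every invocation, so newer days are never reached, while `backward` keeps working on the most recent gaps.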
- class luigi.tools.range.RangeEvent[source]¶
Bases: Event
Events communicating useful metrics.
COMPLETE_COUNT would normally be nondecreasing, and its derivative would describe performance (how many instances complete invocation-over-invocation).
COMPLETE_FRACTION reaching 1 would be a telling event in case of a backfill with defined start and stop. Would not be strikingly useful for a typical recurring task without stop defined, fluctuating close to 1.
DELAY is measured from the first found missing datehour till (current time + hours_forward), or till stop if it is defined. In hours for Hourly. TBD different units for other frequencies? TODO any different for reverse mode? From first missing till last missing? From last gap till stop?
- COMPLETE_COUNT = 'event.tools.range.complete.count'¶
- COMPLETE_FRACTION = 'event.tools.range.complete.fraction'¶
- DELAY = 'event.tools.range.delay'¶
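The three metrics can be sketched in plain Python as follows. This is an illustration of the descriptions above, not Luigi's code: `range_metrics` is a hypothetical helper, and it assumes the delay is counted in range intervals (hours for an hourly range) from the first missing datetime to the end of the finite range.

```python
from datetime import datetime, timedelta

def range_metrics(expected, missing):
    """Hypothetical helper: return (delay, complete_count, complete_fraction).

    `expected` is the sorted list of datetimes in the range; `missing` is
    the subset that is not complete yet.
    """
    missing = sorted(missing)
    # Number of intervals from the first missing datetime to the end of the range.
    delay = len(expected) - expected.index(missing[0]) if missing else 0
    complete_count = len(expected) - len(missing)
    fraction = complete_count / len(expected) if expected else 1.0
    return delay, complete_count, fraction

start = datetime(2014, 1, 1, 0)
expected = [start + timedelta(hours=h) for h in range(24)]
missing = [expected[20], expected[23]]  # hours 20:00 and 23:00 are not done
delay, count, fraction = range_metrics(expected, missing)
```

In this example the first gap is at 20:00, so the delay is 4 hours even though only 2 of the 24 instances are missing.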
- class luigi.tools.range.RangeBase(*args, **kwargs)[source]¶
Bases: WrapperTask
Produces a contiguous completed range of a recurring task.
Made for the common use case where a task is parameterized by e.g. DateParameter, and assurance is needed that any gaps arising from downtime are eventually filled.
Emits events that one can use to monitor gaps and delays.
At least one of start and stop needs to be specified.
(This is quite an abstract base class for subclasses with different datetime parameter classes, e.g. DateParameter, DateHourParameter, …, and different parameter naming, e.g. days_back/forward, hours_back/forward, …, as well as different documentation wording, to improve user experience.)
Subclasses will need to use the of parameter when overriding methods.
- of = TaskParameter: task name to be completed. The task must take a single datetime parameter¶
- of_params = DictParameter (defaults to {}): Arguments to be provided to the 'of' class when instantiating¶
- start = Parameter¶
- stop = Parameter¶
- reverse = BoolParameter (defaults to False): specifies the preferred order for catching up. False - work from the oldest missing outputs onward; True - from the newest backward¶
- task_limit = IntParameter (defaults to 50): how many of 'of' tasks to require. Guards against scheduling insane amounts of tasks in one go¶
- now = IntParameter (defaults to None): set to override current time. In seconds since epoch¶
- param_name = Parameter (defaults to None): parameter name used to pass in parameterized value. Defaults to None, meaning use first positional parameter¶
- property of_cls¶
DON'T USE. Will be deleted soon. Use self.of!
- datetime_to_parameters(dt)[source]¶
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
- parameters_to_datetime(p)[source]¶
Given a dictionary of parameters, will extract the ranged task parameter value
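The two conversion methods above are inverses of each other. A minimal sketch of a daily variant in plain Python, assuming the ranged parameter is a date stored under `param_name` (the function signatures and the `region` parameter are hypothetical, chosen only for illustration):

```python
from datetime import date, datetime

def datetime_to_parameters(dt, of_params, param_name):
    """Hypothetical daily variant: merge the fixed of_params with the ranged date."""
    params = dict(of_params)  # copy so the caller's dictionary is untouched
    params[param_name] = dt.date()
    return params

def parameters_to_datetime(params, param_name):
    """Inverse direction: extract the ranged date as a datetime at midnight."""
    d = params[param_name]
    return datetime(d.year, d.month, d.day)

params = datetime_to_parameters(datetime(2014, 1, 1), {"region": "eu"}, "date")
round_trip = parameters_to_datetime(params, "date")
```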
- moving_start(now)[source]¶
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
- moving_stop(now)[source]¶
Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.
- finite_datetimes(finite_start, finite_stop)[source]¶
Returns the individual datetimes in interval [finite_start, finite_stop) for which task completeness should be required, as a sorted list.
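How moving_start, moving_stop and finite_datetimes fit together can be sketched for a daily range. This is a plain-Python illustration under stated assumptions, not Luigi's code: `finite_window` is a hypothetical helper that assumes the moving bounds are simply `now` shifted by days_back and days_forward, and that an explicit start or stop is clamped to those bounds.

```python
from datetime import datetime, timedelta

def finite_window(now, start=None, stop=None, days_back=100, days_forward=0):
    """Hypothetical helper: clamp an open or overly wide range to a finite one."""
    moving_start = now - timedelta(days=days_back)
    moving_stop = now + timedelta(days=days_forward)
    finite_start = moving_start if start is None else max(start, moving_start)
    finite_stop = moving_stop if stop is None else min(stop, moving_stop)
    return finite_start, finite_stop

def finite_datetimes(finite_start, finite_stop):
    """Midnights in [finite_start, finite_stop), sorted ascending."""
    day = datetime(finite_start.year, finite_start.month, finite_start.day)
    if day < finite_start:
        day += timedelta(days=1)  # skip a partially covered first day
    days = []
    while day < finite_stop:
        days.append(day)
        day += timedelta(days=1)
    return days

now = datetime(2014, 1, 10, 15, 30)
window = finite_window(now, start=datetime(2014, 1, 7))
days = finite_datetimes(*window)
```

With `start` given, the window runs from 2014-01-07 up to `now`; with `start=None` and a small `days_back`, the moving start takes over and keeps the list finite.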
- requires()[source]¶
The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
- class luigi.tools.range.RangeDailyBase(*args, **kwargs)[source]¶
Bases: RangeBase
Produces a contiguous completed range of a daily recurring task.
- start = DateParameter (defaults to None): beginning date, inclusive. Default: None - work backward forever (requires reverse=True)¶
- stop = DateParameter (defaults to None): ending date, exclusive. Default: None - work forward forever¶
- days_back = IntParameter (defaults to 100): extent to which contiguousness is to be assured into the past, in days from the current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit¶
- days_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in days from the current time. Prevents an infinite loop when stop is None¶
- datetime_to_parameters(dt)[source]¶
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
- parameters_to_datetime(p)[source]¶
Given a dictionary of parameters, will extract the ranged task parameter value
- moving_start(now)[source]¶
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
- class luigi.tools.range.RangeHourlyBase(*args, **kwargs)[source]¶
Bases: RangeBase
Produces a contiguous completed range of an hourly recurring task.
- start = DateHourParameter (defaults to None): beginning datehour, inclusive. Default: None - work backward forever (requires reverse=True)¶
- stop = DateHourParameter (defaults to None): ending datehour, exclusive. Default: None - work forward forever¶
- hours_back = IntParameter (defaults to 2400): extent to which contiguousness is to be assured into the past, in hours from the current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit¶
- hours_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in hours from the current time. Prevents an infinite loop when stop is None¶
- datetime_to_parameters(dt)[source]¶
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
- parameters_to_datetime(p)[source]¶
Given a dictionary of parameters, will extract the ranged task parameter value
- moving_start(now)[source]¶
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
- class luigi.tools.range.RangeByMinutesBase(*args, **kwargs)[source]¶
Bases: RangeBase
Produces a contiguous completed range of a recurring task whose instances are separated by a specified number of minutes.
- start = DateMinuteParameter (defaults to None): beginning date-hour-minute, inclusive. Default: None - work backward forever (requires reverse=True)¶
- stop = DateMinuteParameter (defaults to None): ending date-hour-minute, exclusive. Default: None - work forward forever¶
- minutes_back = IntParameter (defaults to 1440): extent to which contiguousness is to be assured into the past, in minutes from the current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit¶
- minutes_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in minutes from the current time. Prevents an infinite loop when stop is None¶
- minutes_interval = IntParameter (defaults to 1): separation between events in minutes. It must evenly divide 60¶
- datetime_to_parameters(dt)[source]¶
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
- parameters_to_datetime(p)[source]¶
Given a dictionary of parameters, will extract the ranged task parameter value
- moving_start(now)[source]¶
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
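The requirement that minutes_interval evenly divide 60 means the slots line up identically in every hour. A minimal plain-Python sketch of enumerating such slots follows; `finite_datetimes_by_minutes` is a hypothetical helper, and the alignment rule (round the start down to an interval boundary, then keep only slots inside the range) is an assumption made for illustration.

```python
from datetime import datetime, timedelta

def finite_datetimes_by_minutes(finite_start, finite_stop, minutes_interval=1):
    """Hypothetical helper: interval-aligned datetimes in [finite_start, finite_stop)."""
    if minutes_interval <= 0 or 60 % minutes_interval != 0:
        raise ValueError("minutes_interval must be positive and evenly divide 60")
    # Align down to the start of an interval, e.g. 20:13 with interval 5 -> 20:10.
    aligned_minute = (finite_start.minute // minutes_interval) * minutes_interval
    t = finite_start.replace(minute=aligned_minute, second=0, microsecond=0)
    result = []
    while t < finite_stop:
        if t >= finite_start:  # drop the aligned slot if it precedes the range
            result.append(t)
        t += timedelta(minutes=minutes_interval)
    return result

slots = finite_datetimes_by_minutes(
    datetime(2014, 1, 1, 20, 13), datetime(2014, 1, 1, 20, 30), minutes_interval=5)
```

Here the slot at 20:10 precedes the start and 20:30 is excluded by the half-open interval, so three slots remain.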
- luigi.tools.range.infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re)[source]¶
Efficiently determines missing datetimes by filesystem listing.
The current implementation works for the common case of a task writing output to a FileSystemTarget whose path is built using strftime with format like ‘…%Y…%m…%d…%H…’, without custom complete() or exists().
(Eventually Luigi could have ranges of completion as first-class citizens. Then this listing business could be factored away/be provided for explicitly in target API or some kind of a history server.)
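The idea of inferring completeness from a listing, rather than checking each output individually, can be sketched in plain Python. This is not the function's actual implementation: the path layout `PATH_FORMAT`, the helpers `missing_from_listing` and `path_to_datetime`, and the in-memory `listing` standing in for a filesystem call are all assumptions made for illustration.

```python
import re
from datetime import datetime, timedelta

PATH_FORMAT = "/data/events/%Y/%m/%d/%H/part"  # hypothetical output path pattern

def missing_from_listing(datetimes, listing, path_format=PATH_FORMAT):
    """Hypothetical helper: datetimes whose strftime-built path is absent from a listing."""
    present = set(listing)
    return [dt for dt in datetimes if dt.strftime(path_format) not in present]

def path_to_datetime(path):
    """Recover the datehour from a path of the assumed layout, or None if it does not match."""
    m = re.fullmatch(r"/data/events/(\d{4})/(\d{2})/(\d{2})/(\d{2})/part", path)
    return datetime(*map(int, m.groups())) if m else None

start = datetime(2014, 1, 1, 0)
wanted = [start + timedelta(hours=h) for h in range(4)]
# Stand-in for one bulk filesystem listing; hours 01 and 03 have no output yet.
listing = ["/data/events/2014/01/01/00/part", "/data/events/2014/01/01/02/part"]
missing = missing_from_listing(wanted, listing)
```

A single listing replaces one existence check per datetime, which is why the approach only holds when completeness is determined purely by the output path, i.e. without custom complete() or exists().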
- class luigi.tools.range.RangeMonthly(*args, **kwargs)[source]¶
Bases: RangeBase
Produces a contiguous completed range of a monthly recurring task.
Unlike the Range* classes with shorter intervals, this class does not perform bulk optimisation. It is assumed that the number of months is low enough not to motivate the increased complexity. Hence, there is no class RangeMonthlyBase.
- start = MonthParameter (defaults to None): beginning month, inclusive. Default: None - work backward forever (requires reverse=True)¶
- stop = MonthParameter (defaults to None): ending month, exclusive. Default: None - work forward forever¶
- months_back = IntParameter (defaults to 13): extent to which contiguousness is to be assured into the past, in months from the current time. Prevents an infinite loop when start is None. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter than the retention period to prevent the oldest outputs from flapping. Increase freely if you intend to process old dates; the worker's memory is the limit¶
- months_forward = IntParameter (defaults to 0): extent to which contiguousness is to be assured into the future, in months from the current time. Prevents an infinite loop when stop is None¶
- datetime_to_parameters(dt)[source]¶
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
- parameters_to_datetime(p)[source]¶
Given a dictionary of parameters, will extract the ranged task parameter value
- moving_start(now)[source]¶
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
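Month arithmetic differs from the shorter intervals because months have unequal lengths, so a fixed timedelta cannot be used. A minimal plain-Python sketch of enumerating months follows; `add_months` and `finite_months` are hypothetical helpers, and aligning the moving start to the first of the month is an assumption.

```python
from datetime import datetime

def add_months(dt, months):
    """Shift a first-of-month datetime by a (possibly negative) number of months."""
    index = dt.year * 12 + (dt.month - 1) + months
    return datetime(index // 12, index % 12 + 1, 1)

def finite_months(finite_start, finite_stop):
    """First-of-month datetimes in [finite_start, finite_stop), sorted ascending."""
    month = datetime(finite_start.year, finite_start.month, 1)
    if month < finite_start:
        month = add_months(month, 1)  # skip a partially covered first month
    months = []
    while month < finite_stop:
        months.append(month)
        month = add_months(month, 1)
    return months

now = datetime(2014, 3, 15)
# months_back = 13, counted from the first day of the current month.
moving_start = add_months(datetime(now.year, now.month, 1), -13)
months = finite_months(moving_start, now)
```

With the default months_back of 13 the list stays short, which is consistent with the note above that bulk optimisation is not considered worth the complexity here.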
- class luigi.tools.range.RangeDaily(*args, **kwargs)[source]¶
Bases: RangeDailyBase
Efficiently produces a contiguous completed range of a daily recurring task that takes a single DateParameter.
Falls back to inferring completeness from the output filesystem listing to facilitate the common case usage.
Convenient to use even from command line, like:
luigi --module your.module RangeDaily --of YourActualTask --start 2014-01-01
- class luigi.tools.range.RangeHourly(*args, **kwargs)[source]¶
Bases: RangeHourlyBase
Efficiently produces a contiguous completed range of an hourly recurring task that takes a single DateHourParameter.
Benefits from bulk_complete information to efficiently cover gaps.
Falls back to inferring that information from the output filesystem listing to facilitate the common case usage.
Convenient to use even from command line, like:
luigi --module your.module RangeHourly --of YourActualTask --start 2014-01-01T00
- class luigi.tools.range.RangeByMinutes(*args, **kwargs)[source]¶
Bases: RangeByMinutesBase
Efficiently produces a contiguous completed range of a task recurring every minutes_interval minutes that takes a single DateMinuteParameter.
Benefits from bulk_complete information to efficiently cover gaps.
Falls back to inferring that information from the output filesystem listing to facilitate the common case usage.
Convenient to use even from command line, like:
luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123
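The --start values in the three command lines use different textual formats. A small plain-Python sketch of parsing them follows; the format strings are inferred from the example values, and `START_FORMATS` and `parse_start` are hypothetical names used only for illustration.

```python
from datetime import datetime

# Formats implied by the example --start values; treat them as assumptions.
START_FORMATS = {
    "RangeDaily": "%Y-%m-%d",
    "RangeHourly": "%Y-%m-%dT%H",
    "RangeByMinutes": "%Y-%m-%dT%H%M",
}

def parse_start(range_class, value):
    """Hypothetical helper: parse a --start string for the given range class."""
    return datetime.strptime(value, START_FORMATS[range_class])

daily = parse_start("RangeDaily", "2014-01-01")
hourly = parse_start("RangeHourly", "2014-01-01T00")
minutely = parse_start("RangeByMinutes", "2014-01-01T0123")
```

Note that the minute format has no separator between hour and minute, so `T0123` means 01:23.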