luigi.tools.range
Produces contiguous completed ranges of recurring tasks.
See RangeDaily and RangeHourly for basic usage.
Caveat: if gaps accumulate because their causes (e.g. missing dependencies) go unmonitored or unmitigated, this will eventually keep retrying the same gaps over and over and make no progress toward more recent times. (See the task_limit and reverse parameters.)
TODO foolproof against that kind of misuse?
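A toy model of the caveat above: if the same old gaps never get filled, a capped, oldest-first selection keeps choosing them on every invocation, while reverse order reaches the recent gap first. `select_gaps` is a hypothetical helper, and it assumes, as a simplification, that task_limit just caps how many missing datetimes are scheduled per invocation.

```python
from datetime import date, timedelta


def select_gaps(missing, task_limit, reverse=False):
    """Pick at most task_limit missing dates for one invocation.

    Simplified model (not luigi's code): reverse=False takes the oldest
    gaps first, reverse=True takes the newest first.
    """
    return sorted(missing, reverse=reverse)[:task_limit]


# Three old gaps whose cause is never fixed, plus one recent gap.
stuck = [date(2014, 1, 1) + timedelta(days=i) for i in range(3)]
recent = date(2014, 6, 1)
missing = stuck + [recent]

# Oldest-first with a limit of 3: the stuck gaps fill every slot,
# so the recent gap is never scheduled.
print(select_gaps(missing, task_limit=3))
# Newest-first: the recent gap is scheduled first.
print(select_gaps(missing, task_limit=3, reverse=True))
```

Rerunning the first call never changes its answer while the stuck gaps remain, which is exactly the "no progress to more recent times" failure mode.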
Functions
- infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re): Efficiently determines missing datetimes by filesystem listing.
Classes
- RangeBase: Produces a contiguous completed range of a recurring task.
- RangeByMinutes: Efficiently produces a contiguous completed range of a recurring task every interval minutes that takes a single DateMinuteParameter.
- RangeByMinutesBase: Produces a contiguous completed range of a recurring task separated by a specified number of minutes.
- RangeDaily: Efficiently produces a contiguous completed range of a daily recurring task that takes a single DateParameter.
- RangeDailyBase: Produces a contiguous completed range of a daily recurring task.
- RangeEvent: Events communicating useful metrics.
- RangeHourly: Efficiently produces a contiguous completed range of an hourly recurring task that takes a single DateHourParameter.
- RangeHourlyBase: Produces a contiguous completed range of an hourly recurring task.
- RangeMonthly: Produces a contiguous completed range of a monthly recurring task.
- class luigi.tools.range.RangeEvent
Events communicating useful metrics.
COMPLETE_COUNT would normally be nondecreasing, and its derivative would describe performance (how many instances complete invocation-over-invocation).
COMPLETE_FRACTION reaching 1 would be a telling event in case of a backfill with defined start and stop. It would not be strikingly useful for a typical recurring task without stop defined, fluctuating close to 1.
DELAY is measured from the first found missing datehour till (current time + hours_forward), or till stop if it is defined. It is in hours for Hourly. TBD different units for other frequencies? TODO any different for reverse mode? From first missing till last missing? From last gap till stop?
- COMPLETE_COUNT = 'event.tools.range.complete.count'
- COMPLETE_FRACTION = 'event.tools.range.complete.fraction'
- DELAY = 'event.tools.range.delay'
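The three metrics can be made concrete with a small standalone calculation. This is a sketch of one reading of the definitions above, not luigi's implementation: `range_metrics` is a hypothetical helper, it assumes hourly granularity, and `end` stands for stop, or (current time + hours_forward) when stop is undefined.

```python
from datetime import datetime, timedelta


def range_metrics(expected, missing, end):
    """Compute (COMPLETE_COUNT, COMPLETE_FRACTION, DELAY) for one invocation.

    expected: every datetime the range should cover
    missing:  the subset of expected whose task is incomplete
    end:      stop if defined, else current time + hours_forward
    """
    complete_count = len(expected) - len(missing)
    # Treat an empty range as fully complete to avoid dividing by zero.
    complete_fraction = complete_count / len(expected) if expected else 1.0
    if missing:
        # Hours from the first missing datehour till the end of the range.
        delay_hours = (end - min(missing)) / timedelta(hours=1)
    else:
        delay_hours = 0.0  # assumed: no gap means no delay
    return complete_count, complete_fraction, delay_hours


start = datetime(2014, 1, 1)
expected = [start + timedelta(hours=h) for h in range(24)]
missing = expected[20:]  # the last four hours are not done yet
end = start + timedelta(hours=24)
count, fraction, delay = range_metrics(expected, missing, end)
print(count, round(fraction, 3), delay)  # 20 0.833 4.0
```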
- class luigi.tools.range.RangeBase(*args, **kwargs)
Produces a contiguous completed range of a recurring task.
Made for the common use case where a task is parameterized by e.g. DateParameter, and assurance is needed that any gaps arising from downtime are eventually filled. Emits events that one can use to monitor gaps and delays.
At least one of start and stop needs to be specified.
(This is quite an abstract base class for subclasses with different datetime parameter classes, e.g. DateParameter, DateHourParameter, …, and different parameter naming, e.g. days_back/forward, hours_back/forward, …, as well as different documentation wording, to improve user experience.)
Subclasses will need to use the of parameter when overriding methods.
- of
A parameter that takes another luigi task class.
When used programmatically, the parameter should be specified directly with the luigi.task.Task (sub) class, like MyMetaTask(my_task_param=my_tasks.MyTask). On the command line, you specify the luigi.task.Task.get_task_family(), like

    $ luigi --module my_tasks MyMetaTask --my_task_param my_namespace.MyTask

where my_namespace.MyTask is defined in the my_tasks python module.
When the luigi.task.Task class is instantiated to an object, the value will always be a task class (and not a string).
- of_params
Parameter whose value is a dict.
In the task definition, use

    import logging

    import luigi


    class MyTask(luigi.Task):
        tags = luigi.DictParameter()

        def run(self):
            logging.info("Find server with role: %s", self.tags['role'])
            # aws.ec2.find_my_resource is a placeholder for your own lookup code
            server = aws.ec2.find_my_resource(self.tags)

At the command line, use

    $ luigi --module my_tasks MyTask --tags <JSON string>

Simple example with two tags:

    $ luigi --module my_tasks MyTask --tags '{"role": "web", "env": "staging"}'

It can be used to define dynamic parameters, when you do not know the exact list of your parameters (e.g. a list of tags that are dynamically constructed outside Luigi), or when you have a complex parameter containing logically related values (like a database connection config).
It is possible to provide a JSON schema against which the given value is validated:

    class MyTask(luigi.Task):
        tags = luigi.DictParameter(
            schema={
                "type": "object",
                "patternProperties": {
                    ".*": {"type": "string", "enum": ["web", "staging"]},
                },
            }
        )

        def run(self):
            logging.info("Find server with role: %s", self.tags['role'])
            server = aws.ec2.find_my_resource(self.tags)

Using this schema, the following command will work:

    $ luigi --module my_tasks MyTask --tags '{"role": "web", "env": "staging"}'

while this command will fail because the parameter is not valid:

    $ luigi --module my_tasks MyTask --tags '{"role": "UNKNOWN_VALUE", "env": "staging"}'

Finally, the provided schema can be a custom validator:

    import jsonschema

    custom_validator = jsonschema.Draft4Validator(
        schema={
            "type": "object",
            "patternProperties": {
                ".*": {"type": "string", "enum": ["web", "staging"]},
            },
        }
    )


    class MyTask(luigi.Task):
        tags = luigi.DictParameter(schema=custom_validator)

        def run(self):
            logging.info("Find server with role: %s", self.tags['role'])
            server = aws.ec2.find_my_resource(self.tags)

- start
Parameter whose value is a str, and a base class for other parameter types.
Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

    import luigi


    class MyTask(luigi.Task):
        foo = luigi.Parameter()


    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")

        def run(self):
            print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
- Any value provided on the command line:
  - to the root task (e.g. --param xyz),
  - then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
- With [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See Parameters from config Ingestion.
- Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
- stop
Parameter whose value is a str.
- reverse
A Parameter whose value is a bool. This parameter has an implicit default value of False. For the command line interface this means that the value is False unless you add "--the-bool-parameter" to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to be True. This is called explicit parsing. When omitting the parameter value, it is still considered True, but to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.
You can toggle between the two parsing modes on a per-parameter basis via

    class MyTask(luigi.Task):
        implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING)
        explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)

or globally by

    luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING

for all bool parameters instantiated after this line.
- task_limit
Parameter whose value is an int.
- now
Parameter whose value is an int.
- param_name
Parameter whose value is a str.
- property of_cls
DON'T USE. Will be deleted soon. Use self.of!
- datetime_to_parameters(dt)
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter.
- parameters_to_datetime(p)
Given a dictionary of parameters, will extract the ranged task parameter value.
- moving_start(now)
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
- moving_stop(now)
Returns a datetime till which to ensure contiguousness in the case when stop is None or unfeasibly far forward.
- finite_datetimes(finite_start, finite_stop)
Returns the individual datetimes in interval [finite_start, finite_stop) for which task completeness should be required, as a sorted list.
- requires()
The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
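The interplay of start/stop with moving_start/moving_stop and finite_datetimes can be sketched in plain Python. This assumes, without claiming it is luigi's exact code, that an open or too-distant bound is replaced by the moving bound (start via max, stop via min); `back` and `forward` are hypothetical timedelta stand-ins for subclass parameters such as days_back and days_forward.

```python
from datetime import datetime, timedelta


def finite_window(start, stop, now, back, forward):
    """Clamp an optionally open [start, stop) range to a window around now."""
    moving_start = now - back      # earliest point worth ensuring
    moving_stop = now + forward    # latest point worth ensuring
    finite_start = moving_start if start is None else max(start, moving_start)
    finite_stop = moving_stop if stop is None else min(stop, moving_stop)
    return finite_start, finite_stop


def finite_datetimes(finite_start, finite_stop, step):
    """Sorted datetimes in [finite_start, finite_stop), spaced by step."""
    out = []
    t = finite_start
    while t < finite_stop:
        out.append(t)
        t += step
    return out


now = datetime(2014, 1, 10)
# No start or stop given: the window falls back to the three days before now.
lo, hi = finite_window(None, None, now, timedelta(days=3), timedelta(0))
print(finite_datetimes(lo, hi, timedelta(days=1)))
```

Clamping is what keeps an open-ended range finite: without a moving bound, a missing start would mean an unbounded search into the past.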
- class luigi.tools.range.RangeDailyBase(*args, **kwargs)
Produces a contiguous completed range of a daily recurring task.
- start
Parameter whose value is a date.
A DateParameter is a Date string formatted YYYY-MM-DD. For example, 2013-07-10 specifies July 10, 2013.
In 90% of cases, DateParameters are interpolated into file system paths or the like. Here is a gentle reminder of how to interpolate date parameters into strings:

    import luigi


    class MyTask(luigi.Task):
        date = luigi.DateParameter()

        def run(self):
            templated_path = "/my/path/to/my/dataset/{date:%Y/%m/%d}/"
            instantiated_path = templated_path.format(date=self.date)
            # print(instantiated_path) --> /my/path/to/my/dataset/2016/06/09/
            # ... use instantiated_path ...

To set this parameter to default to the current day, you can write code like this:

    import datetime


    class MyTask(luigi.Task):
        date = luigi.DateParameter(default=datetime.date.today())

- stop
Parameter whose value is a date.
- days_back
Parameter whose value is an int.
- days_forward
Parameter whose value is an int.
- datetime_to_parameters(dt)
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter.
- parameters_to_datetime(p)
Given a dictionary of parameters, will extract the ranged task parameter value.
- moving_start(now)
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
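For a daily range, the covered points can be sketched as turn-of-day datetimes, and datetime_to_parameters / parameters_to_datetime as a round trip between such a datetime and a parameter dict. This is illustrative only: the parameter name `date` and the extra of-param `region` are hypothetical, and the real methods are bound to the task's of, of_params and param_name settings.

```python
from datetime import datetime, timedelta


def daily_datetimes(finite_start, finite_stop):
    """Midnights in [finite_start, finite_stop), sorted."""
    t = datetime(finite_start.year, finite_start.month, finite_start.day)
    out = []
    while t < finite_stop:
        if t >= finite_start:  # skip the midnight before a mid-day start
            out.append(t)
        t += timedelta(days=1)
    return out


def datetime_to_parameters(dt, of_params, param_name="date"):
    """Merge the fixed of-params with the ranged date value."""
    return {**of_params, param_name: dt.date()}


def parameters_to_datetime(p, param_name="date"):
    """Recover the ranged datetime from a parameter dict."""
    d = p[param_name]
    return datetime(d.year, d.month, d.day)


days = daily_datetimes(datetime(2014, 1, 1, 6), datetime(2014, 1, 4))
params = [datetime_to_parameters(dt, {"region": "eu"}) for dt in days]
print(params[0])  # {'region': 'eu', 'date': datetime.date(2014, 1, 2)}
```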
- class luigi.tools.range.RangeHourlyBase(*args, **kwargs)
Produces a contiguous completed range of an hourly recurring task.
- start
Parameter whose value is a datetime specified to the hour.
A DateHourParameter is an ISO 8601 formatted date and time specified to the hour. For example, 2013-07-10T19 specifies July 10, 2013 at 19:00.
- stop
Parameter whose value is a datetime specified to the hour.
- hours_back
Parameter whose value is an int.
- hours_forward
Parameter whose value is an int.
- datetime_to_parameters(dt)
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter.
- parameters_to_datetime(p)
Given a dictionary of parameters, will extract the ranged task parameter value.
- moving_start(now)
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
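The DateHourParameter string format and an hourly range can be illustrated with the standard library alone. The format string `%Y-%m-%dT%H` is an assumption chosen to match the 2013-07-10T19 example above, and `hourly_datetimes` is a hypothetical helper, not the library's finite_datetimes.

```python
from datetime import datetime, timedelta

# Assumed format matching the 2013-07-10T19 example.
DATE_HOUR_FORMAT = "%Y-%m-%dT%H"


def parse_date_hour(s):
    """Parse a date-hour string such as '2013-07-10T19'."""
    return datetime.strptime(s, DATE_HOUR_FORMAT)


def hourly_datetimes(finite_start, finite_stop):
    """Top-of-hour datetimes in [finite_start, finite_stop), sorted."""
    t = finite_start.replace(minute=0, second=0, microsecond=0)
    out = []
    while t < finite_stop:
        if t >= finite_start:  # skip the hour mark before a mid-hour start
            out.append(t)
        t += timedelta(hours=1)
    return out


start = parse_date_hour("2013-07-10T19")
stop = parse_date_hour("2013-07-10T22")
print([t.strftime(DATE_HOUR_FORMAT) for t in hourly_datetimes(start, stop)])
# ['2013-07-10T19', '2013-07-10T20', '2013-07-10T21']
```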
- class luigi.tools.range.RangeByMinutesBase(*args, **kwargs)
Produces a contiguous completed range of a recurring task separated by a specified number of minutes.
- start
Parameter whose value is a datetime specified to the minute.
A DateMinuteParameter is an ISO 8601 formatted date and time specified to the minute. For example, 2013-07-10T1907 specifies July 10, 2013 at 19:07.
The interval parameter can be used to clamp this parameter to every N minutes, instead of every minute.
- stop
Parameter whose value is a datetime specified to the minute.
- minutes_back
Parameter whose value is an int.
- minutes_forward
Parameter whose value is an int.
- minutes_interval
Parameter whose value is an int.
- datetime_to_parameters(dt)
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter.
- parameters_to_datetime(p)
Given a dictionary of parameters, will extract the ranged task parameter value.
- moving_start(now)
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
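The effect of clamping to every N minutes can be sketched as flooring each datetime onto a fixed grid. The grid anchor (1970-01-01) and the helper names are assumptions made for this illustration, not the library's code; `%Y-%m-%dT%H%M` is chosen to match the 2013-07-10T1907 example above.

```python
from datetime import datetime, timedelta

DATE_MINUTE_FORMAT = "%Y-%m-%dT%H%M"  # matches e.g. 2013-07-10T1907
EPOCH = datetime(1970, 1, 1)  # assumed anchor for the N-minute grid


def clamp_to_interval(dt, minutes_interval):
    """Floor dt onto the grid of every `minutes_interval` minutes."""
    step = timedelta(minutes=minutes_interval)
    dt = dt.replace(second=0, microsecond=0)
    return dt - (dt - EPOCH) % step


def minute_datetimes(finite_start, finite_stop, minutes_interval):
    """Grid points in [finite_start, finite_stop), every N minutes, sorted."""
    step = timedelta(minutes=minutes_interval)
    t = clamp_to_interval(finite_start, minutes_interval)
    out = []
    while t < finite_stop:
        if t >= finite_start:  # skip the grid point before a mid-step start
            out.append(t)
        t += step
    return out


start = datetime.strptime("2013-07-10T1907", DATE_MINUTE_FORMAT)
print(clamp_to_interval(start, 5))  # 2013-07-10 19:05:00
stop = datetime(2013, 7, 10, 19, 30)
print(minute_datetimes(start, stop, 10))
```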
- luigi.tools.range.infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re)
Efficiently determines missing datetimes by filesystem listing.
The current implementation works for the common case of a task writing output to a FileSystemTarget whose path is built using strftime with format like ‘…%Y…%m…%d…%H…’, without custom complete() or exists().
(Eventually Luigi could have ranges of completion as first-class citizens. Then this listing business could be factored away, or provided for explicitly in the target API or some kind of history server.)
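The idea of answering completeness for many datetimes from one listing can be sketched with the standard library. This is a simplified stand-in, not the function above: luigi's version takes datetime_to_task and datetime_to_re callables, whereas this sketch uses a hypothetical `infer_missing_from_listing` helper that renders each expected path with strftime and looks it up in a single walk of a local directory.

```python
import os
import tempfile
from datetime import datetime


def infer_missing_from_listing(datetimes, root, path_format):
    """Return the datetimes whose strftime-built output path is absent.

    Walks `root` once and checks every expected path against that single
    listing, instead of calling exists() once per datetime.
    """
    listed = set()
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            rel = os.path.relpath(os.path.join(dirpath, name), root)
            listed.add(rel.replace(os.sep, "/"))  # normalize separators
    return [dt for dt in datetimes if dt.strftime(path_format) not in listed]


# Hypothetical layout: <root>/YYYY/MM/DD/HH/out.tsv
FORMAT = "%Y/%m/%d/%H/out.tsv"

with tempfile.TemporaryDirectory() as root:
    hours = [datetime(2014, 1, 1, h) for h in range(4)]
    for dt in hours[:2]:  # only the first two hours have output
        path = os.path.join(root, *dt.strftime(FORMAT).split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()
    missing = infer_missing_from_listing(hours, root, FORMAT)

print(missing)
```

One listing costs roughly the same whether the range spans four hours or four months, which is why this approach scales better than per-task existence checks on remote filesystems.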
- class luigi.tools.range.RangeMonthly(*args, **kwargs)
Produces a contiguous completed range of a monthly recurring task.
Unlike the Range* classes with shorter intervals, this class does not perform bulk optimisation. It is assumed that the number of months is low enough not to motivate the increased complexity. Hence, there is no class RangeMonthlyBase.
- start
Parameter whose value is a date, specified to the month (the day of date is “rounded” to the first of the month).
A MonthParameter is a Date string formatted YYYY-MM. For example, 2013-07 specifies July of 2013. Task objects constructed from code accept date (ignoring the day value) or Month.
- stop
Parameter whose value is a date, specified to the month.
- months_back
Parameter whose value is an int.
- months_forward
Parameter whose value is an int.
- datetime_to_parameters(dt)
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter.
- parameters_to_datetime(p)
Given a dictionary of parameters, will extract the ranged task parameter value.
- moving_start(now)
Returns a datetime from which to ensure contiguousness in the case when start is None or unfeasibly far back.
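Month arithmetic is the one non-obvious piece of a monthly range, because months have different lengths. A minimal sketch, assuming dates are normalized to the first of the month as described for MonthParameter; `add_months` and `monthly_dates` are hypothetical helpers, and add_months(now, -months_back) is only one plausible way to model moving_start.

```python
from datetime import date


def add_months(d, n):
    """First of the month n months after d's month (n may be negative)."""
    index = d.year * 12 + (d.month - 1) + n  # months since year 0
    return date(index // 12, index % 12 + 1, 1)


def monthly_dates(start, stop):
    """First-of-month dates in [start, stop), with start rounded down."""
    t = date(start.year, start.month, 1)
    out = []
    while t < stop:
        out.append(t)
        t = add_months(t, 1)
    return out


print(add_months(date(2013, 11, 15), 3))  # 2014-02-01
print(monthly_dates(date(2013, 11, 15), date(2014, 2, 1)))
```

Working in a single "months since year 0" index avoids special-casing December-to-January rollover.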
- class luigi.tools.range.RangeDaily(*args, **kwargs)
Efficiently produces a contiguous completed range of a daily recurring task that takes a single DateParameter.
Falls back to infer it from output filesystem listing to facilitate the common case usage.
Convenient to use even from command line, like:

    luigi --module your.module RangeDaily --of YourActualTask --start 2014-01-01

- class luigi.tools.range.RangeHourly(*args, **kwargs)
Efficiently produces a contiguous completed range of an hourly recurring task that takes a single DateHourParameter.
Benefits from bulk_complete information to efficiently cover gaps.
Falls back to infer it from output filesystem listing to facilitate the common case usage.
Convenient to use even from command line, like:

    luigi --module your.module RangeHourly --of YourActualTask --start 2014-01-01T00

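The "bulk_complete first, filesystem listing as fallback" behaviour can be pictured as a small dispatch. This is a simplified sketch under the assumption that an unavailable bulk check is signalled by raising NotImplementedError; `find_missing`, `no_bulk` and the listing callable are hypothetical stand-ins, not luigi functions.

```python
from datetime import datetime


def find_missing(datetimes, bulk_complete, infer_from_listing):
    """Return the datetimes that are not yet complete.

    bulk_complete(datetimes) should return the complete subset in one call;
    if it raises NotImplementedError, fall back to infer_from_listing.
    """
    try:
        done = set(bulk_complete(datetimes))
    except NotImplementedError:
        # No bulk check available: infer completeness from a listing.
        done = set(infer_from_listing(datetimes))
    return [dt for dt in datetimes if dt not in done]


def no_bulk(datetimes):
    """Stands in for a task class without a bulk completeness check."""
    raise NotImplementedError


hours = [datetime(2014, 1, 1, h) for h in range(3)]
# Pretend the listing shows only the first hour's output.
missing = find_missing(hours, no_bulk, lambda dts: dts[:1])
print(missing)
```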
- class luigi.tools.range.RangeByMinutes(*args, **kwargs)
Efficiently produces a contiguous completed range of a recurring task every interval minutes that takes a single DateMinuteParameter.
Benefits from bulk_complete information to efficiently cover gaps.
Falls back to infer it from output filesystem listing to facilitate the common case usage.
Convenient to use even from command line, like:

    luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123