Running Luigi
Running from the Command Line
The preferred way to run Luigi tasks is through the luigi command line tool, which is installed with the pip package.
# my_module.py, available in your sys.path
import luigi


class MyTask(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=45)

    def run(self):
        print(self.x + self.y)
This task should be run like this:
$ luigi --module my_module MyTask --x 123 --y 456 --local-scheduler
Or alternatively like this:
$ python -m luigi --module my_module MyTask --x 100 --local-scheduler
Note that if a parameter name contains '_', it should be replaced by '-' on the command line. For example, if MyTask had a parameter called 'my_parameter':
$ luigi --module my_module MyTask --my-parameter 100 --local-scheduler
Note
Please make sure to always place task parameters after the task family!
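The two rules above (underscores become hyphens, and task parameters follow the task family) can be sketched as a small helper that assembles the argument list. The function name luigi_argv is hypothetical and not part of Luigi; the sketch only handles parameters that take a value, not boolean flags.

```python
def luigi_argv(module, task_family, params, local_scheduler=True):
    """Build a luigi command line as a list of strings (hypothetical helper)."""
    # The task family comes first, directly after the --module option.
    argv = ["luigi", "--module", module, task_family]
    # Task parameters follow the task family; '_' becomes '-'.
    for name, value in params.items():
        argv += ["--" + name.replace("_", "-"), str(value)]
    if local_scheduler:
        argv.append("--local-scheduler")
    return argv


if __name__ == "__main__":
    print(" ".join(luigi_argv("my_module", "MyTask", {"my_parameter": 100})))
```

The resulting list could be handed to something like subprocess.run if the command needs to be launched from a script.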
Running from Python code
Another way to start tasks is from Python code, using luigi.build(tasks, worker_scheduler_factory=None, **env_params) from the luigi.interface module.
This way of running Luigi tasks is useful if you want to get some dynamic parameters from another source, such as a database, or run additional logic before you start the tasks.
One notable difference is that build defaults to not using the identical process lock. If you want to change this behaviour, just pass no_lock=False.
import luigi


class MyTask1(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=0)

    def run(self):
        print(self.x + self.y)


class MyTask2(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=1)
    z = luigi.IntParameter(default=2)

    def run(self):
        print(self.x * self.y * self.z)


if __name__ == '__main__':
    luigi.build([MyTask1(x=10), MyTask2(x=15, z=3)])
It is also possible to pass additional parameters to build, such as host, port, workers and local_scheduler:
if __name__ == '__main__':
    luigi.build([MyTask1(x=1)], workers=5, local_scheduler=True)
To meet special requirements, you can pass build your own worker_scheduler_factory, which returns your worker and/or scheduler implementations:
import luigi
from luigi import rpc, scheduler
from luigi.worker import Worker


class MyWorker(Worker):
    # some custom logic
    pass


class MyFactory:
    def create_local_scheduler(self):
        return scheduler.Scheduler(prune_on_get_work=True, record_task_history=False)

    def create_remote_scheduler(self, url):
        return rpc.RemoteScheduler(url)

    def create_worker(self, scheduler, worker_processes, assistant=False):
        # return your worker instance
        return MyWorker(
            scheduler=scheduler, worker_processes=worker_processes, assistant=assistant)


if __name__ == '__main__':
    luigi.build([MyTask1(x=1)], worker_scheduler_factory=MyFactory())
This may be useful in some cases, for example when working with a task queue.
Response of luigi.build()/luigi.run()
Default response
By default, luigi.build()/luigi.run() returns True if there were no scheduling errors. This is the same as the attribute LuigiRunResult.scheduling_succeeded.
Detailed response
This is a response of type LuigiRunResult. It is obtained by passing the keyword argument detailed_summary=True to build/run. This response contains detailed information about the jobs.
if __name__ == '__main__':
    luigi_run_result = luigi.build(..., detailed_summary=True)
    print(luigi_run_result.summary_text)
Luigi on Windows
Most Luigi functionality works on Windows. Exceptions:
Specifying multiple worker processes using the workers argument for luigi.build, or using the --workers command line argument. (Similarly, specifying --worker-force-multiprocessing.) For most programs, this will result in failure (a common sight is BrokenPipeError). The reason is that worker processes are assumed to be forked from the main process. Forking is not possible on Windows.
Running the Luigi central scheduling server as a daemon (i.e. with --background). Again, a Unix-only concept.
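One way to keep a script portable given the first limitation is to fall back to a single worker process on Windows. The helper below is a minimal sketch of that idea; the name effective_workers is hypothetical and not part of Luigi.

```python
import sys


def effective_workers(requested, platform=sys.platform):
    """Return a worker count that avoids multiple worker processes on Windows."""
    if requested < 1:
        raise ValueError("requested must be at least 1")
    # sys.platform is "win32" on Windows, including 64-bit builds.
    if platform == "win32":
        return 1
    return requested


if __name__ == "__main__":
    # The result could be passed on as luigi.build(tasks, workers=...).
    print(effective_workers(5))
```

The platform argument exists only so the behaviour can be exercised on any operating system; normal callers would leave it at its default.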