luigi.contrib.sge
SGE batch system Tasks.
Adapted by Jake Feala (@jfeala) from LSF extension by Alex Wiltschko (@alexbw) Maintained by Jake Feala (@jfeala)
SunGrid Engine is a job scheduler used to allocate compute resources on a
shared cluster. Jobs are submitted using the qsub command and monitored
using qstat. To get started, install luigi on all nodes.
To run luigi workflows on an SGE cluster, subclass
luigi.contrib.sge.SGEJobTask as you would any luigi.Task,
but override the work() method, instead of run(), to define the job
code. Then, run your Luigi workflow from the master node, assigning > 1
workers in order to distribute the tasks in parallel across the cluster.
The following is an example usage (and can also be found in sge_tests.py)
import logging
import luigi
import os
from luigi.contrib.sge import SGEJobTask
logger = logging.getLogger('luigi-interface')
class TestJobTask(SGEJobTask):
i = luigi.Parameter()
def work(self):
logger.info('Running test job...')
with open(self.output().path, 'w') as f:
f.write('this is a test')
def output(self):
return luigi.LocalTarget(os.path.join('/home', 'testfile_' + str(self.i)))
if __name__ == '__main__':
tasks = [TestJobTask(i=str(i), n_cpu=i+1) for i in range(3)]
luigi.build(tasks, local_scheduler=True, workers=3)
The n-cpu parameter allows you to define different compute resource
requirements (or slots, in SGE terms) for each task. In this example, the
third Task asks for 3 CPU slots. If your cluster only contains nodes with
2 CPUs, this task will hang indefinitely in the queue. See the docs for
luigi.contrib.sge.SGEJobTask for other SGE parameters. As for any
task, you can also set these in your luigi configuration file as shown below.
The default values below were matched to the values used by MIT StarCluster,
an open-source SGE cluster manager for use with Amazon EC2:
[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2
Classes
|
A local version of SGEJobTask, for easier debugging. |
|
Base class for executing a job on SunGrid Engine |
- class luigi.contrib.sge.SGEJobTask(*args, **kwargs)[source]
Base class for executing a job on SunGrid Engine
Override
work()(rather thanrun()) with your job code.Parameters:
- n_cpu: Number of CPUs (or “slots”) to allocate for the Task. This
value is passed as
qsub -pe {pe} {n_cpu}
- parallel_env: SGE parallel environment name. The default is “orte”,
the parallel environment installed with MIT StarCluster. If you are using a different cluster environment, check with your sysadmin for the right pe to use. This value is passed as {pe} to the qsub command above.
- shared_tmp_dir: Shared drive accessible from all nodes in the cluster.
Task classes and dependencies are pickled to a temporary folder on this drive. The default is
/home, the NFS share location setup by StarCluster
- job_name_format: String that can be passed in to customize the job name
string passed to qsub; e.g. “Task123_{task_family}_{n_cpu}…”.
job_name: Exact job name to pass to qsub.
run_locally: Run locally instead of on the cluster.
poll_time: the length of time to wait in order to poll qstat
dont_remove_tmp_dir: Instead of deleting the temporary directory, keep it.
- no_tarball: Don’t create a tarball of the luigi project directory. Can be
useful to reduce I/O requirements when the luigi directory is accessible from cluster nodes already.
- n_cpu
Parameter whose value is an
int.
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See Parameters from config IngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- parallel_env
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See Parameters from config IngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- job_name_format
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See Parameters from config IngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- run_locally
A Parameter whose value is a
bool. This parameter has an implicit default value ofFalse. For the command line interface this means that the value isFalseunless you add"--the-bool-parameter"to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to beTrue. This is called explicit parsing. When omitting the parameter value, it is still consideredTruebut to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.You can toggle between the two parsing modes on a per-parameter base via
class MyTask(luigi.Task): implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING) explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)
or globally by
luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING
for all bool parameters instantiated after this line.
- poll_time
Parameter whose value is an
int.
- dont_remove_tmp_dir
A Parameter whose value is a
bool. This parameter has an implicit default value ofFalse. For the command line interface this means that the value isFalseunless you add"--the-bool-parameter"to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to beTrue. This is called explicit parsing. When omitting the parameter value, it is still consideredTruebut to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.You can toggle between the two parsing modes on a per-parameter base via
class MyTask(luigi.Task): implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING) explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)
or globally by
luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING
for all bool parameters instantiated after this line.
- no_tarball
A Parameter whose value is a
bool. This parameter has an implicit default value ofFalse. For the command line interface this means that the value isFalseunless you add"--the-bool-parameter"to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to beTrue. This is called explicit parsing. When omitting the parameter value, it is still consideredTruebut to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.You can toggle between the two parsing modes on a per-parameter base via
class MyTask(luigi.Task): implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING) explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)
or globally by
luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING
for all bool parameters instantiated after this line.
- job_name
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See Parameters from config IngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.