luigi.contrib.sge module
SGE batch system Tasks.
Adapted by Jake Feala (@jfeala) from the LSF extension by Alex Wiltschko (@alexbw). Maintained by Jake Feala (@jfeala).
Sun Grid Engine is a job scheduler used to allocate compute resources on a shared cluster. Jobs are submitted using the qsub command and monitored using qstat. To get started, install luigi on all nodes.
To run luigi workflows on an SGE cluster, subclass luigi.contrib.sge.SGEJobTask as you would any luigi.Task, but override the work() method, instead of run(), to define the job code. Then run your Luigi workflow from the master node with more than one worker (workers > 1) so that the tasks are distributed in parallel across the cluster.
The following is an example usage (it can also be found in sge_tests.py):
```python
import logging
import luigi
import os
from luigi.contrib.sge import SGEJobTask

logger = logging.getLogger('luigi-interface')


class TestJobTask(SGEJobTask):

    i = luigi.Parameter()

    def work(self):
        # Job code goes in work(), not run(); it executes on a cluster node.
        logger.info('Running test job...')
        with open(self.output().path, 'w') as f:
            f.write('this is a test')

    def output(self):
        return luigi.LocalTarget(os.path.join('/home', 'testfile_' + str(self.i)))


if __name__ == '__main__':
    # Three tasks requesting 1, 2 and 3 slots; three workers run them in parallel.
    tasks = [TestJobTask(i=str(i), n_cpu=i+1) for i in range(3)]
    luigi.build(tasks, local_scheduler=True, workers=3)
```
The n_cpu parameter allows you to define different compute resource requirements (or slots, in SGE terms) for each task. In this example, the third Task asks for 3 CPU slots. If your cluster only contains nodes with 2 CPUs, this task will hang indefinitely in the queue. See the docs for luigi.contrib.sge.SGEJobTask for other SGE parameters. As for any task, you can also set these in your luigi configuration file, as shown below. The default values below were matched to the values used by MIT StarCluster, an open-source SGE cluster manager for use with Amazon EC2:
```ini
[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2
```
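The configuration fragment is ordinary INI syntax. The sketch below reads it with Python's standard-library configparser purely to show how the section and its options are structured; it is an illustration, not luigi's own configuration loader, which also decides which config files to read.

```python
import configparser

# The [SGEJobTask] section from above, inlined so the sketch is self-contained.
CONFIG_TEXT = """
[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

section = parser["SGEJobTask"]
shared_tmp_dir = section.get("shared-tmp-dir")
parallel_env = section.get("parallel-env")
n_cpu = section.getint("n-cpu")  # slot count, parsed as an int rather than a string

print(shared_tmp_dir, parallel_env, n_cpu)
```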
class luigi.contrib.sge.SGEJobTask(*args, **kwargs)
Bases: luigi.task.Task
Base class for executing a job on Sun Grid Engine.
Override work() (rather than run()) with your job code.
Parameters:
- n_cpu: Number of CPUs (or “slots”) to allocate for the Task. This value is passed as qsub -pe {pe} {n_cpu}.
- parallel_env: SGE parallel environment name. The default is “orte”, the parallel environment installed with MIT StarCluster. If you are using a different cluster environment, check with your sysadmin for the right parallel environment to use. This value is passed as {pe} to the qsub command above.
- shared_tmp_dir: Shared drive accessible from all nodes in the cluster. Task classes and dependencies are pickled to a temporary folder on this drive. The default is /home, the NFS share location set up by StarCluster.
- job_name_format: String that can be passed in to customize the job name string passed to qsub; e.g. “Task123_{task_family}_{n_cpu}…”.
- job_name: Exact job name to pass to qsub.
- run_locally: Run locally instead of on the cluster.
- poll_time: Number of seconds to wait between qstat polls.
- dont_remove_tmp_dir: Instead of deleting the temporary directory, keep it.
- no_tarball: Don’t create a tarball of the luigi project directory. Can be useful to reduce I/O requirements when the luigi directory is already accessible from cluster nodes.
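The parameter list implies two steps at submission time: pickling job data into shared_tmp_dir so every node can read it, and turning n_cpu, parallel_env and the job-name options into qsub flags. The sketch below models both with the standard library only. Everything in it (dump_job, load_job, build_qsub_args, the job.pickle file name) is hypothetical, not luigi's API. It pickles a plain dict instead of a task instance so it runs outside a luigi project, it assumes job_name takes precedence over job_name_format, and it emits only the -pe and -N flags, whereas a real submission must also name the script the node should run.

```python
import os
import pickle
import tempfile


def dump_job(params, shared_tmp_dir):
    """Hypothetical helper: pickle job data into a fresh folder under shared_tmp_dir."""
    job_dir = tempfile.mkdtemp(prefix="sge-demo-", dir=shared_tmp_dir)
    path = os.path.join(job_dir, "job.pickle")
    with open(path, "wb") as f:
        pickle.dump(params, f)
    return path


def load_job(path):
    """Hypothetical helper: what a compute node would do, read the pickle back."""
    with open(path, "rb") as f:
        return pickle.load(f)


def build_qsub_args(n_cpu, parallel_env="orte", job_name=None,
                    job_name_format=None, task_family="Task"):
    """Hypothetical helper: assemble qsub flags for slots and job name.

    Assumption: an explicit job_name wins over job_name_format; with
    neither set, no -N flag is emitted.
    """
    if job_name is None and job_name_format is not None:
        job_name = job_name_format.format(task_family=task_family, n_cpu=n_cpu)
    args = ["qsub", "-pe", parallel_env, str(n_cpu)]  # qsub -pe {pe} {n_cpu}
    if job_name:
        args += ["-N", job_name]
    return args


shared = tempfile.mkdtemp()  # stand-in for the shared NFS drive, e.g. /home
path = dump_job({"task_family": "TestJobTask", "i": "2", "n_cpu": 3}, shared)
print(load_job(path))
print(" ".join(build_qsub_args(3, job_name_format="{task_family}_{n_cpu}",
                               task_family="TestJobTask")))
```

Keeping the pickle on a drive every node mounts is what lets the submitting process and the compute node exchange the job without any extra transfer step.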
- n_cpu = Insignificant IntParameter (defaults to 2)
- parallel_env = Insignificant Parameter (defaults to orte)
- job_name_format = Insignificant Parameter (defaults to None): A string that can be formatted with class variables to name the job with qsub.
- run_locally = Insignificant BoolParameter (defaults to False): run locally instead of on the cluster
- poll_time = Insignificant IntParameter (defaults to 5): specify the wait time to poll qstat for the job status
- dont_remove_tmp_dir = Insignificant BoolParameter (defaults to False): don't delete the temporary directory used (for debugging)
- no_tarball = Insignificant BoolParameter (defaults to False): don't tarball (and extract) the luigi project files
- job_name = Insignificant Parameter (defaults to None): Explicit job name given via qsub.
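poll_time controls how often the job status is checked while the task waits on the cluster. A minimal sketch of such a polling loop follows. get_status is a hypothetical callable standing in for running and parsing qstat; "qw" and "r" are SGE's queued-waiting and running state codes, and "done" is this sketch's own convention for a job that qstat no longer lists. The sleep function is injectable so the usage example runs instantly, and max_polls is an extra safeguard that is not among the documented parameters.

```python
import time


def wait_for_job(get_status, poll_time=5, sleep=time.sleep, max_polls=None):
    """Call get_status() every poll_time seconds until it returns "done".

    get_status is hypothetical: a stand-in for querying qstat.
    Returns the number of status checks that were made.
    """
    polls = 0
    while True:
        status = get_status()
        polls += 1
        if status == "done":
            return polls
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError("job still %r after %d polls" % (status, polls))
        sleep(poll_time)  # wait before asking qstat again


# Usage with canned states and a recording "sleep", so nothing actually waits.
states = iter(["qw", "qw", "r", "done"])
sleeps = []
print(wait_for_job(lambda: next(states), poll_time=5, sleep=sleeps.append))  # 4
print(sleeps)  # [5, 5, 5]
```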
class luigi.contrib.sge.LocalSGEJobTask(*args, **kwargs)
Bases: luigi.contrib.sge.SGEJobTask
A local version of SGEJobTask, for easier debugging.
This version skips the qsub steps and simply runs work() on the local node, so you don’t need to be on an SGE cluster to use your Task in a test workflow.
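The split between cluster and local execution can be pictured with a stripped-down model. The classes below (SketchJobTask, LocalSketchJobTask, HelloJob) are hypothetical and are not luigi code; they only mirror the behaviour described above: with run_locally set, or in the local subclass, run() calls work() directly and skips qsub, while the cluster path is left as a placeholder.

```python
class SketchJobTask:
    """Hypothetical model of SGEJobTask's dispatch; not luigi's implementation."""

    run_locally = False

    def work(self):
        raise NotImplementedError("subclasses put their job code here")

    def run(self):
        if self.run_locally:
            return self.work()  # skip qsub entirely
        return self.submit()

    def submit(self):
        # Placeholder for: pickle the task, call qsub, poll qstat, collect results.
        raise RuntimeError("no SGE cluster available in this sketch")


class LocalSketchJobTask(SketchJobTask):
    """Hypothetical mirror of LocalSGEJobTask: always runs work() in-process."""

    def run(self):
        return self.work()


class HelloJob(LocalSketchJobTask):
    def work(self):
        return "ran locally"


print(HelloJob().run())  # ran locally
```

Because job code lives in work() rather than run(), the same subclass body can be exercised locally in tests and then submitted unchanged on the cluster.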