luigi.contrib.sge module

SGE batch system Tasks.

Adapted by Jake Feala (@jfeala) from the LSF extension by Alex Wiltschko (@alexbw). Maintained by Jake Feala (@jfeala).

Sun Grid Engine (SGE) is a job scheduler used to allocate compute resources on a shared cluster. Jobs are submitted with the qsub command and monitored with qstat. To get started, install luigi on all nodes.
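Monitoring with qstat amounts to repeatedly reading its output and looking up the job's state. The sketch below assumes qstat's default tabular layout (a header row, a dashed separator, then one row per job with the state, such as qw or r, in the fifth column) and treats a job that no longer appears in the listing as finished. The function names parse_qstat_state and wait_for_job are hypothetical, not part of luigi's API, the error check is a simplification, and the sample listing is made up for the example.

```python
import time
from typing import Callable

# Hypothetical helpers, not luigi's API. They assume qstat's default table:
# a header row, a dashed separator, then one row per job.


def parse_qstat_state(qstat_out: str, job_id: int) -> str:
    """Return the state column for job_id, or 'u' (unknown) if it is not listed."""
    lines = qstat_out.strip().splitlines()
    # Find the dashed separator; job rows start right after it.
    for i, line in enumerate(lines):
        if line.startswith("---"):
            rows = lines[i + 1:]
            break
    else:
        # Empty output (no jobs) or an unexpected format.
        return "u"
    for row in rows:
        fields = row.split()
        # Columns: job-ID, prior, name, user, state, ...
        if len(fields) >= 5 and fields[0] == str(job_id):
            return fields[4]
    return "u"


def wait_for_job(get_qstat: Callable[[], str], job_id: int,
                 poll_time: float = 5) -> None:
    """Poll qstat output until job_id disappears, treating that as completion."""
    while True:
        state = parse_qstat_state(get_qstat(), job_id)
        if state == "u":
            return
        if "E" in state:  # e.g. "Eqw": the job hit an error
            raise RuntimeError(f"job {job_id} is in error state {state!r}")
        time.sleep(poll_time)


SAMPLE = """\
job-ID  prior   name       user         state submit/start at     queue          slots
----------------------------------------------------------------------------------------
    101 0.55500 TestJobTas alice        r     05/15/2015 14:20:01 all.q@node001  3
    102 0.00000 TestJobTas alice        qw    05/15/2015 14:20:05                2
"""

print(parse_qstat_state(SAMPLE, 101))  # r
print(parse_qstat_state(SAMPLE, 102))  # qw

# Simulate two polls: job 101 is running, then it is gone from the listing.
outputs = iter([SAMPLE, ""])
wait_for_job(lambda: next(outputs), 101, poll_time=0)
```

On a real cluster, get_qstat could be lambda: subprocess.check_output(["qstat"], text=True); passing it in as a callable keeps the polling logic testable without SGE.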

To run luigi workflows on an SGE cluster, subclass luigi.contrib.sge.SGEJobTask as you would any luigi.Task, but override the work() method, instead of run(), to define the job code. Then run your luigi workflow from the master node with more than one worker (workers > 1) so that the tasks are distributed in parallel across the cluster.

The following example shows typical usage (it can also be found in sge_tests.py):

import logging
import luigi
import os
from luigi.contrib.sge import SGEJobTask

logger = logging.getLogger('luigi-interface')


class TestJobTask(SGEJobTask):

    i = luigi.Parameter()

    def work(self):
        logger.info('Running test job...')
        with open(self.output().path, 'w') as f:
            f.write('this is a test')

    def output(self):
        return luigi.LocalTarget(os.path.join('/home', 'testfile_' + str(self.i)))


if __name__ == '__main__':
    tasks = [TestJobTask(i=str(i), n_cpu=i+1) for i in range(3)]
    luigi.build(tasks, local_scheduler=True, workers=3)

The n_cpu parameter lets you define different compute resource requirements (or slots, in SGE terms) for each task. In this example, the third task asks for 3 CPU slots; if your cluster only contains nodes with 2 CPUs, this task will remain queued indefinitely. See the docs for luigi.contrib.sge.SGEJobTask for other SGE parameters. As with any task, you can also set these in your luigi configuration file, as shown below. The default values below match those used by MIT StarCluster, an open-source SGE cluster manager for use with Amazon EC2:

[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2

class luigi.contrib.sge.SGEJobTask(*args, **kwargs)

Bases: Task

Base class for executing a job on Sun Grid Engine.

Override work() (rather than run()) with your job code.

Parameters:

  • n_cpu: Number of CPUs (or “slots”) to allocate for the Task. This value is passed as qsub -pe {pe} {n_cpu}.

  • parallel_env: SGE parallel environment name. The default is “orte”, the parallel environment installed with MIT StarCluster. If you are using a different cluster environment, check with your sysadmin for the right pe to use. This value is passed as {pe} to the qsub command above.

  • shared_tmp_dir: Shared drive accessible from all nodes in the cluster. Task classes and dependencies are pickled to a temporary folder on this drive. The default is /home, the NFS share location set up by StarCluster.

  • job_name_format: Format string used to customize the job name passed to qsub; e.g. “Task123_{task_family}_{n_cpu}…”.

  • job_name: Exact job name to pass to qsub.

  • run_locally: Run locally instead of on the cluster.

  • poll_time: Time to wait, in seconds, between qstat polls.

  • dont_remove_tmp_dir: Keep the temporary directory instead of deleting it.

  • no_tarball: Don’t create a tarball of the luigi project directory. This can reduce I/O requirements when the luigi directory is already accessible from the cluster nodes.
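As a rough illustration of how these settings reach the scheduler, the sketch below assembles a qsub argument list in which parallel_env and n_cpu become -pe {pe} {n_cpu}, the job name becomes -N, and the job's stdout and stderr go under shared_tmp_dir via -o and -e. The helper name build_qsub_argv, the .out/.err file names, and the script path are assumptions for illustration; the exact command luigi builds may include additional flags.

```python
import os
import shlex
from typing import List


def build_qsub_argv(script: str, job_name: str, shared_tmp_dir: str,
                    parallel_env: str = "orte", n_cpu: int = 2) -> List[str]:
    """Hypothetical helper: map SGEJobTask-style settings onto qsub options."""
    # Keep stdout/stderr on the shared drive so the master node can read them.
    out_file = os.path.join(shared_tmp_dir, job_name + ".out")
    err_file = os.path.join(shared_tmp_dir, job_name + ".err")
    return [
        "qsub",
        "-N", job_name,                   # job name reported by qstat
        "-o", out_file,                   # where SGE writes the job's stdout
        "-e", err_file,                   # where SGE writes the job's stderr
        "-pe", parallel_env, str(n_cpu),  # request n_cpu slots in the parallel env
        script,                           # job script to execute on the node
    ]


argv = build_qsub_argv("/home/luigi-tmp/job.sh", "TestJobTask", "/home/luigi-tmp", n_cpu=3)
print(shlex.join(argv))
```

On a real cluster the list could be handed to subprocess.run(argv, check=True); building an argument list rather than a shell string avoids quoting problems with unusual job names.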

n_cpu = Insignificant IntParameter (defaults to 2)
shared_tmp_dir = Insignificant Parameter (defaults to /home)
parallel_env = Insignificant Parameter (defaults to orte)
job_name_format = Insignificant Parameter (defaults to None): A string that can be formatted with class variables to name the job with qsub.
run_locally = Insignificant BoolParameter (defaults to False): run locally instead of on the cluster
poll_time = Insignificant IntParameter (defaults to 5): specify the wait time to poll qstat for the job status
dont_remove_tmp_dir = Insignificant BoolParameter (defaults to False): don't delete the temporary directory used (for debugging)
no_tarball = Insignificant BoolParameter (defaults to False): don't tarball (and extract) the luigi project files
job_name = Insignificant Parameter (defaults to None): Explicit job name given via qsub.
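The job_name and job_name_format parameters interact. A minimal sketch, assuming that an explicit job_name takes precedence, that job_name_format is otherwise filled in with str.format using task_family plus the task's parameter values, and that the task family name is the fallback. resolve_job_name is a hypothetical helper; check the luigi source for the exact rule.

```python
from typing import Any, Dict, Optional


def resolve_job_name(task_family: str, params: Dict[str, Any],
                     job_name: Optional[str] = None,
                     job_name_format: Optional[str] = None) -> str:
    """Hypothetical helper: choose the name handed to qsub -N."""
    if job_name:
        # An explicit name is used verbatim.
        return job_name
    if job_name_format:
        # Placeholders may refer to task_family or to any parameter, e.g. {n_cpu}.
        return job_name_format.format(task_family=task_family, **params)
    # Fall back to the task family name.
    return task_family


params = {"i": "2", "n_cpu": 3}
print(resolve_job_name("TestJobTask", params, job_name_format="Task123_{task_family}_{n_cpu}"))
# Task123_TestJobTask_3
print(resolve_job_name("TestJobTask", params, job_name="my-job"))
# my-job
```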
run()

The task run method, to be overridden in a subclass.

See Task.run

work()

Override this method, rather than run(), for your actual work.
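Because the task is pickled to shared_tmp_dir and work() then runs on a compute node, the job code has to survive a pickle round trip. The sketch below imitates that handoff in a single process: the "master" side pickles a task object into a fresh folder under a stand-in shared directory, and the "node" side unpickles it and calls work(). HypotheticalJob, dump_job, run_pickled_job and the pickle file name are illustrative assumptions, not luigi internals.

```python
import os
import pickle
import tempfile


class HypotheticalJob:
    """Illustrative stand-in for an SGEJobTask subclass; only work() matters here."""

    def __init__(self, i: int) -> None:
        self.i = i

    def work(self) -> str:
        return f"job {self.i} done"


def dump_job(job: HypotheticalJob, shared_tmp_dir: str) -> str:
    """Master-node side: pickle the task into a fresh folder on the shared drive."""
    job_dir = tempfile.mkdtemp(dir=shared_tmp_dir)
    path = os.path.join(job_dir, "job.pickle")  # file name is an assumption
    with open(path, "wb") as f:
        pickle.dump(job, f)
    return path


def run_pickled_job(path: str) -> str:
    """Compute-node side: unpickle the task and call its work() method."""
    with open(path, "rb") as f:
        job = pickle.load(f)
    return job.work()


# A local temporary directory stands in for the NFS share (shared_tmp_dir).
with tempfile.TemporaryDirectory() as shared:
    pickle_path = dump_job(HypotheticalJob(7), shared)
    print(run_pickled_job(pickle_path))  # job 7 done
```

In practice, the task's class must be importable on every node and its attributes must be picklable for this handoff to work.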

class luigi.contrib.sge.LocalSGEJobTask(*args, **kwargs)

Bases: SGEJobTask

A local version of SGEJobTask, for easier debugging.

This version skips the qsub steps and simply runs work() on the local node, so you don’t need to be on an SGE cluster to use your Task in a test workflow.
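The run()/work() split used by SGEJobTask and LocalSGEJobTask is a template-method pattern. The sketch below is a plain-Python stand-in, assuming SGEJobTask.run() calls work() directly when run_locally is set and otherwise submits the job, while LocalSGEJobTask.run() always calls work(). The class names are hypothetical, and submit() only records a message instead of calling qsub.

```python
from typing import List


class SketchJobTask:
    """Hypothetical stand-in for SGEJobTask's run()/work() split (not luigi code)."""

    def __init__(self, run_locally: bool = False) -> None:
        self.run_locally = run_locally
        self.log: List[str] = []

    def run(self) -> None:
        # Users do not override run(); it decides where work() happens.
        if self.run_locally:
            self.work()
        else:
            self.submit()

    def submit(self) -> None:
        # The real class would pickle the task, call qsub and poll qstat here;
        # this sketch only records that a submission would have happened.
        self.log.append("submitted")

    def work(self) -> None:
        # Subclasses put their job code here.
        raise NotImplementedError


class LocalSketchJobTask(SketchJobTask):
    """Mirrors LocalSGEJobTask: skip the qsub steps and call work() directly."""

    def run(self) -> None:
        self.work()


class EchoTask(SketchJobTask):
    def work(self) -> None:
        self.log.append("work ran")


class LocalEchoTask(LocalSketchJobTask):
    def work(self) -> None:
        self.log.append("work ran")


for task in (EchoTask(), EchoTask(run_locally=True), LocalEchoTask()):
    task.run()
    print(type(task).__name__, task.log)
```

Keeping the job code in work() is what lets the same task class run on the cluster, with run_locally, or through a local subclass in tests without changes.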

run()

The task run method, to be overridden in a subclass.

See Task.run