luigi.contrib.sge

SGE batch system Tasks.

Adapted by Jake Feala (@jfeala) from the LSF extension by Alex Wiltschko (@alexbw). Maintained by Jake Feala (@jfeala).

SunGrid Engine is a job scheduler used to allocate compute resources on a shared cluster. Jobs are submitted using the qsub command and monitored using qstat. To get started, install luigi on all nodes.

To run luigi workflows on an SGE cluster, subclass luigi.contrib.sge.SGEJobTask as you would any luigi.Task, but override the work() method instead of run() to define the job code. Then run your Luigi workflow from the master node, assigning more than one worker so that tasks are distributed in parallel across the cluster.

The following is an example usage (it can also be found in sge_tests.py):

import logging
import luigi
import os
from luigi.contrib.sge import SGEJobTask

logger = logging.getLogger('luigi-interface')


class TestJobTask(SGEJobTask):

    i = luigi.Parameter()

    def work(self):
        logger.info('Running test job...')
        with open(self.output().path, 'w') as f:
            f.write('this is a test')

    def output(self):
        return luigi.LocalTarget(os.path.join('/home', 'testfile_' + str(self.i)))


if __name__ == '__main__':
    tasks = [TestJobTask(i=str(i), n_cpu=i+1) for i in range(3)]
    luigi.build(tasks, local_scheduler=True, workers=3)

The n-cpu parameter lets you define different compute resource requirements (or slots, in SGE terms) for each task. In this example, the third Task asks for 3 CPU slots. If your cluster only contains nodes with 2 CPUs, this task will hang indefinitely in the queue. See the docs for luigi.contrib.sge.SGEJobTask for other SGE parameters. As with any task, you can also set these in your luigi configuration file as shown below. The default values below match the values used by MIT StarCluster, an open-source SGE cluster manager for use with Amazon EC2:

[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2

Classes

LocalSGEJobTask(*args, **kwargs)

A local version of SGEJobTask, for easier debugging.

SGEJobTask(*args, **kwargs)

Base class for executing a job on SunGrid Engine

class luigi.contrib.sge.SGEJobTask(*args, **kwargs)[source]

Base class for executing a job on SunGrid Engine

Override work() (rather than run()) with your job code.

Parameters:

  • n_cpu: Number of CPUs (or “slots”) to allocate for the Task. This value is passed as qsub -pe {pe} {n_cpu}.

  • parallel_env: SGE parallel environment name. The default is “orte”, the parallel environment installed with MIT StarCluster. If you are using a different cluster environment, check with your sysadmin for the right pe to use. This value is passed as {pe} to the qsub command above.

  • shared_tmp_dir: Shared drive accessible from all nodes in the cluster. Task classes and dependencies are pickled to a temporary folder on this drive. The default is /home, the NFS share location set up by StarCluster.

  • job_name_format: String that can be passed in to customize the job name string passed to qsub; e.g. “Task123_{task_family}_{n_cpu}…”.

  • job_name: Exact job name to pass to qsub.

  • run_locally: Run locally instead of on the cluster.

  • poll_time: Length of time to wait before polling qstat.

  • dont_remove_tmp_dir: Keep the temporary directory instead of deleting it.

  • no_tarball: Don’t create a tarball of the luigi project directory. Can be useful to reduce I/O requirements when the luigi directory is already accessible from the cluster nodes.
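As a rough illustration of how n_cpu, parallel_env, and job_name_format feed into the qsub call described above, the sketch below assembles an argument list in plain Python. The helper build_qsub_args is hypothetical and is not luigi's actual implementation. Only the -pe {pe} {n_cpu} form comes from the parameter descriptions; passing the job name through qsub's -N option, the script name job.sh, and the task family TestJobTask are assumptions made for the example.

```python
def build_qsub_args(script, n_cpu, parallel_env="orte",
                    job_name_format=None, task_family="TestJobTask"):
    """Hypothetical helper: assemble a qsub argument list (not luigi's own code)."""
    # Resource request in the form described above: -pe {pe} {n_cpu}
    args = ["qsub", "-pe", parallel_env, str(n_cpu)]
    if job_name_format:
        # Fill the placeholders of a template such as "Task123_{task_family}_{n_cpu}"
        job_name = job_name_format.format(task_family=task_family, n_cpu=n_cpu)
        # Assumption: the job name is handed to qsub through its -N option
        args += ["-N", job_name]
    args.append(script)
    return args


qsub_args = build_qsub_args("job.sh", n_cpu=3,
                            job_name_format="Task123_{task_family}_{n_cpu}")
print(" ".join(qsub_args))
# prints: qsub -pe orte 3 -N Task123_TestJobTask_3 job.sh
```

Returning a list rather than a single string keeps each argument separate, which is the form subprocess-style launchers usually expect.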

n_cpu

Parameter whose value is an int.

shared_tmp_dir

Parameter whose value is a str, and a base class for other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

import luigi

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). Each task will then have the foo attribute set appropriately.

When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, it is resolved in this order of falling priority:

  • Any value provided on the command line:

    • To the root task (e.g. --param xyz)

    • Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • Any value set in the configuration, using the [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See Parameters from config Ingestion.

  • Any default value set using the default flag.

Parameter objects may be reused, but you must then set the positional=False flag.
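The resolution order above can be sketched as a simple first-match lookup. This is a minimal plain-Python illustration, not luigi's implementation: the function resolve_parameter, its argument names, and the _MISSING sentinel are all hypothetical.

```python
_MISSING = object()  # sentinel meaning "no value supplied at this level"


def resolve_parameter(explicit=_MISSING, cli_root=_MISSING, cli_class=_MISSING,
                      config=_MISSING, default=_MISSING):
    """Hypothetical sketch: return the first supplied value in priority order."""
    candidates = [
        ("constructor argument", explicit),
        ("command line (root task)", cli_root),
        ("command line (qualified task name)", cli_class),
        ("configuration", config),
        ("default", default),
    ]
    for source, value in candidates:
        if value is not _MISSING:
            return source, value
    raise ValueError("no value available for parameter")


print(resolve_parameter(config="from-config", default="fallback"))
# prints: ('configuration', 'from-config')
print(resolve_parameter(explicit=44, default=0))
# prints: ('constructor argument', 44)
```

A sentinel object is used instead of None so that None itself can still count as a supplied value.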

parallel_env

Parameter whose value is a str, and a base class for other parameter types.

job_name_format

Parameter whose value is a str, and a base class for other parameter types.

run_locally

A Parameter whose value is a bool. This parameter has an implicit default value of False. On the command line, this means the value is False unless you add "--the-bool-parameter" to your command without giving a parameter value. This is called implicit parsing (the default). In some situations, however, you might want to give an explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to be True. This is called explicit parsing. If the parameter value is omitted, it is still considered True, but to avoid ambiguities during argument parsing, always place bool parameters behind the task family on the command line when using explicit parsing.

You can toggle between the two parsing modes on a per-parameter basis via

class MyTask(luigi.Task):
    implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING)
    explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)

or globally by

luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING

for all bool parameters instantiated after this line.
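The difference between the two parsing modes can be illustrated with the standard library's argparse. This is only an analogy under stated assumptions, not a description of luigi's internals: the flag names --implicit-bool and --explicit-bool and the helper parse_bool are made up for the example.

```python
import argparse


def parse_bool(text):
    """Convert the strings 'true'/'false' (any case) to a bool."""
    lowered = text.lower()
    if lowered not in ("true", "false"):
        raise argparse.ArgumentTypeError("expected true or false, got %r" % text)
    return lowered == "true"


parser = argparse.ArgumentParser()
# Implicit style: the flag takes no value; its presence means True.
parser.add_argument("--implicit-bool", action="store_true")
# Explicit style: an optional value may follow; a bare flag still means True.
parser.add_argument("--explicit-bool", nargs="?", const=True, default=False,
                    type=parse_bool)

absent = parser.parse_args([])
print(absent.implicit_bool, absent.explicit_bool)  # prints: False False

given = parser.parse_args(["--implicit-bool", "--explicit-bool", "false"])
print(given.implicit_bool, given.explicit_bool)  # prints: True False

bare = parser.parse_args(["--explicit-bool"])
print(bare.explicit_bool)  # prints: True
```

In this argparse sketch, a bare --explicit-bool followed by a positional word would try to consume that word as its value, which is the kind of ambiguity the placement advice above guards against.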

poll_time

Parameter whose value is an int.

dont_remove_tmp_dir

A Parameter whose value is a bool. This parameter has an implicit default value of False.

no_tarball

A Parameter whose value is a bool. This parameter has an implicit default value of False.

job_name

Parameter whose value is a str, and a base class for other parameter types.

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

work()[source]

Override this method, rather than run(), for your actual work.

class luigi.contrib.sge.LocalSGEJobTask(*args, **kwargs)[source]

A local version of SGEJobTask, for easier debugging.

This version skips the qsub steps and simply runs work() on the local node, so you don’t need to be on an SGE cluster to use your Task in a test workflow.
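The relationship between a cluster task and its local variant can be pictured with the plain-Python sketch below. It does not use luigi and is not the library's code: the classes SketchJobTask, LocalSketchJobTask, and HelloJob, as well as the ran_on and result attributes, are hypothetical names chosen to show why job code belongs in work() while run() decides where that code executes.

```python
class SketchJobTask:
    """Hypothetical stand-in for a cluster job task (not luigi's SGEJobTask)."""

    def work(self):
        # Subclasses put their job code here.
        raise NotImplementedError

    def run(self):
        # A real cluster task would hand work() to the scheduler (e.g. via qsub);
        # this sketch only records that a submission would have happened.
        self.ran_on = "cluster"


class LocalSketchJobTask(SketchJobTask):
    """Local variant: skip submission and call work() in the current process."""

    def run(self):
        self.ran_on = "local"
        self.work()


class HelloJob(LocalSketchJobTask):
    def work(self):
        self.result = "this is a test"


job = HelloJob()
job.run()
print(job.ran_on, "->", job.result)  # prints: local -> this is a test
```

Because the job code lives only in work(), switching the base class between the cluster and local variants changes where it runs without touching the job itself.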

run()[source]

The task run method, to be overridden in a subclass.

See Task.run