luigi.contrib.batch

AWS Batch wrapper for Luigi

From the AWS website:

AWS Batch enables you to run batch computing workloads on the AWS Cloud.

Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute resources, and AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required infrastructure. AWS Batch is similar to traditional batch computing software. This service can efficiently provision resources in response to jobs submitted in order to eliminate capacity constraints, reduce compute costs, and deliver results quickly.

See the AWS Batch User Guide for more details.

To use AWS Batch, you create a jobDefinition JSON that defines a docker run command, and then submit this JSON to the API to queue up the task. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances, monitors the load on these instances, and schedules the jobs.
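A job definition is plain JSON. The snippet below is a minimal sketch that builds one as a Python dict and writes it to a file for later registration. The name `my-batch-job`, the image `busybox`, the resource values and the file path are placeholders, not values taken from this module. The keys follow the AWS Batch `RegisterJobDefinition` request (`jobDefinitionName`, `type`, `parameters`, `containerProperties`), and `Ref::message` is an AWS Batch parameter placeholder.

```python
import json
import os
import tempfile

# Hypothetical job definition: name, image and resource values are placeholders.
job_definition = {
    "jobDefinitionName": "my-batch-job",
    "type": "container",
    # Default values for Ref:: placeholders; parameters given at submit time override them.
    "parameters": {"message": "hello"},
    "containerProperties": {
        "image": "busybox",
        # Roughly equivalent to `docker run busybox echo <message>`.
        "command": ["echo", "Ref::message"],
        "resourceRequirements": [
            {"type": "VCPU", "value": "1"},
            {"type": "MEMORY", "value": "512"},
        ],
    },
}

# Write the definition to a JSON file (location chosen for illustration only).
json_fpath = os.path.join(tempfile.gettempdir(), "my-batch-job.json")
with open(json_fpath, "w") as f:
    json.dump(job_definition, f, indent=2)
```

The snippet only writes the file. Registering it with AWS Batch is a separate step.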

This boto3-powered wrapper allows you to create Luigi Tasks to submit Batch jobDefinitions. You can either pass a dict (mapping directly to the jobDefinition JSON) OR an Amazon Resource Name (ARN) for a previously registered jobDefinition.
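The two accepted forms can be told apart by type. The following is a minimal sketch that assumes job definition ARNs follow the usual `arn:<partition>:batch:<region>:<account>:job-definition/<name>:<revision>` layout. The helper `describe_job_definition_ref` is hypothetical and is not part of `luigi.contrib.batch`.

```python
def describe_job_definition_ref(job_definition):
    """Hypothetical helper: tell a jobDefinition dict apart from a Batch job definition ARN."""
    if isinstance(job_definition, dict):
        # A dict maps directly onto the jobDefinition JSON.
        return {"kind": "dict", "name": job_definition.get("jobDefinitionName")}
    if isinstance(job_definition, str) and job_definition.startswith("arn:"):
        # arn:<partition>:batch:<region>:<account>:job-definition/<name>:<revision>
        parts = job_definition.split(":", 5)
        prefix = "job-definition/"
        if len(parts) == 6 and parts[2] == "batch" and parts[5].startswith(prefix):
            name, _, revision = parts[5][len(prefix):].partition(":")
            return {"kind": "arn", "name": name,
                    "revision": int(revision) if revision else None}
    raise ValueError("expected a jobDefinition dict or a Batch job definition ARN")
```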

Requires:

  • boto3 package

  • Amazon AWS credentials discoverable by boto3 (e.g., by using aws configure from awscli)

  • An enabled AWS Batch job queue configured to run on a compute environment.

Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)

Classes

BatchClient([poll_time])

BatchTask(*args, **kwargs) – Base class for an Amazon Batch job

Exceptions

BatchJobException

exception luigi.contrib.batch.BatchJobException
class luigi.contrib.batch.BatchClient(poll_time=10)
get_active_queue()

Get name of first active job queue
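The selection can be sketched as a scan over the queue list. This sketch assumes that "active" means a queue whose `state` is `ENABLED` and whose `status` is `VALID`, and that the input has the shape returned by boto3's `describe_job_queues()` (a `jobQueues` list). The function name `first_active_queue` is hypothetical.

```python
def first_active_queue(response):
    """Return the name of the first ENABLED and VALID queue in a describe_job_queues() response."""
    for queue in response.get("jobQueues", []):
        if queue.get("state") == "ENABLED" and queue.get("status") == "VALID":
            return queue["jobQueueName"]
    # No usable queue: job submission would have nowhere to go.
    raise RuntimeError("No job queue with state=ENABLED and status=VALID")
```

With a real client, the argument would be `boto3.client("batch").describe_job_queues()`.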

get_job_id_from_name(job_name)

Retrieve the first job ID matching the given name
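The lookup can be sketched over job summaries shaped like the `jobSummaryList` entries returned by boto3's `list_jobs()`, each of which carries `jobId` and `jobName`. The helper name is hypothetical. This text does not say how the real method queries the API (which queue or which statuses), and unlike the real method this sketch returns None when nothing matches.

```python
def job_id_from_name(job_summaries, job_name):
    """Return the jobId of the first summary whose jobName matches, or None."""
    return next(
        (job["jobId"] for job in job_summaries if job["jobName"] == job_name),
        None,
    )
```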

get_job_status(job_id)

Retrieve the job status from the AWS Batch API

Parameters:

job_id (str) – AWS Batch job UUID

Returns one of {SUBMITTED|PENDING|RUNNABLE|STARTING|RUNNING|SUCCEEDED|FAILED}
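The status is read from a `describe_jobs` response. The following minimal sketch assumes the response shape of boto3's `describe_jobs(jobs=[job_id])`, which is a `jobs` list whose entries carry a `status`. It also checks the value against the statuses listed above. The function name `status_from_response` is hypothetical.

```python
JOB_STATUSES = {"SUBMITTED", "PENDING", "RUNNABLE", "STARTING",
                "RUNNING", "SUCCEEDED", "FAILED"}


def status_from_response(response):
    """Extract the status of the single job in a describe_jobs() response."""
    jobs = response.get("jobs", [])
    if not jobs:
        raise LookupError("describe_jobs returned no job")
    status = jobs[0]["status"]
    if status not in JOB_STATUSES:
        raise ValueError(f"unexpected job status: {status}")
    return status
```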

get_logs(log_stream_name, get_last=50)

Retrieve log stream from CloudWatch
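The `get_last` argument suggests that only the tail of the stream is returned. The sketch below shows that trimming step. It assumes events shaped like those from the CloudWatch Logs `get_log_events` call (dicts with a `message` key). `tail_messages` is hypothetical, and joining the messages with newlines is an illustrative choice.

```python
def tail_messages(events, get_last=50):
    """Join the messages of the last `get_last` log events, oldest first."""
    if get_last <= 0:
        # Guard: events[-0:] would return the whole list.
        return ""
    return "\n".join(event["message"] for event in events[-get_last:])
```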

submit_job(job_definition, parameters, job_name=None, queue=None)

Wrap submit_job with useful defaults
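This text does not spell out what the "useful defaults" are. The sketch below assumes two plausible ones: a job name built from the definition name plus a timestamp, and a fallback queue when `queue` is None. It assembles keyword arguments for boto3's `submit_job`, whose required fields are `jobName`, `jobQueue` and `jobDefinition`. `build_submit_kwargs` and its `default_queue` argument are hypothetical, and the sketch assumes `job_definition` is a plain name rather than an ARN so that the derived job name stays valid.

```python
import time


def build_submit_kwargs(job_definition, parameters, job_name=None, queue=None,
                        default_queue="my-default-queue"):
    """Hypothetical: assemble submit_job keyword arguments with fallback defaults."""
    if job_name is None:
        # Assumed default: definition name plus a Unix timestamp.
        job_name = f"{job_definition}-{int(time.time())}"
    return {
        "jobName": job_name,
        "jobQueue": queue if queue is not None else default_queue,
        "jobDefinition": job_definition,
        # Batch job parameters are a string-to-string map.
        "parameters": {key: str(value) for key, value in parameters.items()},
    }
```

The result could be passed as `boto3.client("batch").submit_job(**kwargs)`, whose response includes the new `jobId`.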

wait_on_job(job_id)

Poll the job status every poll_time seconds until the job reaches a terminal state (SUCCEEDED or FAILED)
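A polling loop of this kind can be sketched with the status lookup and the sleep function passed in, so it runs without AWS. The sketch assumes SUCCEEDED and FAILED are the terminal states, as listed for `get_job_status`. Raising `RuntimeError` on failure is a stand-in choice, and `wait_for_terminal_status` is hypothetical.

```python
import time


def wait_for_terminal_status(get_status, job_id, poll_time=10, sleep=time.sleep):
    """Poll get_status(job_id) every poll_time seconds until SUCCEEDED or FAILED."""
    while True:
        status = get_status(job_id)
        if status == "SUCCEEDED":
            return True
        if status == "FAILED":
            raise RuntimeError(f"Batch job {job_id} failed")
        # Still SUBMITTED, PENDING, RUNNABLE, STARTING or RUNNING.
        sleep(poll_time)
```

Injecting `sleep` keeps the loop testable. A fake status source and a no-op sleep exercise every branch instantly.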

register_job_definition(json_fpath)

Register a job definition with AWS Batch from a JSON file
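Registering from a file amounts to loading the JSON and passing its fields to the Batch `register_job_definition` call. This minimal sketch assumes the file's top-level keys are valid keyword arguments for boto3's `register_job_definition` (for example `jobDefinitionName`, `type`, `containerProperties`). The client is injected so that a stub can stand in for `boto3.client("batch")`. `register_from_file` is hypothetical.

```python
import json


def register_from_file(client, json_fpath):
    """Load a job definition JSON file, register it, and return the new ARN."""
    with open(json_fpath) as f:
        definition = json.load(f)
    response = client.register_job_definition(**definition)
    return response["jobDefinitionArn"]
```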

class luigi.contrib.batch.BatchTask(*args, **kwargs)

Base class for an Amazon Batch job

Amazon Batch requires you to register “job definitions”, which are JSON descriptions of how to issue the docker run command. This Luigi Task requires the name of a pre-registered Batch jobDefinition, passed as a Parameter.

Parameters:
  • job_definition (str) – name of pre-registered jobDefinition

  • job_name – name of specific job, for tracking in the queue and logs.

  • job_queue – name of job queue where job is going to be submitted.

job_definition

Parameter whose value is a str, and a base class for other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

import luigi

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). Each task will then have its foo attribute set accordingly.

When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:

  • Any value provided on the command line:

    • To the root task (e.g. --param xyz)

    • Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • Any value set in the configuration file, using a [TASK_NAME] section with PARAM_NAME: <serialized value> syntax. See Parameters from config Ingestion.

  • Any default value set using the default flag.
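The resolution order above can be modeled as a first-match lookup. This is a simplified sketch for illustration, not Luigi's actual implementation. The function `resolve_param` and its dict-based inputs are hypothetical: constructor kwargs, root-task command-line values, class-qualified command-line values keyed by `(task_family, name)`, and a config mapping keyed by section and option. The sketch does not parse serialized values.

```python
_MISSING = object()


def resolve_param(name, task_family, kwargs, cli_root, cli_class, config,
                  default=_MISSING, is_root=True):
    """Return a parameter value following the documented priority order."""
    if name in kwargs:                       # explicit constructor argument
        return kwargs[name]
    if is_root and name in cli_root:         # --param xyz (root task only)
        return cli_root[name]
    if (task_family, name) in cli_class:     # --TaskA-param xyz
        return cli_class[(task_family, name)]
    section = config.get(task_family, {})    # [TaskA] section, param: xyz
    if name in section:
        return section[name]
    if default is not _MISSING:              # default=... on the Parameter
        return default
    raise LookupError(f"No value for {task_family}.{name}")
```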

Parameter objects may be reused, but you must then set the positional=False flag.

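job_name

Optional parameter (luigi.OptionalParameter): name of the specific job, for tracking in the queue and logs.

job_queue

Optional parameter (luigi.OptionalParameter): name of the job queue the job is submitted to.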

poll_time

Integer parameter (luigi.IntParameter): number of seconds between job status polls.

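run()

Submit the Batch job and wait for it to finish. This overrides the generic Task.run (see Task.run). Subclasses usually provide job details through job_definition and the parameters property rather than overriding run.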

property parameters

Override to return a dict of parameters for the Batch Task
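The dict returned by `parameters` is sent with the job and fills `Ref::` placeholders in the job definition's command. The sketch below simulates that substitution locally to show the effect. It assumes whole-argument placeholders (for example `"Ref::message"`) and that submitted parameters override the job definition's default `parameters`. `render_command` is a hypothetical helper, not something AWS Batch or Luigi exposes.

```python
def render_command(command, default_parameters, submitted_parameters):
    """Simulate Batch Ref:: substitution for whole-argument placeholders."""
    # Submitted values take precedence over the job definition defaults.
    values = {**default_parameters, **submitted_parameters}
    rendered = []
    for arg in command:
        key = arg[len("Ref::"):] if arg.startswith("Ref::") else None
        if key is not None and key in values:
            rendered.append(str(values[key]))
        else:
            # In this sketch, unknown placeholders and plain arguments pass through unchanged.
            rendered.append(arg)
    return rendered
```

In a BatchTask subclass, `parameters` would then return a dict such as `{"message": "hi"}` for a definition whose command contains `"Ref::message"`.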