Source code for luigi.contrib.ecs

# -*- coding: utf-8 -*-
# Copyright 2015 Outlier Bio, LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

EC2 Container Service wrapper for Luigi

From the AWS website:

  Amazon EC2 Container Service (ECS) is a highly scalable, high performance
  container management service that supports Docker containers and allows you
  to easily run applications on a managed cluster of Amazon EC2 instances.

To use ECS, you create a taskDefinition_ JSON that defines the `docker run`_
command for one or more containers in a task or service, and then submit this
JSON to the API to run the task.

This `boto3-powered`_ wrapper allows you to create Luigi Tasks to submit ECS
``taskDefinition`` s. You can either pass a dict (mapping directly to the
``taskDefinition`` JSON) OR an Amazon Resource Name (arn) for a previously
registered ``taskDefinition``.


Requires:

- boto3 package
- Amazon AWS credentials discoverable by boto3 (e.g., by using ``aws configure``
  from awscli_)
- A running ECS cluster (see `ECS Get Started`_)

Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)

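.. _`docker run`: https://docs.docker.com/reference/commandline/run
.. _taskDefinition: http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_defintions.html
.. _`boto3-powered`: https://boto3.readthedocs.io
.. _awscli: https://aws.amazon.com/cli
.. _`ECS Get Started`: http://docs.aws.amazon.com/AmazonECS/latest/developerguide/ECS_GetStarted.html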


import time
import logging
import luigi

# Shared Luigi logger, so ECS messages appear alongside other Luigi output.
logger = logging.getLogger('luigi-interface')

# boto3 is an optional dependency: importing this module must not fail when
# it is absent, so only warn here. Without boto3 the module-level ``client``
# is never bound, and the helpers below that use it cannot work.
try:
    import boto3
    client = boto3.client('ecs')
except ImportError:
    logger.warning('boto3 is not installed. ECSTasks require boto3')


def _get_task_statuses(task_ids, cluster):
    """
    Retrieve task statuses from ECS API

    Calls ``describe_tasks`` on the module-level boto3 ECS ``client``.

    :param task_ids: list of ECS task IDs/ARNs to look up
    :param cluster: name of the ECS cluster the tasks were submitted to
    :returns: list with the ``lastStatus`` of each task in the response
        ({RUNNING|PENDING|STOPPED})
    :raises Exception: if the response reports any failures, or if the HTTP
        status code of the response is not 200
    """
    response = client.describe_tasks(tasks=task_ids, cluster=cluster)

    # Error checking
    if response['failures']:
        raise Exception('There were some failures:\n{0}'.format(
            response['failures']))
    status_code = response['ResponseMetadata']['HTTPStatusCode']
    if status_code != 200:
        msg = 'Task status request received status code {0}:\n{1}'
        raise Exception(msg.format(status_code, response))

    return [t['lastStatus'] for t in response['tasks']]

def _track_tasks(task_ids, cluster, poll_interval=2):
    """
    Poll task status until STOPPED

    Blocks until every task in ``task_ids`` reports the ``STOPPED`` status.

    :param task_ids: list of ECS task IDs/ARNs to wait on
    :param cluster: name of the ECS cluster the tasks were submitted to
    :param poll_interval: seconds to sleep between status requests
    :raises Exception: propagated from ``_get_task_statuses`` when a status
        request fails
    """
    while True:
        statuses = _get_task_statuses(task_ids, cluster)
        if all(status == 'STOPPED' for status in statuses):
            logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids)))
            break
        logger.debug('ECS task status for tasks {0}: {1}'.format(task_ids, statuses))
        # Pause between polls so the loop does not hammer the ECS API.
        time.sleep(poll_interval)

class ECSTask(luigi.Task):
    """
    Base class for an Amazon EC2 Container Service Task

    Amazon ECS requires you to register "tasks", which are JSON descriptions
    for how to issue the ``docker run`` command. This Luigi Task can either
    run a pre-registered ECS taskDefinition, OR register the task on the fly
    from a Python dict.

    :param task_def_arn: pre-registered task definition ARN (Amazon Resource
        Name), of the form::

            arn:aws:ecs:<region>:<user_id>:task-definition/<family>:<tag>

    :param task_def: dict describing task in taskDefinition JSON format, for
        example::

            task_def = {
                'family': 'hello-world',
                'volumes': [],
                'containerDefinitions': [
                    {
                        'memory': 1,
                        'essential': True,
                        'name': 'hello-world',
                        'image': 'ubuntu',
                        'command': ['/bin/echo', 'hello world']
                    }
                ]
            }

    :param cluster: str defining the ECS cluster to use.
        When this is not defined it will use the default one.
    """

    task_def_arn = luigi.OptionalParameter(default=None)
    task_def = luigi.OptionalParameter(default=None)
    cluster = luigi.Parameter(default='default')

    @property
    def ecs_task_ids(self):
        """
        Expose the ECS task ID

        Returns the list of task ARNs stored by :meth:`run`, or ``None`` if
        no tasks have been recorded on this instance yet.
        """
        return getattr(self, '_task_ids', None)

    @property
    def command(self):
        """
        Command passed to the containers

        Override to return list of dicts with keys 'name' and 'command',
        describing the container names and commands to pass to the container.
        Directly corresponds to the `overrides` parameter of runTask API. For
        example::

            [
                {
                    'name': 'myContainer',
                    'command': ['/bin/sleep', '60']
                }
            ]

        The base implementation returns ``None``, meaning no overrides.
        """
        return None

    def run(self):
        """
        Submit the task to ECS and block until it has stopped.

        Registers ``task_def`` first when no ``task_def_arn`` was given, then
        runs the task on ``cluster`` and polls until every started task is
        STOPPED.

        :raises ValueError: if neither or both of ``task_def`` and
            ``task_def_arn`` are set
        :raises Exception: if the runTask response reports failures
        """
        # Exactly one of the two ways of naming the task definition is allowed.
        if bool(self.task_def) == bool(self.task_def_arn):
            raise ValueError('Either (but not both) a task_def (dict) or '
                             'task_def_arn (string) must be assigned')

        if not self.task_def_arn:
            # Register the task and get assigned taskDefinition ID (arn)
            response = client.register_task_definition(**self.task_def)
            self.task_def_arn = response['taskDefinition']['taskDefinitionArn']

        # Read the (possibly overridden) property once.
        command = self.command
        overrides = {'containerOverrides': command} if command else {}

        # Submit the task to AWS ECS and get assigned task ID
        # (list containing 1 string)
        response = client.run_task(taskDefinition=self.task_def_arn,
                                   overrides=overrides,
                                   cluster=self.cluster)

        if response['failures']:
            raise Exception(", ".join(
                "fail to run task {0} reason: {1}".format(failure['arn'],
                                                          failure['reason'])
                for failure in response['failures']))

        self._task_ids = [task['taskArn'] for task in response['tasks']]

        # Wait on task completion
        _track_tasks(self._task_ids, self.cluster)