# -*- coding: utf-8 -*-
#
# Copyright 2015 Outlier Bio, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
EC2 Container Service wrapper for Luigi
From the AWS website:
Amazon EC2 Container Service (ECS) is a highly scalable, high performance
container management service that supports Docker containers and allows you
to easily run applications on a managed cluster of Amazon EC2 instances.
To use ECS, you create a taskDefinition_ JSON that defines the `docker run`_
command for one or more containers in a task or service, and then submit this
JSON to the API to run the task.
This `boto3-powered`_ wrapper allows you to create Luigi Tasks to submit ECS
``taskDefinition`` s. You can either pass a dict (mapping directly to the
``taskDefinition`` JSON) OR an Amazon Resource Name (arn) for a previously
registered ``taskDefinition``.
Requires:
- boto3 package
- Amazon AWS credentials discoverable by boto3 (e.g., by using ``aws configure``
from awscli_)
- A running ECS cluster (see `ECS Get Started`_)
Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)
.. _`docker run`: https://docs.docker.com/reference/commandline/run
.. _taskDefinition: http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_defintions.html
.. _`boto3-powered`: https://boto3.readthedocs.io
.. _awscli: https://aws.amazon.com/cli
.. _`ECS Get Started`: http://docs.aws.amazon.com/AmazonECS/latest/developerguide/ECS_GetStarted.html
"""
import copy
import time
import logging
import luigi
logger = logging.getLogger('luigi-interface')
try:
import boto3
client = boto3.client('ecs')
except ImportError:
logger.warning('boto3 is not installed. ECSTasks require boto3')
POLL_TIME = 2
def _get_task_statuses(task_ids, cluster):
"""
Retrieve task statuses from ECS API
Returns list of {RUNNING|PENDING|STOPPED} for each id in task_ids
"""
response = client.describe_tasks(tasks=task_ids, cluster=cluster)
# Error checking
if response['failures'] != []:
raise Exception('There were some failures:\n{0}'.format(
response['failures']))
status_code = response['ResponseMetadata']['HTTPStatusCode']
if status_code != 200:
msg = 'Task status request received status code {0}:\n{1}'
raise Exception(msg.format(status_code, response))
return [t['lastStatus'] for t in response['tasks']]
def _track_tasks(task_ids, cluster):
"""Poll task status until STOPPED"""
while True:
statuses = _get_task_statuses(task_ids, cluster)
if all([status == 'STOPPED' for status in statuses]):
logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids)))
break
time.sleep(POLL_TIME)
logger.debug('ECS task status for tasks {0}: {1}'.format(task_ids, statuses))
[docs]
class ECSTask(luigi.Task):
"""
Base class for an Amazon EC2 Container Service Task
Amazon ECS requires you to register "tasks", which are JSON descriptions
for how to issue the ``docker run`` command. This Luigi Task can either
run a pre-registered ECS taskDefinition, OR register the task on the fly
from a Python dict.
:param task_def_arn: pre-registered task definition ARN (Amazon Resource
Name), of the form::
arn:aws:ecs:<region>:<user_id>:task-definition/<family>:<tag>
:param task_def: dict describing task in taskDefinition JSON format, for
example::
task_def = {
'family': 'hello-world',
'volumes': [],
'containerDefinitions': [
{
'memory': 1,
'essential': True,
'name': 'hello-world',
'image': 'ubuntu',
'command': ['/bin/echo', 'hello world']
}
]
}
:param cluster: str defining the ECS cluster to use.
When this is not defined it will use the default one.
"""
task_def_arn = luigi.OptionalParameter(default=None)
task_def = luigi.OptionalParameter(default=None)
cluster = luigi.Parameter(default='default')
@property
def ecs_task_ids(self):
"""Expose the ECS task ID"""
if hasattr(self, '_task_ids'):
return self._task_ids
@property
def command(self):
"""
Command passed to the containers
Override to return list of dicts with keys 'name' and 'command',
describing the container names and commands to pass to the container.
These values will be specified in the `containerOverrides` property of
the `overrides` parameter passed to the runTask API.
Example::
[
{
'name': 'myContainer',
'command': ['/bin/sleep', '60']
}
]
"""
pass
[docs]
@staticmethod
def update_container_overrides_command(container_overrides, command):
"""
Update a list of container overrides with the specified command.
The specified command will take precedence over any existing commands
in `container_overrides` for the same container name. If no existing
command yet exists in `container_overrides` for the specified command,
it will be added.
"""
for colliding_override in filter(lambda x: x['name'] == command['name'], container_overrides):
colliding_override['command'] = command['command']
break
else:
container_overrides.append(command)
@property
def combined_overrides(self):
"""
Return single dict combining any provided `overrides` parameters.
This is used to allow custom `overrides` parameters to be specified in
`self.run_task_kwargs` while ensuring that the values specified in
`self.command` are honored in `containerOverrides`.
"""
overrides = copy.deepcopy(self.run_task_kwargs.get('overrides', {}))
if self.command:
if 'containerOverrides' in overrides:
for command in self.command:
self.update_container_overrides_command(overrides['containerOverrides'], command)
else:
overrides['containerOverrides'] = self.command
return overrides
@property
def run_task_kwargs(self):
"""
Additional keyword arguments to be provided to ECS runTask API.
Override this property in a subclass to provide additional parameters
such as `network_configuration`, `launchType`, etc.
If the returned `dict` includes an `overrides` value with a nested
`containerOverrides` array defining one or more container `command`
values, prior to calling `run_task` they will be combined with and
superseded by any colliding values specified separately in the
`command` property.
Example::
{
'launchType': 'FARGATE',
'platformVersion': '1.4.0',
'networkConfiguration': {
'awsvpcConfiguration': {
'subnets': [
'subnet-01234567890abcdef',
'subnet-abcdef01234567890'
],
'securityGroups': [
'sg-abcdef01234567890',
],
'assignPublicIp': 'ENABLED'
}
},
'overrides': {
'ephemeralStorage': {
'sizeInGiB': 30
}
}
}
"""
return {}
[docs]
def run(self):
if (not self.task_def and not self.task_def_arn) or \
(self.task_def and self.task_def_arn):
raise ValueError(('Either (but not both) a task_def (dict) or'
'task_def_arn (string) must be assigned'))
if not self.task_def_arn:
# Register the task and get assigned taskDefinition ID (arn)
response = client.register_task_definition(**self.task_def)
self.task_def_arn = response['taskDefinition']['taskDefinitionArn']
run_task_kwargs = self.run_task_kwargs
run_task_kwargs.update({
'taskDefinition': self.task_def_arn,
'cluster': self.cluster,
'overrides': self.combined_overrides,
})
# Submit the task to AWS ECS and get assigned task ID
# (list containing 1 string)
response = client.run_task(**run_task_kwargs)
if response['failures']:
raise Exception(", ".join(["fail to run task {0} reason: {1}".format(failure['arn'], failure['reason'])
for failure in response['failures']]))
self._task_ids = [task['taskArn'] for task in response['tasks']]
# Wait on task completion
_track_tasks(self._task_ids, self.cluster)