# -*- coding: utf-8 -*-
#
# Copyright 2017 Open Targets
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Microsoft OpenPAI job wrapper for Luigi.
"OpenPAI is an open source platform that provides complete AI model training and resource management capabilities,
it is easy to extend and supports on-premise, cloud and hybrid environments in various scale."
For more information about OpenPAI, see https://github.com/Microsoft/pai/. This task is tested against OpenPAI 0.7.1.
Requires:
- requests: ``pip install requests``
Written and maintained by Liu, Dongqing (@liudongqing).
"""
import abc
import json
import logging
import time
from urllib.parse import urljoin
import luigi
# Module logger; uses the named "luigi-interface" logger (presumably the one
# Luigi itself logs to) rather than a per-module name.
logger = logging.getLogger("luigi-interface")

# ``requests`` is optional at import time. If it is missing, only a warning is
# logged and ``rs`` / ``HTTPError`` stay undefined, so PaiTask will fail with a
# NameError the first time it talks to the REST server.
try:
    import requests as rs
    from requests.exceptions import HTTPError
except ImportError:
    logger.warning("requests is not installed. PaiTask requires requests.")
def slot_to_dict(o):
    """Convert an object declaring ``__slots__`` into a plain dict.

    Intended as the ``default`` hook for :func:`json.dumps`, so that job,
    task-role and port objects serialize to JSON objects.

    Slot names starting with ``"__"`` (e.g. ``__weakref__``) are skipped, and
    so are slots that are unset or hold ``None``; falsy values such as ``0``
    or ``[]`` are kept.

    :param o: any object with a ``__slots__`` attribute.
    :returns: dict mapping slot name to value, in ``__slots__`` order.
    """
    result = {}
    public_names = (name for name in o.__slots__ if not name.startswith("__"))
    for name in public_names:
        # Unset slots raise AttributeError; the default turns that into None.
        attr = getattr(o, name, None)
        if attr is None:
            continue
        result[name] = attr
    return result
class PaiJob:
    """
    The OpenPAI job definition.

    Instances are turned into JSON with :func:`slot_to_dict`; attributes left
    unset (or ``None``) are omitted from the payload. Field meanings are
    described at https://github.com/Microsoft/pai/blob/master/docs/job_tutorial.md
    The serialized shape is::

        {
          "jobName": String,
          "image": String,
          "authFile": String,
          "dataDir": String,
          "outputDir": String,
          "codeDir": String,
          "virtualCluster": String,
          "taskRoles": [
            {
              "name": String,
              "taskNumber": Integer,
              "cpuNumber": Integer,
              "memoryMB": Integer,
              "shmMB": Integer,
              "gpuNumber": Integer,
              "portList": [
                {
                  "label": String,
                  "beginAt": Integer,
                  "portNumber": Integer
                }
              ],
              "command": String,
              "minFailedTaskCount": Integer,
              "minSucceededTaskCount": Integer
            }
          ],
          "gpuType": String,
          "retryCount": Integer
        }
    """

    __slots__ = (
        "jobName",
        "image",
        "authFile",
        "dataDir",
        "outputDir",
        "codeDir",
        "virtualCluster",
        "taskRoles",
        "gpuType",
        "retryCount",
    )

    def __init__(self, jobName, image, tasks):
        """
        Create a job with its required fields set.

        :param jobName: Name for the job; must be unique.
        :param image: URL of the Docker image used by every task in the job.
        :param tasks: Non-empty ``list`` of task roles.
        :raises TypeError: if ``tasks`` is not a list or is empty.
        """
        self.jobName = jobName
        self.image = image
        if not isinstance(tasks, list) or not tasks:
            raise TypeError("you must specify one task at least.")
        self.taskRoles = tasks
class Port:
    """A typed port request that can be attached to a task role's port list."""

    __slots__ = ("label", "beginAt", "portNumber")

    def __init__(self, label, begin_at=0, port_number=1):
        """
        Describe one port type for a TaskRole.

        :param label: Label name for the port type, required.
        :param begin_at: First port of this type; 0 means random selection.
        :param port_number: How many ports of this type to allocate.
        """
        # Attribute names are camelCase because they become JSON keys verbatim.
        self.label, self.beginAt, self.portNumber = label, begin_at, port_number
class TaskRole:
    """One task role of an OpenPAI job (a group of identical tasks)."""

    __slots__ = (
        "name",
        "taskNumber",
        "cpuNumber",
        "memoryMB",
        "shmMB",
        "gpuNumber",
        "portList",
        "command",
        "minFailedTaskCount",
        "minSucceededTaskCount",
    )

    def __init__(
        self,
        name,
        command,
        taskNumber=1,
        cpuNumber=1,
        memoryMB=2048,
        shmMB=64,
        gpuNumber=0,
        portList=None,
    ):
        """
        The TaskRole of PAI.

        :param name: Name for the task role; must be unique among the job's roles. Required.
        :param command: Executable command for tasks in the task role; cannot be empty. Required.
        :param taskNumber: Number of tasks for the task role, no less than 1.
        :param cpuNumber: CPU number for one task in the task role, no less than 1.
        :param memoryMB: Memory for one task in the task role, no less than 100.
        :param shmMB: Shared memory for one task in the task role, no more than memory size.
        :param gpuNumber: GPU number for one task in the task role, no less than 0.
        :param portList: List of :class:`Port` to use, optional. Defaults to a
            fresh empty list for each instance.
        """
        self.name = name
        self.command = command
        self.taskNumber = taskNumber
        self.cpuNumber = cpuNumber
        self.memoryMB = memoryMB
        self.shmMB = shmMB
        self.gpuNumber = gpuNumber
        # A ``[]`` default would be one list shared by every TaskRole, so
        # appending a port to one role would silently add it to all of them.
        self.portList = [] if portList is None else portList
class OpenPai(luigi.Config):
    """Connection and credential settings for the OpenPAI REST server.

    Values are supplied through Luigi configuration (this is a
    ``luigi.Config``); PaiTask reads them when it requests an auth token.
    """

    # Base URL of the REST server; API paths are joined onto it.
    pai_url = luigi.Parameter(
        default="http://127.0.0.1:9186",
        description="rest server url, default is http://127.0.0.1:9186",
    )
    username = luigi.Parameter(
        default="admin",
        description="your username",
    )
    password = luigi.Parameter(
        default=None,
        description="your password",
    )
    # Requested token lifetime, sent as-is in the token request.
    expiration = luigi.IntParameter(
        default=3600,
        description="expiration time in seconds",
    )
class PaiTask(luigi.Task):
    """Luigi task that submits a job to OpenPAI and waits for it to finish.

    Subclasses must provide :attr:`name`, :attr:`image` and :attr:`tasks`.
    The remaining properties are optional overrides that map onto fields of
    the OpenPAI job definition (see :class:`PaiJob`).

    :meth:`complete` asks the OpenPAI REST server for the job's state instead
    of checking :meth:`output`.
    """

    # Seconds to sleep between job-status polls while run() waits.
    __POLL_TIME = 5

    @property
    @abc.abstractmethod
    def name(self):
        """Name for the job; must be unique. Required."""
        return "SklearnExample"

    @property
    @abc.abstractmethod
    def image(self):
        """URL pointing to the Docker image for all tasks in the job. Required."""
        return "openpai/pai.example.sklearn"

    @property
    @abc.abstractmethod
    def tasks(self):
        """List of TaskRole; at least one task role. Required."""
        return []

    @property
    def auth_file_path(self):
        """Docker registry authentication file existing on HDFS. Optional."""
        return None

    @property
    def data_dir(self):
        """Data directory existing on HDFS. Optional."""
        return None

    @property
    def code_dir(self):
        """Code directory existing on HDFS; should contain no data and be under 200MB. Optional."""
        return None

    @property
    def output_dir(self):
        """Output directory on HDFS; $PAI_DEFAULT_FS_URI/$jobName/output is used if not overridden. Optional."""
        return "$PAI_DEFAULT_FS_URI/{0}/output".format(self.name)

    @property
    def virtual_cluster(self):
        """The virtual cluster the job runs on; defaults to "default". Optional."""
        return "default"

    @property
    def gpu_type(self):
        """GPU type to be used by the tasks; None lets the job run on any GPU type. Optional."""
        return None

    @property
    def retry_count(self):
        """Job retry count, no less than 0. Optional."""
        return 0

    def __init_token(self):
        """Request a bearer token from the OpenPAI REST server and store it.

        Credentials and the server URL come from :class:`OpenPai`; the config
        instance is kept on ``self`` for the later REST calls.

        :raises HTTPError: if the token endpoint does not answer with HTTP 200.
        """
        self.__openpai = OpenPai()
        request_json = json.dumps(
            {
                "username": self.__openpai.username,
                "password": self.__openpai.password,
                "expiration": self.__openpai.expiration,
            }
        )
        logger.debug("Requesting token from OpenPai")
        response = rs.post(
            urljoin(self.__openpai.pai_url, "/api/v1/token"),
            headers={"Content-Type": "application/json"},
            data=request_json,
        )
        # Log only the status: a successful body contains the bearer token,
        # which must not end up in log files.
        logger.debug("Get token response status %s", response.status_code)
        if response.status_code != 200:
            msg = "Get token request failed, response is {}".format(response.text)
            logger.error(msg)
            # HTTPError subclasses Exception, so ``except Exception`` callers
            # keep working, and it matches the other REST failures in this class.
            raise HTTPError(msg, response=response)
        self.__token = response.json()["token"]

    def __init__(self, *args, **kwargs):
        """Initialize the task and immediately authenticate against OpenPAI.

        All arguments are forwarded to :class:`luigi.Task`. The REST server
        URL and credentials are read from the :class:`OpenPai` config, not
        from constructor arguments.

        :raises HTTPError: if the auth token cannot be obtained.
        """
        super(PaiTask, self).__init__(*args, **kwargs)
        self.__init_token()

    def __check_job_status(self):
        """Query the OpenPAI REST server for the state of job ``self.name``.

        :returns: ``False`` while the job is UNKNOWN, WAITING or RUNNING, and
            ``True`` once it is SUCCEED.
        :raises HTTPError: if the job is not found (404) or the request fails.
        :raises RuntimeError: if the job finished in any other state.
        """
        response = rs.get(urljoin(self.__openpai.pai_url, "/api/v1/jobs/{0}".format(self.name)))
        logger.debug("Check job response %s", response.text)
        if response.status_code == 404:
            msg = "Job {0} is not found".format(self.name)
            # complete() treats a missing job as "not done yet", so this is
            # not logged as an error.
            logger.debug(msg)
            raise HTTPError(msg, response=response)
        if response.status_code != 200:
            msg = "Get job request failed, response is {}".format(response.text)
            logger.error(msg)
            raise HTTPError(msg, response=response)

        job_state = response.json()["jobStatus"]["state"]
        if job_state in ("UNKNOWN", "WAITING", "RUNNING"):
            logger.debug("Job %s is running in state %s", self.name, job_state)
            return False

        msg = "Job {0} finished in state {1}".format(self.name, job_state)
        logger.info(msg)
        if job_state != "SUCCEED":
            raise RuntimeError(msg)
        return True

    def run(self):
        """Submit the job to OpenPAI and block until it finishes.

        :raises HTTPError: if the submission is not accepted (anything other
            than HTTP 202) or a later status request fails.
        :raises RuntimeError: if the job finishes in a state other than SUCCEED.
        """
        job = PaiJob(self.name, self.image, self.tasks)
        job.virtualCluster = self.virtual_cluster
        job.authFile = self.auth_file_path
        job.codeDir = self.code_dir
        job.dataDir = self.data_dir
        job.outputDir = self.output_dir
        job.retryCount = self.retry_count
        job.gpuType = self.gpu_type
        # slot_to_dict drops unset/None fields, so optional settings are omitted.
        request_json = json.dumps(job, default=slot_to_dict)
        logger.debug("Submit job request %s", request_json)
        response = rs.post(
            urljoin(self.__openpai.pai_url, "/api/v1/jobs"),
            headers={"Content-Type": "application/json", "Authorization": "Bearer {}".format(self.__token)},
            data=request_json,
        )
        logger.debug("Submit job response %s", response.text)
        # 202 is success for job submission, see https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md
        if response.status_code != 202:
            msg = "Submit job failed, response code is {0}, body is {1}".format(response.status_code, response.text)
            logger.error(msg)
            raise HTTPError(msg, response=response)
        while not self.__check_job_status():
            time.sleep(self.__POLL_TIME)

    def output(self):
        """Return an HDFS target for the job's output directory.

        NOTE(review): the default ``output_dir`` contains the literal
        ``$PAI_DEFAULT_FS_URI`` placeholder, which is passed through unexpanded
        here. Subclasses that rely on this target probably want to override
        ``output_dir`` with a concrete HDFS path. Confirm against the HDFS client in use.
        """
        # Imported lazily: ``import luigi`` does not load the HDFS contrib
        # module, and only callers of output() need it.
        from luigi.contrib.hdfs import HdfsTarget

        # Must be the directory path: calling self.output() here would recurse forever.
        return HdfsTarget(self.output_dir)

    def complete(self):
        """Report whether the OpenPAI job has finished successfully.

        A missing job, a failed status request, and a job that ended in a
        non-SUCCEED state are all reported as incomplete.
        """
        try:
            return self.__check_job_status()
        except (HTTPError, RuntimeError):
            return False