luigi.contrib.pai module

MicroSoft OpenPAI Job wrapper for Luigi.

“OpenPAI is an open source platform that provides complete AI model training and resource management capabilities, it is easy to extend and supports on-premise, cloud and hybrid environments in various scale.”

For more information about OpenPAI : https://github.com/Microsoft/pai/, this task is tested against OpenPAI 0.7.1

Requires:

  • requests: pip install requests

Written and maintained by Liu, Dongqing (@liudongqing).

luigi.contrib.pai.slot_to_dict(o)[source]
class luigi.contrib.pai.PaiJob(jobName, image, tasks)[source]

Bases: object

The Open PAI job definition. Refer to here https://github.com/Microsoft/pai/blob/master/docs/job_tutorial.md

{
  "jobName":   String,
  "image":     String,
  "authFile":  String,
  "dataDir":   String,
  "outputDir": String,
  "codeDir":   String,
  "virtualCluster": String,
  "taskRoles": [
    {
      "name":       String,
      "taskNumber": Integer,
      "cpuNumber":  Integer,
      "memoryMB":   Integer,
      "shmMB":      Integer,
      "gpuNumber":  Integer,
      "portList": [
        {
          "label": String,
          "beginAt": Integer,
          "portNumber": Integer
        }
      ],
      "command":    String,
      "minFailedTaskCount": Integer,
      "minSucceededTaskCount": Integer
    }
  ],
  "gpuType": String,
  "retryCount": Integer
}

Initialize a Job with required fields.

Parameters:
  • jobName – Name for the job, need to be unique
  • image – URL pointing to the Docker image for all tasks in the job
  • tasks – List of taskRole, one task role at least
jobName
image
taskRoles
authFile
codeDir
dataDir
gpuType
outputDir
retryCount
virtualCluster
class luigi.contrib.pai.Port(label, begin_at=0, port_number=1)[source]

Bases: object

The Port definition for TaskRole

Parameters:
  • label – Label name for the port type, required
  • begin_at – The port to begin with in the port type, 0 for random selection, required
  • port_number – Number of ports for the specific type, required
label
beginAt
portNumber
class luigi.contrib.pai.TaskRole(name, command, taskNumber=1, cpuNumber=1, memoryMB=2048, shmMB=64, gpuNumber=0, portList=[])[source]

Bases: object

The TaskRole of PAI

Parameters:
  • name – Name for the task role, need to be unique with other roles, required
  • command – Executable command for tasks in the task role, can not be empty, required
  • taskNumber – Number of tasks for the task role, no less than 1, required
  • cpuNumber – CPU number for one task in the task role, no less than 1, required
  • shmMB – Shared memory for one task in the task role, no more than memory size, required
  • memoryMB – Memory for one task in the task role, no less than 100, required
  • gpuNumber – GPU number for one task in the task role, no less than 0, required
  • portList – List of portType to use, optional
name
command
taskNumber
cpuNumber
memoryMB
shmMB
gpuNumber
portList
minFailedTaskCount
minSucceededTaskCount
class luigi.contrib.pai.OpenPai(*args, **kwargs)[source]

Bases: luigi.task.Config

pai_url = Parameter (defaults to http://127.0.0.1:9186): rest server url, default is http://127.0.0.1:9186
username = Parameter (defaults to admin): your username
password = Parameter (defaults to None): your password
expiration = IntParameter (defaults to 3600): expiration time in seconds
class luigi.contrib.pai.PaiTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Parameters:
  • pai_url – The rest server url of PAI clusters, default is ‘http://127.0.0.1:9186’.
  • token – The token used to auth the rest server of PAI.
name

Name for the job, need to be unique, required

image

URL pointing to the Docker image for all tasks in the job, required

tasks

List of taskRole, one task role at least, required

auth_file_path

Docker registry authentication file existing on HDFS, optional

data_dir

Data directory existing on HDFS, optional

code_dir

Code directory existing on HDFS, should not contain any data and should be less than 200MB, optional

output_dir

Output directory on HDFS, $PAI_DEFAULT_FS_URI/$jobName/output will be used if not specified, optional

virtual_cluster

The virtual cluster job runs on. If omitted, the job will run on default virtual cluster, optional

gpu_type

Specify the GPU type to be used in the tasks. If omitted, the job will run on any gpu type, optional

retry_count

Job retry count, no less than 0, optional

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

output()[source]

The output that this Task produces.

The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

complete()[source]

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.