luigi.contrib.pai
Microsoft OpenPAI job wrapper for Luigi.
“OpenPAI is an open source platform that provides complete AI model training and resource management capabilities, it is easy to extend and supports on-premise, cloud and hybrid environments in various scale.”
For more information about OpenPAI, see https://github.com/Microsoft/pai/. This task was tested against OpenPAI 0.7.1.
Requires:
- requests: pip install requests
Written and maintained by Liu, Dongqing (@liudongqing).
Classes
OpenPai
PaiJob – The Open PAI job definition. Refer to https://github.com/Microsoft/pai/blob/master/docs/job_tutorial.md
PaiTask
Port – The Port definition for TaskRole
TaskRole – The TaskRole of PAI
- class luigi.contrib.pai.PaiJob(jobName, image, tasks)
The Open PAI job definition. Refer to https://github.com/Microsoft/pai/blob/master/docs/job_tutorial.md for the full format:
    {
      "jobName": String,
      "image": String,
      "authFile": String,
      "dataDir": String,
      "outputDir": String,
      "codeDir": String,
      "virtualCluster": String,
      "taskRoles": [
        {
          "name": String,
          "taskNumber": Integer,
          "cpuNumber": Integer,
          "memoryMB": Integer,
          "shmMB": Integer,
          "gpuNumber": Integer,
          "portList": [
            {
              "label": String,
              "beginAt": Integer,
              "portNumber": Integer
            }
          ],
          "command": String,
          "minFailedTaskCount": Integer,
          "minSucceededTaskCount": Integer
        }
      ],
      "gpuType": String,
      "retryCount": Integer
    }
Initialize a Job with required fields.
- Parameters:
jobName – Name for the job; must be unique
image – URL pointing to the Docker image for all tasks in the job
tasks – List of TaskRole objects; at least one task role is required
- jobName
- image
- taskRoles
- authFile
- dataDir
- outputDir
- codeDir
- virtualCluster
- gpuType
- retryCount
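The job schema above can be assembled as a plain dictionary and serialized to JSON. The following is only a sketch: `build_job_definition` is a hypothetical helper and not part of luigi.contrib.pai, the job name, image and command are placeholder values, and it assumes that optional fields are simply left out of the payload when unset.

```python
import json


def build_job_definition(job_name, image, task_roles, **optional):
    """Hypothetical helper: build a dict shaped like the job schema above."""
    if not isinstance(task_roles, list) or not task_roles:
        raise ValueError("at least one task role is required")
    job = {"jobName": job_name, "image": image, "taskRoles": task_roles}
    # Optional fields (dataDir, codeDir, retryCount, ...) are added only when set.
    job.update({key: value for key, value in optional.items() if value is not None})
    return job


# Placeholder task role using the keys from the schema.
role = {
    "name": "worker",
    "taskNumber": 1,
    "cpuNumber": 1,
    "memoryMB": 2048,
    "shmMB": 64,
    "gpuNumber": 0,
    "command": "python train.py",
}

job = build_job_definition(
    "example-job", "example/image:latest", [role], retryCount=0, gpuType=None
)
payload = json.dumps(job, indent=2)
print(payload)
```

Passing an empty task list raises an error in this sketch, mirroring the "one task role at least" requirement.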
- class luigi.contrib.pai.Port(label, begin_at=0, port_number=1)
The Port definition for TaskRole
- Parameters:
label – Label name for the port type, required
begin_at – First port to use for the port type; 0 selects a port at random (default: 0)
port_number – Number of ports to reserve for the port type (default: 1)
- label
- beginAt
- portNumber
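The constructor takes snake_case arguments, while the attributes use the camelCase keys of the job schema. A minimal stand-in showing that mapping is sketched here; `PortSketch` and its `to_dict` method are hypothetical and written only for illustration, they are not the Luigi class.

```python
class PortSketch:
    """Hypothetical stand-in mirroring the documented Port signature."""

    def __init__(self, label, begin_at=0, port_number=1):
        self.label = label
        self.beginAt = begin_at        # 0 means the port is selected at random
        self.portNumber = port_number  # number of ports of this type

    def to_dict(self):
        # Same keys as one "portList" entry in the job definition.
        return {
            "label": self.label,
            "beginAt": self.beginAt,
            "portNumber": self.portNumber,
        }


http_port = PortSketch("http", begin_at=8080, port_number=2)
print(http_port.to_dict())
```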
- class luigi.contrib.pai.TaskRole(name, command, taskNumber=1, cpuNumber=1, memoryMB=2048, shmMB=64, gpuNumber=0, portList=[])
The TaskRole of PAI
- Parameters:
name – Name for the task role; must be unique among the job's task roles, required
command – Executable command for tasks in the task role; cannot be empty, required
taskNumber – Number of tasks for the task role; no less than 1 (default: 1)
cpuNumber – Number of CPUs for one task in the task role; no less than 1 (default: 1)
memoryMB – Memory in MB for one task in the task role; no less than 100 (default: 2048)
shmMB – Shared memory in MB for one task in the task role; no more than memoryMB (default: 64)
gpuNumber – Number of GPUs for one task in the task role; no less than 0 (default: 0)
portList – List of Port objects to use, optional
- name
- command
- taskNumber
- cpuNumber
- memoryMB
- shmMB
- gpuNumber
- portList
- minFailedTaskCount
- minSucceededTaskCount
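The limits stated in the parameter descriptions can be checked before a job is submitted. This sketch assumes nothing about where those limits are actually enforced (the page does not say whether Luigi or the PAI server checks them); `validate_task_role` is a hypothetical helper that only restates the documented constraints.

```python
def validate_task_role(name, command, taskNumber=1, cpuNumber=1,
                       memoryMB=2048, shmMB=64, gpuNumber=0):
    """Hypothetical checker: return a list of violated documented constraints."""
    problems = []
    if not name:
        problems.append("name must not be empty")
    if not command:
        problems.append("command must not be empty")
    if taskNumber < 1:
        problems.append("taskNumber must be at least 1")
    if cpuNumber < 1:
        problems.append("cpuNumber must be at least 1")
    if memoryMB < 100:
        problems.append("memoryMB must be at least 100")
    if shmMB > memoryMB:
        problems.append("shmMB must not exceed memoryMB")
    if gpuNumber < 0:
        problems.append("gpuNumber must not be negative")
    return problems


ok = validate_task_role("worker", "python train.py")
bad = validate_task_role("worker", "", memoryMB=50, shmMB=64)
print(ok)
print(bad)
```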
- class luigi.contrib.pai.OpenPai(*args, **kwargs)
- pai_url
Parameter whose value is a str, and a base class for other parameter types.
Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
    class MyTask(luigi.Task):
        foo = luigi.Parameter()

    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")

        def run(self):
            print(self.requires().foo)  # prints "hello"
This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
- Any value provided on the command line:
  - To the root task (e.g. --param xyz)
  - Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
- With [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See Parameters from config Ingestion
- Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
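The resolution order described above amounts to a first-match lookup over prioritized sources. The sketch below illustrates only that ordering; `resolve_parameter` and its argument names are hypothetical and do not reflect how Luigi implements the lookup internally.

```python
_UNSET = object()  # sentinel so that None can still be a real value


def resolve_parameter(explicit=_UNSET, cli_root=_UNSET, cli_class=_UNSET,
                      config=_UNSET, default=_UNSET):
    """Hypothetical helper: return (source, value) from the highest-priority source."""
    candidates = [
        ("constructor argument", explicit),
        ("command line, root task", cli_root),
        ("command line, qualified task name", cli_class),
        ("configuration file", config),
        ("default", default),
    ]
    for source, value in candidates:
        if value is not _UNSET:
            return source, value
    raise ValueError("no value supplied and no default set")


print(resolve_parameter(config="from-config", default="fallback"))
print(resolve_parameter(explicit=44, default=0))
```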
- username
Parameter whose value is a str.
- password
Parameter whose value is a str.
- expiration
Parameter whose value is an int.
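These four parameters would typically be supplied through a configuration file. The sketch below parses a sample section with the standard-library configparser purely to show the expected value types. The section name `[OpenPai]` is an assumption based on Luigi's convention of naming config sections after the class, and the username, password and expiration values are placeholders (the page does not state the unit of `expiration`).

```python
import configparser

# Placeholder configuration; the section name is assumed, not confirmed by this page.
SAMPLE_CFG = """
[OpenPai]
pai_url = http://127.0.0.1:9186
username = alice
password = not-a-real-password
expiration = 3600
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)
section = parser["OpenPai"]

pai_url = section.get("pai_url")            # str
username = section.get("username")          # str
password = section.get("password")          # str
expiration = section.getint("expiration")   # int

print(pai_url, username, expiration)
```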
- class luigi.contrib.pai.PaiTask(*args, **kwargs)
- Parameters:
pai_url – URL of the PAI cluster's REST server; the default is ‘http://127.0.0.1:9186’.
token – Token used to authenticate against the PAI REST server.
- abstract property name
Name for the job; must be unique, required
- abstract property image
URL pointing to the Docker image for all tasks in the job, required
- abstract property tasks
List of TaskRole objects; at least one task role is needed, required
- property auth_file_path
Docker registry authentication file existing on HDFS, optional
- property data_dir
Data directory existing on HDFS, optional
- property code_dir
Code directory existing on HDFS, should not contain any data and should be less than 200MB, optional
- property output_dir
Output directory on HDFS; $PAI_DEFAULT_FS_URI/$jobName/output is used if not specified, optional
- property virtual_cluster
The virtual cluster the job runs on. If omitted, the job runs on the default virtual cluster, optional
- property gpu_type
The GPU type to be used in the tasks. If omitted, the job runs on any GPU type, optional
- property retry_count
Job retry count, no less than 0, optional
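The split between abstract (required) and plain (optional) properties can be illustrated with the standard-library abc module. This is a stand-in, not the Luigi class: `JobSpecSketch` and `TrainJob` are hypothetical names, only two of the optional properties are shown, and having optional properties return None when unset is an assumption of the sketch.

```python
from abc import ABC, abstractmethod


class JobSpecSketch(ABC):
    """Hypothetical stand-in: required properties are abstract, optional ones are not."""

    @property
    @abstractmethod
    def name(self):
        """Unique job name (required)."""

    @property
    @abstractmethod
    def image(self):
        """Docker image URL (required)."""

    @property
    @abstractmethod
    def tasks(self):
        """List of task roles, at least one (required)."""

    @property
    def retry_count(self):
        return None  # optional: override only when needed

    @property
    def output_dir(self):
        return None  # optional: override only when needed


class TrainJob(JobSpecSketch):
    """Placeholder subclass that fills in the three required properties."""

    @property
    def name(self):
        return "train-example"

    @property
    def image(self):
        return "example/image:latest"

    @property
    def tasks(self):
        return [{"name": "worker", "command": "python train.py"}]

    @property
    def retry_count(self):
        return 2


spec = TrainJob()
print(spec.name, spec.retry_count, spec.output_dir)
```

A subclass that leaves out any of the three abstract properties cannot be instantiated, which is the point of marking them abstract.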
- output()
The output that this Task produces.
The output of the Task determines if the Task needs to be run: the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.
- Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
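The "finished iff the outputs all exist" rule can be sketched with local files standing in for Target instances. `is_complete` is a hypothetical helper, and treating an empty output list as not finished is an assumption of this sketch.

```python
import os
import tempfile


def is_complete(output_paths):
    """Hypothetical check: finished only if there are outputs and every one exists."""
    if not output_paths:
        return False
    return all(os.path.exists(path) for path in output_paths)


with tempfile.TemporaryDirectory() as tmp:
    first = os.path.join(tmp, "part-0")
    second = os.path.join(tmp, "part-1")

    open(first, "w").close()
    before = is_complete([first, second])  # second output is still missing

    open(second, "w").close()
    after = is_complete([first, second])   # both outputs now exist

print(before, after)
```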