luigi.contrib.kubernetes

Kubernetes Job wrapper for Luigi.

From the Kubernetes website:

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

For more information about Kubernetes Jobs: http://kubernetes.io/docs/user-guide/jobs/

Requires:

  • pykube: pip install pykube-ng

Written and maintained by Marco Capuccini (@mcapuccini).

Classes

KubernetesJobTask(*args, **kwargs)

kubernetes(*args, **kwargs)

class luigi.contrib.kubernetes.kubernetes(*args, **kwargs)
auth_method

Parameter whose value is a str, and a base class for other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). Each task instance then has its foo attribute set accordingly.

When a task is instantiated, it first uses any constructor argument as the value of the parameter: for example, if you instantiate a = TaskA(x=44), then a.x == 44. When no value is provided, the value is resolved in the following order of decreasing priority:

  • Any value provided on the command line:

    • To the root task (e.g. --param xyz).

    • Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • Any value in the configuration file, given as a PARAM_NAME: <serialized value> entry under a [TASK_NAME] section. See "Parameters from config Ingestion" in the Luigi documentation.

  • Any default value set using the default flag.

Parameter objects may be reused, but you must then set the positional=False flag.
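The resolution order above can be modelled as a plain function. This is a minimal sketch, not Luigi's implementation: the function resolve_param, its arguments, and the dict-based stand-ins for the command line and configuration file are all hypothetical. It also skips the type parsing Luigi applies to command-line and config strings, so those values come back as raw strings.

```python
from typing import Any, Mapping

_MISSING = object()  # sentinel meaning "no default was declared"


def resolve_param(task_name: str, param_name: str,
                  explicit: Mapping[str, Any],
                  cli: Mapping[str, str],
                  config: Mapping[str, Mapping[str, str]],
                  default: Any = _MISSING,
                  is_root_task: bool = True) -> Any:
    """Pick a parameter value using the documented priority order (sketch)."""
    # 1. A constructor argument, e.g. TaskA(x=44), always wins.
    if param_name in explicit:
        return explicit[param_name]
    # 2a. "--x 44" on the command line only targets the root task.
    if is_root_task and param_name in cli:
        return cli[param_name]
    # 2b. "--TaskA-x 44" targets every instance of the class.
    qualified = f"{task_name}-{param_name}"
    if qualified in cli:
        return cli[qualified]
    # 3. A "[TaskA]" section with an "x: 44" entry in the config file.
    section = config.get(task_name, {})
    if param_name in section:
        return section[param_name]
    # 4. The Parameter's declared default.
    if default is not _MISSING:
        return default
    raise LookupError(f"no value for {task_name}.{param_name}")
```

For example, resolve_param("TaskA", "x", {}, {"x": "1"}, {"TaskA": {"x": "7"}}, is_root_task=False) returns "7": the unqualified flag is ignored for a non-root task, so the config file value applies.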

kubeconfig_path

Parameter whose value is a str, and a base class for other parameter types.

max_retrials

Parameter whose value is an int.

kubernetes_namespace

Parameter whose value is an optional str. When it is not set, the job runs in the default Kubernetes namespace.
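The four settings above are usually supplied through the Luigi configuration file. The sketch below assumes the section is named after the config class ([kubernetes]) and reads it with the standard-library configparser. The sample values, the helper read_kubernetes_section, and the fallback of 0 for max_retrials are assumptions for illustration; the kubeconfig and kubeconfig_path defaults follow the property descriptions below.

```python
import configparser
from typing import Any, Dict

# Hypothetical luigi.cfg contents; the values are illustrative only.
SAMPLE_CFG = """
[kubernetes]
auth_method: service-account
max_retrials: 2
kubernetes_namespace: batch-jobs
"""


def read_kubernetes_section(text: str) -> Dict[str, Any]:
    """Parse a [kubernetes] section into typed values, filling in defaults."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    # fallback= is also used when the whole section is missing.
    return {
        "auth_method": parser.get("kubernetes", "auth_method", fallback="kubeconfig"),
        "kubeconfig_path": parser.get("kubernetes", "kubeconfig_path",
                                      fallback="~/.kube/config"),
        # Assumed default of 0 retrials; check your Luigi version.
        "max_retrials": parser.getint("kubernetes", "max_retrials", fallback=0),
        # None stands for "use the cluster's default namespace".
        "kubernetes_namespace": parser.get("kubernetes", "kubernetes_namespace",
                                           fallback=None),
    }
```

Calling read_kubernetes_section(SAMPLE_CFG) yields max_retrials as the integer 2, while an empty file falls back to the defaults for every option.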

class luigi.contrib.kubernetes.KubernetesJobTask(*args, **kwargs)
property auth_method

This can be set to kubeconfig or service-account. It defaults to kubeconfig.

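For more details, please refer to the Kubernetes documentation on kubeconfig files (http://kubernetes.io/docs/user-guide/kubeconfig-file) and on service accounts.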

property kubeconfig_path

Path to the kubeconfig file used for cluster authentication. It defaults to “~/.kube/config”, which is the default location when using minikube (http://kubernetes.io/docs/getting-started-guides/minikube). When auth_method is service-account, this property is ignored.

WARNING: For Python versions < 3.5 kubeconfig must point to a Kubernetes API hostname, and NOT to an IP address.

For more details, please refer to: http://kubernetes.io/docs/user-guide/kubeconfig-file
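The interplay between auth_method and kubeconfig_path can be sketched as a small helper. The name effective_kubeconfig is hypothetical and is not part of Luigi. The helper only encodes the two documented rules: service-account ignores the path, and kubeconfig, the default, uses it. It also expands "~" so the default path points into the current user's home directory.

```python
import os
from typing import Optional

VALID_AUTH_METHODS = ("kubeconfig", "service-account")


def effective_kubeconfig(auth_method: str = "kubeconfig",
                         kubeconfig_path: str = "~/.kube/config") -> Optional[str]:
    """Return the kubeconfig file to load, or None for service-account auth."""
    if auth_method not in VALID_AUTH_METHODS:
        raise ValueError(f"auth_method must be one of {VALID_AUTH_METHODS}, "
                         f"got {auth_method!r}")
    if auth_method == "service-account":
        # The path is ignored: credentials come from the pod's service account.
        return None
    # Expand "~" so the default resolves inside the user's home directory.
    return os.path.expanduser(kubeconfig_path)
```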

property kubernetes_namespace

Namespace in Kubernetes where the job will run. It defaults to the default namespace in Kubernetes.

For more details, please refer to: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/

property name

A name for this job. The task automatically appends a UUID to this name before submitting the job to Kubernetes.
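Appending a UUID gives every submission a distinct Kubernetes object name, so reruns of the same task do not collide. The sketch below assumes a "-" separator and the 32-character hex form of uuid.uuid4(). Luigi's actual format may differ, and unique_job_name is a hypothetical helper.

```python
import uuid


def unique_job_name(name: str) -> str:
    """Append a random UUID (hex form) to the task's job name (sketch)."""
    # uuid4().hex is 32 lowercase hex characters, which are valid in
    # Kubernetes object names; a long `name` may still exceed length limits.
    return f"{name}-{uuid.uuid4().hex}"
```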

property labels

Return custom labels for the Kubernetes job. Example:

{"run_dt": datetime.date.today().strftime('%F')}
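Kubernetes accepts a label value only if it is at most 63 characters long, and a non-empty value must start and end with an alphanumeric character, with "-", "_" and "." allowed in between. The sketch below builds the run_dt example and checks it against that rule. It uses date.isoformat(), which produces the same YYYY-MM-DD text as strftime('%F') without depending on platform support for %F. The helper names are hypothetical.

```python
import datetime
import re

# Kubernetes label values: empty, or alphanumeric at both ends with
# "-", "_", "." allowed in between; length is checked separately.
_LABEL_VALUE_RE = re.compile(r"(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?")


def is_valid_label_value(value: str) -> bool:
    return len(value) <= 63 and _LABEL_VALUE_RE.fullmatch(value) is not None


def run_date_labels(day: datetime.date) -> dict:
    """Build the run_dt label from the example and validate it."""
    labels = {"run_dt": day.isoformat()}
    bad = {k: v for k, v in labels.items() if not is_valid_label_value(v)}
    if bad:
        raise ValueError(f"invalid label values: {bad}")
    return labels
```

A value such as "2024/01/31" would be rejected because "/" is not allowed, which is one reason to prefer a dash-separated date format here.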

property spec_schema

Kubernetes Job spec schema in JSON format. An example follows:

{
    "containers": [{
        "name": "pi",
        "image": "perl",
        "command": ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
    }],
    "restartPolicy": "Never"
}

restartPolicy

  • If restartPolicy is not defined, it will be set to “Never” by default.

  • Warning: restartPolicy=OnFailure bypasses max_retrials and restarts the container until it succeeds, which risks blocking the Luigi task.

For more information, please refer to: http://kubernetes.io/docs/user-guide/pods/multi-container/#the-spec-schema
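The spec_schema example is a pod spec that ends up inside a batch/v1 Job's template. The sketch below shows one way to wrap it, applying the documented "Never" default for restartPolicy and optionally setting the fields behind backoff_limit and active_deadline_seconds, which are described below. The function build_job_manifest and its parameters are hypothetical. Luigi assembles its own manifest internally and may differ in details such as extra metadata.

```python
import copy
from typing import Optional


def build_job_manifest(name: str, spec_schema: dict,
                       labels: Optional[dict] = None,
                       namespace: Optional[str] = None,
                       backoff_limit: Optional[int] = None,
                       active_deadline_seconds: Optional[int] = None) -> dict:
    """Wrap a pod spec (as in spec_schema) into a batch/v1 Job manifest."""
    pod_spec = copy.deepcopy(spec_schema)  # never mutate the caller's dict
    # Documented default: restartPolicy falls back to "Never".
    pod_spec.setdefault("restartPolicy", "Never")
    metadata = {"name": name, "labels": dict(labels or {})}
    if namespace is not None:
        metadata["namespace"] = namespace
    job_spec = {"template": {"spec": pod_spec}}
    if backoff_limit is not None:
        job_spec["backoffLimit"] = backoff_limit
    if active_deadline_seconds is not None:
        job_spec["activeDeadlineSeconds"] = active_deadline_seconds
    return {"apiVersion": "batch/v1", "kind": "Job",
            "metadata": metadata, "spec": job_spec}
```

Copying the input first keeps the task's spec_schema unchanged, so the same schema can be reused for every submission.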

property max_retrials

Maximum number of retrials in case of failure.
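One plausible reading of max_retrials is as a threshold on the Job's .status.failed counter, and the sketch below classifies a status on that basis. The rule "fail once failures exceed max_retrials", the helper name classify_job_status, and the status strings are assumptions, not Luigi's confirmed logic. Under this reading, max_retrials=0 fails the task on the first failed pod.

```python
from typing import Mapping


def classify_job_status(status: Mapping[str, int], max_retrials: int = 0) -> str:
    """Map a Job's .status counters to SUCCEEDED / FAILED / RUNNING (sketch)."""
    # Kubernetes reports how many pods have succeeded or failed so far.
    if status.get("succeeded", 0) > 0:
        return "SUCCEEDED"
    # Assumed rule: give up once failures exceed the allowed retrials.
    if status.get("failed", 0) > max_retrials:
        return "FAILED"
    return "RUNNING"
```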

property backoff_limit

Maximum number of retries before the job is considered failed. See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#pod-backoff-failure-policy

property delete_on_success

Delete the Kubernetes workload if the job has ended successfully.

property print_pod_logs_on_exit

Fetch and print the pod logs once the job is completed.

property active_deadline_seconds

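Time, in seconds, allowed for the job to complete, including scheduling its pods. Once this deadline passes, Kubernetes terminates the job's running pods and marks the job as failed. See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#job-termination-and-cleanup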

property kubernetes_config

property poll_interval

How often to poll Kubernetes for job status, in seconds.

property pod_creation_wait_interal

Delay, in seconds, to wait for the initial pod to be created after the job has been submitted.
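Taken together, pod_creation_wait_interal and poll_interval describe a wait-then-poll loop. The sketch below shows that loop with an injectable status function and sleep, so it can run without a cluster. The helper name wait_for_job, its parameter names, and the "RUNNING" status string are hypothetical, and no default intervals are assumed.

```python
import time
from typing import Callable


def wait_for_job(get_status: Callable[[], str], *, poll_interval: float,
                 pod_creation_wait: float,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until the job leaves the RUNNING state."""
    # Give Kubernetes time to create the first pod after submission.
    sleep(pod_creation_wait)
    while True:
        status = get_status()
        if status != "RUNNING":
            return status
        sleep(poll_interval)
```

Passing a fake sleep (for example, list.append) makes the loop easy to exercise in unit tests while keeping time.sleep as the real-world default.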

signal_complete()

Signal job completion for scheduler and dependent tasks.

Touching a file, such as the task's output target, is an easy way to signal completion. Example:

with self.output().open('w') as output_file:
    output_file.write('')

run()

The task run method, to be overridden in a subclass.

See Task.run

output()

An output target is necessary for checking job completion unless an alternative complete() method is defined.

Example:

return luigi.LocalTarget(os.path.join('/tmp', 'example'))