luigi.contrib.kubernetes module

Kubernetes Job wrapper for Luigi.

From the Kubernetes website:

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

For more information about Kubernetes Jobs: http://kubernetes.io/docs/user-guide/jobs/

Requires:

  • pykube: pip install pykube

Written and maintained by Marco Capuccini (@mcapuccini).

class luigi.contrib.kubernetes.kubernetes(*args, **kwargs)[source]

Bases: luigi.task.Config

auth_method = Parameter (defaults to kubeconfig): Authorization method to access the cluster
kubeconfig_path = Parameter (defaults to ~/.kube/config): Path to kubeconfig file for cluster authentication
max_retrials = IntParameter (defaults to 0): Maximum number of retries in the event of job failure
class luigi.contrib.kubernetes.KubernetesJobTask(*args, **kwargs)[source]

Bases: luigi.task.Task

auth_method

This can be set to kubeconfig or service-account. It defaults to kubeconfig.

For more details, please refer to:

kubeconfig_path

Path to kubeconfig file used for cluster authentication. It defaults to “~/.kube/config”, which is the default location when using minikube (http://kubernetes.io/docs/getting-started-guides/minikube). When auth_method is service-account this property is ignored.

WARNING: For Python versions < 3.5, the kubeconfig must point to a Kubernetes API hostname, NOT to an IP address.

For more details, please refer to: http://kubernetes.io/docs/user-guide/kubeconfig-file
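The hostname requirement in the warning above can be checked ahead of time. The following is a minimal sketch and not part of this module: `server_is_ip_address` is a hypothetical helper, and the URLs are made-up examples of what the `server` field of a kubeconfig file might contain.

```python
import ipaddress
from urllib.parse import urlparse


def server_is_ip_address(server_url):
    """Return True if the host part of server_url is an IP address.

    Hypothetical helper, not part of luigi.contrib.kubernetes.
    """
    host = urlparse(server_url).hostname
    if host is None:
        return False
    try:
        ipaddress.ip_address(host)
    except ValueError:
        # Not parseable as IPv4 or IPv6, so treat it as a hostname.
        return False
    return True


# Made-up example values for the kubeconfig "server" field.
print(server_is_ip_address("https://192.168.99.100:8443"))
print(server_is_ip_address("https://kubernetes.example.com:6443"))
```

A check like this could run before the task is scheduled, so that a kubeconfig pointing at an IP address is caught early on older Python versions.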

name

A name for this job. The task automatically appends a UUID to the name before submitting the job to Kubernetes.
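To illustrate why the UUID suffix matters, here is a minimal sketch of building a unique job name. The helper `unique_job_name` and the `name-uuid` layout are assumptions made for illustration; the exact format the task uses is not specified here.

```python
import uuid


def unique_job_name(name):
    """Append a random UUID to name (hypothetical helper; format is assumed)."""
    return "{}-{}".format(name, uuid.uuid4())


first = unique_job_name("pi")
second = unique_job_name("pi")
# Two submissions of the same task get distinct Kubernetes job names.
print(first != second)
```

Because each submission gets its own suffix, the same task can be run repeatedly without clashing with an earlier job of the same name.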

labels

Return custom labels for the Kubernetes job. Example: {"run_dt": datetime.date.today().strftime('%F')}
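The `'%F'` directive in the example above is handled by the platform's C library and may not be supported on every platform. A portable sketch of an equivalent labels value follows; `build_labels` is a hypothetical stand-in for the body of a `labels` property, not code from this module.

```python
import datetime


def build_labels():
    """Return labels for a job (hypothetical stand-in for a labels property)."""
    # isoformat() yields YYYY-MM-DD, the same layout '%F' produces where supported.
    return {"run_dt": datetime.date.today().isoformat()}


labels = build_labels()
print(labels)
```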

spec_schema

Kubernetes Job spec schema in JSON format. An example follows:

{
    "containers": [{
        "name": "pi",
        "image": "perl",
        "command": ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
    }],
    "restartPolicy": "Never"
}

restartPolicy

  • If restartPolicy is not defined, it will be set to “Never” by default.
  • Warning: restartPolicy=OnFailure will bypass max_retrials, and restart the container until success, with the risk of blocking the Luigi task.

For more information, please refer to: http://kubernetes.io/docs/user-guide/pods/multi-container/#the-spec-schema
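The restartPolicy default described above can be sketched in plain Python. This illustrates the documented behavior only and is not the module's own code; `with_default_restart_policy` is a hypothetical helper.

```python
import copy


def with_default_restart_policy(spec_schema):
    """Return a copy of spec_schema with restartPolicy defaulting to "Never".

    Hypothetical helper mirroring the documented default.
    """
    spec = copy.deepcopy(spec_schema)
    spec.setdefault("restartPolicy", "Never")
    return spec


# Same container spec as the JSON example, without a restartPolicy.
spec_schema = {
    "containers": [{
        "name": "pi",
        "image": "perl",
        "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    }],
}

print(with_default_restart_policy(spec_schema)["restartPolicy"])
# An explicit value is left untouched.
print(with_default_restart_policy({"restartPolicy": "OnFailure"})["restartPolicy"])
```

Working on a copy keeps the schema declared on the task unchanged, which makes the applied default easy to inspect separately.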

max_retrials

Maximum number of retries in case of failure.

backoff_limit

Maximum number of retries before considering the job as failed. See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#pod-backoff-failure-policy

delete_on_success

Delete the Kubernetes workload if the job has ended successfully.

print_pod_logs_on_exit

Fetch and print the pod logs once the job is completed.

active_deadline_seconds

Time allowed to successfully schedule pods. See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#job-termination-and-cleanup

kubernetes_config
signal_complete()[source]

Signal job completion for scheduler and dependent tasks.

Touching a system file is an easy way to signal completion. Example:

with self.output().open('w') as output_file:
    output_file.write('')
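As a self-contained illustration of this pattern, the sketch below uses a plain file in a temporary directory in place of a Luigi target. The marker path and the `is_complete` helper are hypothetical; in a real task the file would come from `self.output()`.

```python
import os
import tempfile


def signal_complete(marker_path):
    """Create an empty marker file (stand-in for writing to self.output())."""
    with open(marker_path, "w") as output_file:
        output_file.write("")


def is_complete(marker_path):
    """Treat the job as complete once its marker file exists."""
    return os.path.exists(marker_path)


marker = os.path.join(tempfile.mkdtemp(), "example")
before = is_complete(marker)
signal_complete(marker)
after = is_complete(marker)
print(before, after)
```

The marker file carries no data. Its existence alone is what a completion check based on an output target would look for.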
run()[source]

The task run method, to be overridden in a subclass.

See Task.run

output()[source]

An output target is necessary for checking job completion unless an alternative complete method is defined.

Example:

return luigi.LocalTarget(os.path.join('/tmp', 'example'))