luigi.contrib.kubernetes module

Kubernetes Job wrapper for Luigi.

From the Kubernetes website:

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

For more information about Kubernetes Jobs: http://kubernetes.io/docs/user-guide/jobs/

Requires:

  • pykube: pip install pykube-ng

Written and maintained by Marco Capuccini (@mcapuccini).

class luigi.contrib.kubernetes.kubernetes(*args, **kwargs)[source]

Bases: Config

auth_method = Parameter (defaults to kubeconfig): Authorization method to access the cluster
kubeconfig_path = Parameter (defaults to ~/.kube/config): Path to kubeconfig file for cluster authentication
max_retrials = IntParameter (defaults to 0): Max retrials in event of job failure
kubernetes_namespace = OptionalParameter (defaults to None): K8s namespace in which the job will run

class luigi.contrib.kubernetes.KubernetesJobTask(*args, **kwargs)[source]

Bases: Task

property auth_method

This can be set to kubeconfig or service-account. It defaults to kubeconfig.

For more details, please refer to:

property kubeconfig_path

Path to kubeconfig file used for cluster authentication. It defaults to “~/.kube/config”, which is the default location when using minikube (http://kubernetes.io/docs/getting-started-guides/minikube). When auth_method is service-account this property is ignored.

WARNING: For Python versions < 3.5 kubeconfig must point to a Kubernetes API hostname, and NOT to an IP address.

For more details, please refer to: http://kubernetes.io/docs/user-guide/kubeconfig-file

property kubernetes_namespace

Namespace in Kubernetes where the job will run. It defaults to the default namespace in Kubernetes.

For more details, please refer to: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/

property name

A name for this job. The task automatically appends a UUID to the name before submitting the job to Kubernetes.
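A minimal sketch of the naming behaviour described above, assuming the UUID is joined to the name with a hyphen. The helper `unique_job_name` and the exact name format are illustrative assumptions, not Luigi's actual implementation.

```python
import uuid


def unique_job_name(name):
    """Return `name` with a random UUID suffix (hypothetical helper)."""
    # uuid4() yields a random 128-bit identifier; .hex is its 32-character
    # lowercase hexadecimal form.
    return "{}-{}".format(name, uuid.uuid4().hex)


job_name = unique_job_name("pi")
```

Because the suffix differs on every call, two submissions of the same task do not collide on the job name.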

property labels

Return custom labels for the Kubernetes job. Example:

{"run_dt": datetime.date.today().strftime('%F')}

property spec_schema

Kubernetes Job spec schema in JSON format. An example follows:

{
    "containers": [{
        "name": "pi",
        "image": "perl",
        "command": ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
    }],
    "restartPolicy": "Never"
}

restartPolicy

  • If restartPolicy is not defined, it will be set to “Never” by default.

  • Warning: restartPolicy=OnFailure will bypass max_retrials, and restart the container until success, with the risk of blocking the Luigi task.

For more information, please refer to: http://kubernetes.io/docs/user-guide/pods/multi-container/#the-spec-schema
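The restartPolicy default described above can be illustrated as follows. This is a sketch that assumes the spec is held as a plain Python dict; `with_default_restart_policy` is a hypothetical helper and not part of Luigi.

```python
import copy


def with_default_restart_policy(spec):
    """Return a copy of `spec` with restartPolicy set to "Never" if absent."""
    result = copy.deepcopy(spec)  # leave the caller's dict untouched
    result.setdefault("restartPolicy", "Never")
    return result


# Same container as the example spec above, but without a restartPolicy.
spec = {
    "containers": [{
        "name": "pi",
        "image": "perl",
        "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    }]
}
resolved = with_default_restart_policy(spec)
```

An explicit value such as "OnFailure" is left unchanged by this helper, which is the case the warning above is about.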

property max_retrials

Maximum number of retrials in case of failure.

property backoff_limit

Maximum number of retries before considering the job as failed. See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#pod-backoff-failure-policy

property delete_on_success

Delete the Kubernetes workload if the job has ended successfully.

property print_pod_logs_on_exit

Fetch and print the pod logs once the job is completed.

property active_deadline_seconds

Time allowed to successfully schedule pods. See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#job-termination-and-cleanup

property kubernetes_config

property poll_interval

How often to poll Kubernetes for job status, in seconds.

property pod_creation_wait_interal

Delay, in seconds, to wait for initial pod creation after a job has just been submitted.
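One way to picture how the two intervals could interact is the sketch below: wait once for the pod to be created, then check the status at a fixed interval. The function `wait_for_job`, the `get_status` callable, and the status strings are all hypothetical; Luigi's own polling logic may differ.

```python
import time


def wait_for_job(get_status, poll_interval, pod_creation_wait_interval,
                 sleep=time.sleep):
    """Block until `get_status()` stops returning "RUNNING"; return the final status."""
    # Give Kubernetes time to create the first pod before the first check.
    sleep(pod_creation_wait_interval)
    while True:
        status = get_status()
        if status != "RUNNING":
            return status
        # Still running: wait one poll interval before checking again.
        sleep(poll_interval)


# Simulated job: two polls report RUNNING, the third reports SUCCEEDED.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
waits = []  # records each requested delay instead of actually sleeping
final_status = wait_for_job(lambda: next(statuses), poll_interval=2,
                            pod_creation_wait_interval=4, sleep=waits.append)
```

Passing `sleep` as a parameter keeps the sketch testable without real delays.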

signal_complete()[source]

Signal job completion for scheduler and dependent tasks.

Touching a system file is an easy way to signal completion. Example:

with self.output().open('w') as output_file:
    output_file.write('')

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

output()[source]

An output target is necessary for checking job completion unless an alternative complete method is defined.

Example:

return luigi.LocalTarget(os.path.join('/tmp', 'example'))
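Putting the properties and methods above together, a subclass might look like the following sketch, which reuses the pi container from the spec_schema example. The class name `PerlPi`, the retry count, and the output path are illustrative choices. The import guard is only there so the snippet loads where Luigi is not installed; actually running the task requires Luigi, pykube, and access to a cluster.

```python
import os

try:
    import luigi
    from luigi.contrib.kubernetes import KubernetesJobTask
except ImportError:
    luigi = None
    KubernetesJobTask = object  # stand-in so the class definition still loads


class PerlPi(KubernetesJobTask):
    # Job name; a UUID is appended before submission.
    name = "pi"
    # Number of retrials allowed if the job fails.
    max_retrials = 3
    # Pod spec for the job, as described under spec_schema.
    spec_schema = {
        "containers": [{
            "name": "pi",
            "image": "perl",
            "command": ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
        }],
        "restartPolicy": "Never",
    }

    def signal_complete(self):
        # Touch the output target so the scheduler sees the task as complete.
        with self.output().open("w") as output_file:
            output_file.write("")

    def output(self):
        # Illustrative path; any Luigi target can serve as the completion marker.
        return luigi.LocalTarget(os.path.join("/tmp", "PerlPi"))
```

The properties are overridden here as plain class attributes, which keeps the subclass short; defining them with `@property` is an equally valid choice when the values need to be computed.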