luigi.contrib.hadoop module

Run Hadoop MapReduce jobs using Hadoop Streaming. To run a job, subclass luigi.contrib.hadoop.JobTask and implement the mapper and reducer methods. See Example – Top Artists for an example of how to run a Hadoop job.
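
A minimal sketch of such a subclass is shown below. The input dependency (ExternalInputLogs) and the output path are hypothetical placeholders for whatever your pipeline provides:

    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs

    class WordCount(luigi.contrib.hadoop.JobTask):
        """Count word occurrences in line-oriented input (illustrative sketch)."""

        def requires(self):
            return ExternalInputLogs()  # hypothetical upstream task with an HDFS output

        def output(self):
            # All-HDFS outputs make job_runner() pick the real Hadoop runner.
            return luigi.contrib.hdfs.HdfsTarget('/tmp/wordcount-output')

        def mapper(self, line):
            for word in line.strip().split():
                yield word, 1

        def reducer(self, key, values):
            yield key, sum(values)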

class luigi.contrib.hadoop.BaseHadoopJobTask(*args, **kwargs)[source]

Bases: luigi.task.Task

batch_counter_default = 1
data_interchange_format = 'python'
deps()[source]
final_combiner = NotImplemented
final_mapper = NotImplemented
final_reducer = NotImplemented
init_hadoop()[source]
init_local()[source]

Implement any work needed to set up internal data structures etc. here.

You can add extra input using the requires_local/input_local methods.

Anything you set on the object will be pickled and available on the Hadoop nodes.
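
For example, a sketch of using init_local() to build a set of stop words on the client that the mapper then consults on the Hadoop nodes (the stop-word file path is hypothetical):

    class FilteredWordCount(luigi.contrib.hadoop.JobTask):

        def init_local(self):
            # Runs on the client before submission; the attribute set here is
            # pickled with the task instance and shipped to the Hadoop nodes.
            with open('/etc/mypipeline/stopwords.txt') as f:  # hypothetical path
                self.stopwords = set(line.strip() for line in f)

        def mapper(self, line):
            for word in line.strip().split():
                if word not in self.stopwords:
                    yield word, 1

        def reducer(self, key, values):
            yield key, sum(values)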

input_hadoop()[source]
input_local()[source]
job_runner()[source]
jobconfs()[source]
mr_priority = NotImplemented
on_failure(exception)[source]
package_binary = None
pool = Insignificant Parameter (defaults to None)
requires_hadoop()[source]
requires_local()[source]

Default implementation; override this method if you need any local input to be accessible in init().
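
A sketch of the intended pattern, assuming a hypothetical CountryCodeTable task whose output is a single local target:

    # Inside your JobTask subclass:
    def requires_local(self):
        return CountryCodeTable()  # hypothetical task writing a local file

    def init_local(self):
        # input_local() returns the output target(s) of requires_local().
        with self.input_local().open('r') as f:
            self.country_names = dict(line.strip().split('\t', 1) for line in f)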

run()[source]
task_id = None
class luigi.contrib.hadoop.DefaultHadoopJobRunner[source]

Bases: luigi.contrib.hadoop.HadoopJobRunner

The default job runner; it reads its settings from the Luigi configuration.

exception luigi.contrib.hadoop.HadoopJobError(message, out=None, err=None)[source]

Bases: exceptions.RuntimeError

class luigi.contrib.hadoop.HadoopJobRunner(streaming_jar, modules=None, streaming_args=None, libjars=None, libjars_in_hdfs=None, jobconfs=None, input_format=None, output_format=None, end_job_with_atomic_move_dir=True, archives=None)[source]

Bases: luigi.contrib.hadoop.JobRunner

Takes care of uploading & executing a Hadoop job using Hadoop streaming.

TODO: add code to support Elastic MapReduce (using boto) and local execution.

finish()[source]
run_job(job, tracking_url_callback=None)[source]
class luigi.contrib.hadoop.HadoopRunContext[source]

Bases: object

kill_job(captured_signal=None, stack_frame=None)[source]
class luigi.contrib.hadoop.JobRunner[source]

Bases: object

run_job = NotImplemented
class luigi.contrib.hadoop.JobTask(*args, **kwargs)[source]

Bases: luigi.contrib.hadoop.BaseHadoopJobTask

combiner = NotImplemented
dump(directory='')[source]

Dump instance to file.

extra_archives()[source]

List of paths to archives

extra_files()[source]

Can be overridden in a subclass.

Each element is either a string, or a pair of two strings (src, dst).

  • src can be a directory (in which case everything will be copied recursively).
  • dst can include subdirectories (foo/bar/baz.txt etc)

Uses Hadoop’s -files option so that the same file is reused across tasks.
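
A sketch of an override (the paths are hypothetical):

    # Inside your JobTask subclass:
    def extra_files(self):
        return [
            'lookup.txt',  # shipped under its own name
            ('/opt/myjob/conf/settings.json', 'conf/settings.json'),  # (src, dst) pair
        ]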

extra_modules()[source]
extra_streaming_arguments()[source]

Extra arguments for the Hadoop streaming command line. Return a list of (parameter, value) tuples.
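
For example, a sketch that passes an extra environment variable to the streaming tasks (assuming -cmdenv is the streaming option you need):

    # Inside your JobTask subclass:
    def extra_streaming_arguments(self):
        # Each (parameter, value) tuple is appended to the hadoop-streaming
        # command line.
        return [('-cmdenv', 'TZ=UTC')]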

incr_counter(*args, **kwargs)[source]

Increments a Hadoop counter.

Since counters can be a bit slow to update, this batches the updates.
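
Typical usage inside a mapper or reducer; the counter group and names here are arbitrary:

    # Inside your JobTask subclass:
    def reducer(self, key, values):
        total = sum(values)
        self.incr_counter('WordCount', 'keys_reduced', 1)
        self.incr_counter('WordCount', 'values_summed', total)
        yield key, total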

init_combiner()[source]
init_mapper()[source]
init_reducer()[source]
internal_reader(input_stream)[source]

Reader which uses Python eval on each part of a tab-separated string. Yields a tuple of Python objects.

internal_writer(outputs, stdout)[source]

Writer which outputs the Python repr for each item.

job_runner()[source]

Get the MapReduce runner for this job.

If all outputs are HdfsTargets, the DefaultHadoopJobRunner will be used. Otherwise, the LocalJobRunner which streams all data through the local machine will be used (great for testing).
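
To force local execution regardless of the output targets (for example in a quick test), you can also override job_runner() yourself; a sketch:

    # Inside your JobTask subclass:
    def job_runner(self):
        # Stream all data through the local machine instead of submitting to
        # the cluster; handy for debugging.
        return luigi.contrib.hadoop.LocalJobRunner()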

jobconf_truncate = 20000
jobconfs()[source]
mapper(item)[source]

Re-define to process an input item (usually a line of input data).

Defaults to an identity mapper that sends all lines to the same reducer.

n_reduce_tasks = 25
reader(input_stream)[source]

Reader is a method which iterates over input lines and outputs records.

The default implementation yields one argument containing the line for each line in the input.
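
A sketch of a custom reader that splits tab-separated lines so that the mapper receives the fields as separate positional arguments (assuming a matching mapper signature such as mapper(self, timestamp, user, url)):

    # Inside your JobTask subclass:
    def reader(self, input_stream):
        for line in input_stream:
            # One record per line; the fields become the mapper's arguments.
            yield line.rstrip('\n').split('\t')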

reducer = NotImplemented
run_combiner(stdin=<open file '<stdin>', mode 'r'>, stdout=<open file '<stdout>', mode 'w'>)[source]
run_mapper(stdin=<open file '<stdin>', mode 'r'>, stdout=<open file '<stdout>', mode 'w'>)[source]

Run the mapper on the hadoop node.

run_reducer(stdin=<open file '<stdin>', mode 'r'>, stdout=<open file '<stdout>', mode 'w'>)[source]

Run the reducer on the hadoop node.

writer(outputs, stdout, stderr=<open file '<stderr>', mode 'w'>)[source]

Writer is a method which iterates over the output records from the reducer and formats them for output.

The default implementation outputs tab separated items.
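
A sketch of a custom writer that emits one JSON document per output record instead of tab-separated items; it assumes the reducer yields (key, value) pairs:

    import json
    import sys

    # Inside your JobTask subclass:
    def writer(self, outputs, stdout, stderr=sys.stderr):
        for key, value in outputs:
            stdout.write(json.dumps({'key': key, 'value': value}) + '\n')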

class luigi.contrib.hadoop.LocalJobRunner(samplelines=None)[source]

Bases: luigi.contrib.hadoop.JobRunner

Will run the job locally.

This is useful for debugging and unit testing. It tries to mimic Hadoop Streaming.

TODO: integrate with JobTask
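
One way to exercise a job locally, e.g. in a unit test, is to give it a non-HDFS output so that job_runner() falls back to LocalJobRunner, then run it with the local scheduler. A sketch, reusing the hypothetical WordCount task from above:

    import luigi

    class WordCountLocal(WordCount):

        def output(self):
            # A non-HDFS target makes job_runner() choose LocalJobRunner.
            return luigi.LocalTarget('/tmp/wordcount-local.tsv')

    if __name__ == '__main__':
        luigi.build([WordCountLocal()], local_scheduler=True)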

group(input_stream)[source]
run_job(job)[source]
sample(input_stream, n, output)[source]
luigi.contrib.hadoop.attach(*packages)[source]

Attach Python packages to the Hadoop MapReduce tarball to make them available on the Hadoop cluster.
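
Typical usage is to attach any pure-Python dependencies that your mapper or reducer imports so that they travel with the job; my_text_utils is a hypothetical package:

    import luigi.contrib.hadoop
    import my_text_utils  # hypothetical pure-Python package used by the mapper

    luigi.contrib.hadoop.attach(my_text_utils)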

luigi.contrib.hadoop.create_packages_archive(packages, filename)[source]

Create a tar archive which will contain the files for the packages listed in packages.

luigi.contrib.hadoop.dereference(f)[source]
luigi.contrib.hadoop.fetch_task_failures(tracking_url)[source]

Uses mechanize to fetch the actual task logs from the task tracker.

This is highly opportunistic, and we might not succeed. So we set a low timeout and hope it works. If it does not, it’s not the end of the world.

TODO: Yarn has a REST API that we should probably use instead: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html

luigi.contrib.hadoop.flatten(sequence)[source]

A simple generator which flattens a sequence.

Only one level is flattened.

(1, (2, 3), 4) -> (1, 2, 3, 4)
luigi.contrib.hadoop.get_extra_files(extra_files)[source]
class luigi.contrib.hadoop.hadoop(*args, **kwargs)[source]

Bases: luigi.task.Config

pool = Parameter (defaults to None): Hadoop pool to use for Hadoop tasks. To specify pools per task, see BaseHadoopJobTask.pool
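
For example, a module-wide default pool can be set in your Luigi configuration file; the pool name is hypothetical:

    [hadoop]
    pool=analytics-batch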
luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, tracking_url_callback=None, env=None)[source]

Runs the job by invoking the command from the given arglist. Finds tracking URLs in the output and attempts to fetch errors using those URLs if the job fails. Throws HadoopJobError with information about the error (including stdout and stderr from the process) on failure, and returns normally otherwise.

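
A sketch of direct usage; the streaming jar path and options are hypothetical (HadoopJobRunner normally builds this arglist for you), and it assumes the out/err constructor arguments of HadoopJobError are exposed as attributes of the same name:

    from luigi.contrib.hadoop import HadoopJobError, run_and_track_hadoop_job

    arglist = [
        'hadoop', 'jar', '/usr/lib/hadoop-mapreduce/hadoop-streaming.jar',
        '-mapper', 'cat',
        '-reducer', 'wc -l',
        '-input', '/tmp/job-input',
        '-output', '/tmp/job-output',
    ]

    try:
        run_and_track_hadoop_job(arglist)
    except HadoopJobError as e:
        # Captured stdout/stderr from the hadoop command, if available.
        print(e.out)
        print(e.err)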