luigi.contrib.hadoop module

Run Hadoop MapReduce jobs using Hadoop Streaming. To run a job, you need to subclass luigi.contrib.hadoop.JobTask and implement mapper and reducer methods. See Example – Top Artists for an example of how to run a Hadoop job.
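
A minimal sketch of such a subclass (the upstream task and the output path below are illustrative, not part of this module):

    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs

    class WordCount(luigi.contrib.hadoop.JobTask):
        """Counts word occurrences in the input via Hadoop Streaming."""

        def requires(self):
            return InputText()  # hypothetical upstream task producing an HdfsTarget

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget('/tmp/wordcount')  # illustrative path

        def mapper(self, line):
            # Called for each input line; yield (key, value) pairs.
            for word in line.strip().split():
                yield word, 1

        def reducer(self, key, values):
            # Called once per key with an iterator over that key's values.
            yield key, sum(values)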

class luigi.contrib.hadoop.hadoop(*args, **kwargs)[source]

Bases: Config

pool = OptionalParameter (defaults to None): Hadoop pool to use for Hadoop tasks. To specify pools per task, see BaseHadoopJobTask.pool
luigi.contrib.hadoop.attach(*packages)[source]

Attach a Python package to the Hadoop MapReduce tarballs to make those packages available on the Hadoop cluster.
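
For instance (a sketch; assumes the attached packages are importable on the submitting machine):

    import luigi.contrib.hadoop
    import dateutil
    import mypackage  # hypothetical pure-Python package used in mapper/reducer code

    luigi.contrib.hadoop.attach(dateutil, mypackage)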

luigi.contrib.hadoop.dereference(f)[source]
luigi.contrib.hadoop.get_extra_files(extra_files)[source]
luigi.contrib.hadoop.create_packages_archive(packages, filename)[source]

Create a tar archive which will contain the files for the packages listed in packages.

luigi.contrib.hadoop.flatten(sequence)[source]

A simple generator which flattens a sequence.

Only one level is flattened.

(1, (2, 3), 4) -> (1, 2, 3, 4)
class luigi.contrib.hadoop.HadoopRunContext[source]

Bases: object

kill_job(captured_signal=None, stack_frame=None)[source]
exception luigi.contrib.hadoop.HadoopJobError(message, out=None, err=None)[source]

Bases: RuntimeError

luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, tracking_url_callback=None, env=None)[source]

Runs the job by invoking the command from the given arglist. Finds tracking URLs in the output and attempts to fetch errors using those URLs if the job fails. Throws HadoopJobError with information about the error (including stdout and stderr from the process) on failure and returns normally otherwise.

Parameters:
  • arglist – the command to invoke, as a list of arguments.

  • tracking_url_callback – optional callable invoked with the job's tracking URL when one is found in the output.

  • env – environment variables for the spawned process.

Returns:
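
A sketch of calling it directly (the jar path and streaming arguments are illustrative, and the callback is assumed to receive the tracking URL found in the output; HadoopJobRunner normally builds this arglist for you):

    from luigi.contrib.hadoop import run_and_track_hadoop_job, HadoopJobError

    arglist = ['hadoop', 'jar', '/usr/lib/hadoop-mapreduce/hadoop-streaming.jar',
               '-input', '/data/in', '-output', '/data/out',
               '-mapper', 'cat', '-reducer', 'wc -l']

    def on_url(url):
        print('Tracking URL:', url)

    try:
        run_and_track_hadoop_job(arglist, tracking_url_callback=on_url)
    except HadoopJobError as e:
        # out/err are assumed to mirror the constructor arguments above.
        print(e, e.out, e.err)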

luigi.contrib.hadoop.fetch_task_failures(tracking_url)[source]

Uses mechanize to fetch the actual task logs from the task tracker.

This is highly opportunistic, and we might not succeed. So we set a low timeout and hope it works. If it does not, it’s not the end of the world.

TODO: Yarn has a REST API that we should probably use instead: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html

class luigi.contrib.hadoop.JobRunner[source]

Bases: object

run_job = NotImplemented
class luigi.contrib.hadoop.HadoopJobRunner(streaming_jar, modules=None, streaming_args=None, libjars=None, libjars_in_hdfs=None, jobconfs=None, input_format=None, output_format=None, end_job_with_atomic_move_dir=True, archives=None)[source]

Bases: JobRunner

Takes care of uploading & executing a Hadoop job using Hadoop streaming.

TODO: add code to support Elastic Mapreduce (using boto) and local execution.

run_job(job, tracking_url_callback=None)[source]
finish()[source]
class luigi.contrib.hadoop.DefaultHadoopJobRunner[source]

Bases: HadoopJobRunner

The default job runner just reads its settings from the configuration.

class luigi.contrib.hadoop.LocalJobRunner(samplelines=None)[source]

Bases: JobRunner

Will run the job locally.

This is useful for debugging and also unit testing. Tries to mimic Hadoop Streaming.

TODO: integrate with JobTask

sample(input_stream, n, output)[source]
group(input_stream)[source]
run_job(job)[source]
class luigi.contrib.hadoop.BaseHadoopJobTask(*args, **kwargs)[source]

Bases: Task

pool = Insignificant OptionalParameter (defaults to None)
batch_counter_default = 1
final_mapper = NotImplemented
final_combiner = NotImplemented
final_reducer = NotImplemented
mr_priority = NotImplemented
package_binary = None
task_id = None
abstract job_runner()[source]
jobconfs()[source]
init_local()[source]

Implement any work to set up internal data structures etc. here.

You can add extra input using the requires_local/input_local methods.

Anything you set on the object will be pickled and available on the Hadoop nodes.
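
A sketch (Blacklist is a hypothetical task with a local, non-HDFS output):

    import luigi.contrib.hadoop

    class FilteredWordCount(luigi.contrib.hadoop.JobTask):

        def requires_local(self):
            return Blacklist()  # hypothetical local-target task

        def init_local(self):
            # Runs on the submitting machine; self.blacklist is pickled with the
            # task and is therefore available inside mapper/reducer on the nodes.
            with self.input_local().open('r') as f:
                self.blacklist = set(line.strip() for line in f)

        def mapper(self, line):
            for word in line.strip().split():
                if word not in self.blacklist:
                    yield word, 1

        def reducer(self, key, values):
            yield key, sum(values)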

init_hadoop()[source]
data_interchange_format = 'python'
run()[source]

The task run method, to be overridden in a subclass.

See Task.run

requires_local()[source]

Default implementation; override this method if you need any local input to be accessible in init().

requires_hadoop()[source]
input_local()[source]
input_hadoop()[source]
deps()[source]

Internal method used by the scheduler.

Returns the flattened list of requires.

on_failure(exception)[source]

Override for custom error handling.

This method gets called if an exception is raised in run(). The returned value of this method is JSON-encoded and sent to the scheduler as the expl argument. Its string representation will be used as the body of the error email, if one is sent.

Default behavior is to return a string representation of the stack trace.
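
For example, to include the task id in the explanation (a sketch):

    import traceback
    import luigi.contrib.hadoop

    class MyJobTask(luigi.contrib.hadoop.JobTask):

        def on_failure(self, exception):
            # The returned string is JSON-encoded, sent to the scheduler as expl,
            # and used as the body of the error email.
            return 'Task %s failed:\n%s' % (self.task_id, traceback.format_exc())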

class luigi.contrib.hadoop.JobTask(*args, **kwargs)[source]

Bases: BaseHadoopJobTask

jobconf_truncate = 20000
n_reduce_tasks = 25
reducer = NotImplemented
jobconfs()[source]
init_mapper()[source]
init_combiner()[source]
init_reducer()[source]
job_runner()[source]

Get the MapReduce runner for this job.

If all outputs are HdfsTargets, the DefaultHadoopJobRunner will be used. Otherwise, the LocalJobRunner which streams all data through the local machine will be used (great for testing).
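
To force local execution regardless of the output targets, e.g. in a unit test, job_runner() can be overridden (a sketch reusing the WordCount task from the module introduction):

    from luigi.contrib.hadoop import LocalJobRunner

    class WordCountLocal(WordCount):

        def job_runner(self):
            # Stream all data through the local machine instead of the cluster.
            return LocalJobRunner()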

reader(input_stream)[source]

Reader is a method which iterates over input lines and outputs records.

The default implementation yields, for each input line, a record containing just that line.

writer(outputs, stdout, stderr=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]

Writer is a method which iterates over the output records from the reducer and formats them for output.

The default implementation outputs tab separated items.
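
A sketch overriding both reader() and writer() for comma-separated input and output (the column names are illustrative); each record yielded by the reader is passed to the mapper as positional arguments, which is why the default reader yields a single argument per line:

    import sys
    import luigi.contrib.hadoop

    class CsvJob(luigi.contrib.hadoop.JobTask):

        def reader(self, input_stream):
            # One record (tuple of fields) per input line.
            for line in input_stream:
                yield line.rstrip('\n').split(',')

        def mapper(self, user_id, artist, plays):  # illustrative columns
            yield artist, int(plays)

        def reducer(self, key, values):
            yield key, sum(values)

        def writer(self, outputs, stdout, stderr=sys.stderr):
            # Emit comma-separated output instead of the default tab-separated items.
            for key, value in outputs:
                print('%s,%s' % (key, value), file=stdout)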

mapper(item)[source]

Re-define to process an input item (usually a line of input data).

Defaults to identity mapper that sends all lines to the same reducer.

combiner = NotImplemented
incr_counter(*args, **kwargs)[source]

Increments a Hadoop counter.

Since counters can be a bit slow to update, this batches the updates.
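
For example (the counter group and name are illustrative):

    import luigi.contrib.hadoop

    class CountingJob(luigi.contrib.hadoop.JobTask):

        def mapper(self, line):
            fields = line.rstrip('\n').split('\t')
            if len(fields) != 3:
                # Shows up under the "CountingJob" group in the job's counters.
                self.incr_counter('CountingJob', 'malformed_lines', 1)
                return
            yield fields[0], 1

        def reducer(self, key, values):
            yield key, sum(values)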

extra_modules()[source]
extra_files()[source]

Can be overridden in subclass.

Each element is either a string, or a pair of two strings (src, dst).

  • src can be a directory (in which case everything will be copied recursively).

  • dst can include subdirectories (foo/bar/baz.txt etc)

Uses Hadoop’s -files option so that the same file is reused across tasks.

extra_streaming_arguments()[source]

Extra arguments to the Hadoop command line. Return a list of (parameter, value) tuples.

extra_archives()[source]

List of paths to archives.
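
A sketch covering extra_files(), extra_streaming_arguments() and extra_archives() together (all paths and values are illustrative):

    import luigi.contrib.hadoop

    class JobWithResources(luigi.contrib.hadoop.JobTask):

        def extra_files(self):
            return [
                '/etc/myjob/lookup.txt',      # shipped under its basename
                ('/srv/myjob/conf', 'conf'),  # directory, copied recursively to conf/
            ]

        def extra_streaming_arguments(self):
            # Appended to the hadoop streaming command line as "-cmdenv MY_SETTING=1".
            return [('-cmdenv', 'MY_SETTING=1')]

        def extra_archives(self):
            return ['/srv/myjob/env.tar.gz']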

dump(directory='')[source]

Dump instance to file.

run_mapper(stdin=<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Run the mapper on the Hadoop node.

run_reducer(stdin=<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Run the reducer on the Hadoop node.

run_combiner(stdin=<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]
internal_reader(input_stream)[source]

Reader which uses Python eval on each part of a tab-separated string. Yields a tuple of Python objects.

internal_writer(outputs, stdout)[source]

Writer which outputs the Python repr for each item.
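
The interchange round trip can be sketched as follows (illustrative values; separator handling mirrors the descriptions above):

    # What internal_writer emits for the record ('foo', 2):
    line = "'foo'\t2"
    # What internal_reader recovers from such a line:
    record = tuple(eval(part) for part in line.split('\t'))
    assert record == ('foo', 2)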