luigi.contrib.hadoop module¶
Run Hadoop MapReduce jobs using Hadoop Streaming. To run a job, you need
to subclass luigi.contrib.hadoop.JobTask
and implement the
mapper
and reducer
methods. See Example – Top Artists for
an example of how to run a Hadoop job.
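The mapper/reducer contract described above can be exercised without a cluster. The sketch below uses plain functions in place of the JobTask methods and a local dict to stand in for the shuffle phase; all names here are illustrative, not part of luigi's API.

```python
from collections import defaultdict

def mapper(line):
    # Yield (key, value) pairs -- here, a count of 1 per word.
    for word in line.split():
        yield word, 1

def reducer(key, values):
    # Receive all values for one key and yield the aggregated record.
    yield key, sum(values)

def simulate(lines):
    # Mimic the shuffle: group mapper output by key, then reduce each group.
    groups = defaultdict(list)
    for line in lines:
        for k, v in mapper(line):
            groups[k].append(v)
    return {k: next(reducer(k, vs))[1] for k, vs in sorted(groups.items())}

print(simulate(["a b a", "b c"]))  # {'a': 2, 'b': 2, 'c': 1}
```

On a real cluster, Hadoop Streaming performs the grouping and sorting between the mapper and reducer processes; the local simulation only illustrates the data flow.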
- class luigi.contrib.hadoop.hadoop(*args, **kwargs)[source]¶
Bases:
Config
- pool = OptionalParameter (defaults to None): Hadoop pool to use for Hadoop tasks. To specify pools per task, see BaseHadoopJobTask.pool¶
- luigi.contrib.hadoop.attach(*packages)[source]¶
Attach a python package to hadoop map reduce tarballs to make those packages available on the hadoop cluster.
- luigi.contrib.hadoop.create_packages_archive(packages, filename)[source]¶
Create a tar archive which will contain the files for the packages listed in packages.
- luigi.contrib.hadoop.flatten(sequence)[source]¶
A simple generator which flattens a sequence.
Only one level is flattened.
(1, (2, 3), 4) -> (1, 2, 3, 4)
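A minimal reimplementation showing the documented one-level-only behavior (the function name here is illustrative, not luigi's):

```python
def flatten_one_level(sequence):
    # Generator that flattens exactly one level of nesting,
    # mirroring the documented behavior of luigi.contrib.hadoop.flatten.
    for item in sequence:
        if isinstance(item, (list, tuple)):
            for sub in item:
                yield sub
        else:
            yield item

print(tuple(flatten_one_level((1, (2, 3), 4))))      # (1, 2, 3, 4)
print(tuple(flatten_one_level((1, (2, (3,)), 4))))   # (1, 2, (3,), 4)
```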
- exception luigi.contrib.hadoop.HadoopJobError(message, out=None, err=None)[source]¶
Bases:
RuntimeError
- luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, tracking_url_callback=None, env=None)[source]¶
Runs the job by invoking the command from the given arglist. Finds tracking urls from the output and attempts to fetch errors using those urls if the job fails. Throws HadoopJobError with information about the error (including stdout and stderr from the process) on failure and returns normally otherwise.
- Parameters:
arglist – list of command-line arguments to invoke Hadoop with
tracking_url_callback – optional callback invoked with the job's tracking URL once it is found in the output
env – optional environment variables for the subprocess
- Returns:
- luigi.contrib.hadoop.fetch_task_failures(tracking_url)[source]¶
Uses mechanize to fetch the actual task logs from the task tracker.
This is highly opportunistic, and we might not succeed. So we set a low timeout and hope it works. If it does not, it’s not the end of the world.
TODO: Yarn has a REST API that we should probably use instead: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html
- class luigi.contrib.hadoop.HadoopJobRunner(streaming_jar, modules=None, streaming_args=None, libjars=None, libjars_in_hdfs=None, jobconfs=None, input_format=None, output_format=None, end_job_with_atomic_move_dir=True, archives=None)[source]¶
Bases:
JobRunner
Takes care of uploading & executing a Hadoop job using Hadoop streaming.
TODO: add code to support Elastic Mapreduce (using boto) and local execution.
- class luigi.contrib.hadoop.DefaultHadoopJobRunner[source]¶
Bases:
HadoopJobRunner
The default job runner reads its settings (streaming jar, jobconfs, etc.) from the luigi configuration.
- class luigi.contrib.hadoop.LocalJobRunner(samplelines=None)[source]¶
Bases:
JobRunner
Will run the job locally.
This is useful for debugging and also unit testing. Tries to mimic Hadoop Streaming.
TODO: integrate with JobTask
- class luigi.contrib.hadoop.BaseHadoopJobTask(*args, **kwargs)[source]¶
Bases:
Task
- pool = Insignificant OptionalParameter (defaults to None)¶
- batch_counter_default = 1¶
- final_mapper = NotImplemented¶
- final_combiner = NotImplemented¶
- final_reducer = NotImplemented¶
- mr_priority = NotImplemented¶
- package_binary = None¶
- task_id = None¶
- init_local()[source]¶
Implement any work to set up internal data structures etc. here.
You can add extra input using the requires_local/input_local methods.
Anything you set on the object will be pickled and available on the Hadoop nodes.
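Because instance attributes are pickled and shipped to the Hadoop nodes, anything set in init_local() must be picklable. A toy sketch of that round trip (FakeTask is a stand-in, not a luigi class):

```python
import pickle

class FakeTask:
    # Stand-in for a JobTask subclass: attributes set in init_local()
    # land on the instance and are pickled before shipping to the
    # Hadoop nodes, so they must be picklable.
    def init_local(self):
        self.lookup = {"a": 1, "b": 2}  # e.g. loaded from a local input

t = FakeTask()
t.init_local()
clone = pickle.loads(pickle.dumps(t))  # what the worker node reconstructs
print(clone.lookup)  # {'a': 1, 'b': 2}
```

Unpicklable attributes (open file handles, database connections, etc.) should be created lazily on the node instead.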
- data_interchange_format = 'python'¶
- requires_local()[source]¶
Default impl - override this method if you need any local input to be accessible in init().
- on_failure(exception)[source]¶
Override for custom error handling.
This method gets called if an exception is raised in
run()
. The returned value of this method is JSON encoded and sent to the scheduler as the expl argument. Its string representation will be used as the body of the error email sent out, if any. Default behavior is to return a string representation of the stack trace.
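The documented default behavior amounts to returning the formatted stack trace. A standalone sketch of that default (the function name is illustrative):

```python
import traceback

def default_on_failure(exception):
    # Mirror of the documented default: return the formatted stack trace
    # so the scheduler can display it as the error explanation (expl).
    return traceback.format_exc()

try:
    1 / 0
except ZeroDivisionError as e:
    expl = default_on_failure(e)

print("ZeroDivisionError" in expl)  # True
```

An override would typically return a shorter, human-readable summary instead, since the string ends up in the error email body.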
- class luigi.contrib.hadoop.JobTask(*args, **kwargs)[source]¶
Bases:
BaseHadoopJobTask
- jobconf_truncate = 20000¶
- n_reduce_tasks = 25¶
- reducer = NotImplemented¶
- job_runner()[source]¶
Get the MapReduce runner for this job.
If all outputs are HdfsTargets, the DefaultHadoopJobRunner will be used. Otherwise, the LocalJobRunner which streams all data through the local machine will be used (great for testing).
- reader(input_stream)[source]¶
Reader is a method which iterates over input lines and outputs records.
The default implementation yields a one-element tuple containing the line, for each line in the input.
- writer(outputs, stdout, stderr=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]¶
Writer is a method which iterates over the output records from the reducer and formats them for output.
The default implementation outputs tab separated items.
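The default reader and writer behaviors described above can be sketched as plain functions (names are illustrative; luigi's actual implementations live on JobTask):

```python
import io
import sys

def default_reader(input_stream):
    # Default reader: yield a one-element tuple per input line.
    for line in input_stream:
        yield (line,)

def default_writer(outputs, stdout=sys.stdout):
    # Default writer: tab-separate the fields of each output record.
    for output in outputs:
        print("\t".join(str(field) for field in output), file=stdout)

src = io.StringIO("a\tb\nc\td\n")
records = list(default_reader(src))   # [('a\tb\n',), ('c\td\n',)]

buf = io.StringIO()
default_writer([("x", 1), ("y", 2)], stdout=buf)
print(buf.getvalue(), end="")  # x\t1  /  y\t2, one record per line
```

Overriding reader/writer is how a JobTask consumes or emits formats other than tab-separated text (e.g. JSON per line).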
- mapper(item)[source]¶
Re-define to process an input item (usually a line of input data).
Defaults to identity mapper that sends all lines to the same reducer.
- combiner = NotImplemented¶
- incr_counter(*args, **kwargs)[source]¶
Increments a Hadoop counter.
Since counters can be a bit slow to update, this batches the updates.
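The batching idea can be illustrated with a small local class: increments accumulate in memory and are only "sent" once a threshold is reached (cf. batch_counter_default). This is a sketch of the concept, not luigi's implementation; here `flushed` just records what would be reported to Hadoop.

```python
class BatchedCounter:
    # Illustrative sketch of incr_counter's batching: local sums are
    # accumulated and only flushed every `threshold` increments.
    def __init__(self, threshold=10):
        self.threshold = threshold
        self.pending = {}   # (group, name) -> locally accumulated count
        self.flushed = []   # what would have been reported to Hadoop

    def incr(self, group, name, amount=1):
        key = (group, name)
        self.pending[key] = self.pending.get(key, 0) + amount
        if self.pending[key] >= self.threshold:
            # Flush the batch in one counter update instead of many.
            self.flushed.append((key, self.pending.pop(key)))

c = BatchedCounter(threshold=3)
for _ in range(7):
    c.incr("stats", "lines")
print(c.flushed)  # [(('stats', 'lines'), 3), (('stats', 'lines'), 3)]
print(c.pending)  # {('stats', 'lines'): 1}
```

A real implementation also flushes remaining counts when the task finishes; the trade-off is fewer round trips to the tracker at the cost of slightly stale counter values.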
- extra_files()[source]¶
Can be overridden in subclass.
Each element is either a string, or a pair of two strings (src, dst).
src can be a directory (in which case everything will be copied recursively).
dst can include subdirectories (foo/bar/baz.txt etc)
Uses Hadoop’s -files option so that the same file is reused across tasks.
- extra_streaming_arguments()[source]¶
Extra arguments to Hadoop command line. Return here a list of (parameter, value) tuples.
- run_mapper(stdin=<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]¶
Run the mapper on the hadoop node.
- run_reducer(stdin=<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]¶
Run the reducer on the hadoop node.
- run_combiner(stdin=<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]¶