Source code for luigi.contrib.webhdfs

# -*- coding: utf-8 -*-
# Copyright 2012-2015 Spotify AB
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
Provides a :class:`WebHdfsTarget` using the `Python hdfs

This module is DEPRECATED and does not play well with rest of luigi's hdfs
contrib module. You can consider migrating to

from __future__ import absolute_import

import logging

from luigi import six

from import FileSystemTarget, AtomicLocalFile
from luigi.format import get_default_format
import luigi.contrib.hdfs

logger = logging.getLogger("luigi-interface")

[docs]class WebHdfsTarget(FileSystemTarget): fs = None def __init__(self, path, client=None, format=None): super(WebHdfsTarget, self).__init__(path) path = self.path self.fs = client or WebHdfsClient() if format is None: format = get_default_format() self.format = format
[docs] def open(self, mode='r'): if mode not in ('r', 'w'): raise ValueError("Unsupported open mode '%s'" % mode) if mode == 'r': return self.format.pipe_reader( ReadableWebHdfsFile(path=self.path, client=self.fs) ) return self.format.pipe_writer( AtomicWebHdfsFile(path=self.path, client=self.fs) )
[docs]class ReadableWebHdfsFile(object): def __init__(self, path, client): self.path = path self.client = client self.generator = None
[docs] def read(self): self.generator = res = list(self.generator)[0] return res
[docs] def readlines(self, char='\n'): self.generator =, buffer_char=char) return self.generator
def __enter__(self): return self def __exit__(self, exc_type, exc, traceback): self.close() def __iter__(self): self.generator = self.readlines('\n') has_next = True while has_next: try: chunk = yield chunk except StopIteration: has_next = False self.close()
[docs] def close(self): self.generator.close()
[docs]class AtomicWebHdfsFile(AtomicLocalFile): """ An Hdfs file that writes to a temp file and put to WebHdfs on close. """ def __init__(self, path, client): self.client = client super(AtomicWebHdfsFile, self).__init__(path)
[docs] def move_to_final_destination(self): if not self.client.exists(self.path): self.client.upload(self.path, self.tmp_path)
WebHdfsClient = luigi.contrib.hdfs.WebHdfsClient