Source code for luigi.contrib.webhdfs
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Provides a :class:`WebHdfsTarget` using the `Python hdfs
<https://pypi.python.org/pypi/hdfs/>`_ library.

This module is DEPRECATED and does not play well with the rest of luigi's hdfs
contrib module. Consider migrating to
:class:`luigi.contrib.hdfs.webhdfs_client.WebHdfsClient`.
"""
import logging
from luigi.target import FileSystemTarget, AtomicLocalFile
from luigi.format import get_default_format
import luigi.contrib.hdfs
# Module-level logger bound to the "luigi-interface" logger name.
logger = logging.getLogger("luigi-interface")
[docs]
class WebHdfsTarget(FileSystemTarget):
    """
    File-system target whose content is read and written through a WebHDFS client.

    :param path: location of the target; forwarded to the base-class initialiser.
    :param client: object used for reading and uploading; when falsy, a new
        ``WebHdfsClient()`` is created.
    :param format: object providing ``pipe_reader`` and ``pipe_writer``; when
        ``None``, the result of ``get_default_format()`` is used.
    """

    # Class-level placeholder; every instance assigns its own client in __init__.
    # NOTE(review): presumably this also shadows an ``fs`` member declared on
    # FileSystemTarget so the per-instance assignment works -- confirm before
    # removing it.
    fs = None

    def __init__(self, path, client=None, format=None):
        super(WebHdfsTarget, self).__init__(path)
        # ``WebHdfsClient`` is bound at module level below the class definitions;
        # the name is only resolved here, when a target is instantiated.
        self.fs = client or WebHdfsClient()
        if format is None:
            format = get_default_format()
        self.format = format

    def open(self, mode='r'):
        """
        Open the target for reading or writing.

        :param mode: ``'r'`` wraps a :class:`ReadableWebHdfsFile` with
            ``format.pipe_reader``; ``'w'`` wraps an :class:`AtomicWebHdfsFile`
            with ``format.pipe_writer``.
        :return: whatever the format's ``pipe_reader``/``pipe_writer`` returns.
        :raises ValueError: if ``mode`` is neither ``'r'`` nor ``'w'``.
        """
        if mode not in ('r', 'w'):
            raise ValueError("Unsupported open mode '%s'" % mode)

        if mode == 'r':
            return self.format.pipe_reader(
                ReadableWebHdfsFile(path=self.path, client=self.fs)
            )

        return self.format.pipe_writer(
            AtomicWebHdfsFile(path=self.path, client=self.fs)
        )
[docs]
class ReadableWebHdfsFile:
    """
    Read-only, file-like wrapper around ``client.read`` for a single path.

    :param path: path handed to ``client.read``.
    :param client: object exposing ``read(path)`` and
        ``read(path, buffer_char=...)``, each returning an iterator that has a
        ``close()`` method.
    """

    def __init__(self, path, client):
        self.path = path
        self.client = client
        # Most recent iterator obtained from the client; None until a read
        # starts and again after close().
        self.generator = None

    def read(self):
        """
        Return the first item produced by ``client.read(path)``.

        :raises IndexError: if the client produces no items at all.
        """
        self.generator = self.client.read(self.path)
        # NOTE(review): only the first item is returned, which presumably relies
        # on the client delivering the whole content as one chunk -- confirm.
        return list(self.generator)[0]

    def readlines(self, char='\n'):
        """
        Start a read that uses ``char`` as the client's ``buffer_char``.

        :param char: value passed to ``client.read`` as ``buffer_char``.
        :return: the iterator returned by the client (also kept on
            ``self.generator`` so that :meth:`close` can release it).
        """
        self.generator = self.client.read(self.path, buffer_char=char)
        return self.generator

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, traceback):
        self.close()

    def __iter__(self):
        self.generator = self.readlines('\n')
        try:
            yield from self.generator
        finally:
            # Release the stream on every exit path, not only after the
            # iterator has been fully consumed.
            self.close()

    def close(self):
        """
        Close the active stream, if there is one.

        Safe to call before any read has started and safe to call repeatedly.
        """
        stream, self.generator = self.generator, None
        if stream is not None:
            stream.close()
[docs]
class AtomicWebHdfsFile(AtomicLocalFile):
    """
    An Hdfs file that is written to a temp file first and put to WebHdfs on close.
    """

    def __init__(self, path, client):
        # Keep the original ordering: store the client, then let the base class
        # initialise itself with the destination path.
        self.client = client
        super().__init__(path)

    def move_to_final_destination(self):
        """
        Upload the temp file to ``self.path`` unless that path already exists.

        When ``client.exists`` reports the destination as present, nothing is
        uploaded.
        """
        destination = self.path
        if self.client.exists(destination):
            return
        self.client.upload(destination, self.tmp_path)
# Module-level alias for ``luigi.contrib.hdfs.WebHdfsClient``. It is bound after
# the classes above, which works because ``WebHdfsTarget.__init__`` only resolves
# the name when a target is created.
WebHdfsClient = luigi.contrib.hdfs.WebHdfsClient