# Source code for luigi.contrib.hdfs.clients
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
The implementations of the hdfs clients.
"""
import logging
import threading
from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs import webhdfs_client as hdfs_webhdfs_client
from luigi.contrib.hdfs import hadoopcli_clients as hdfs_hadoopcli_clients
# Module logger; it uses the fixed logger name 'luigi-interface' rather than
# this module's own name.
logger = logging.getLogger('luigi-interface')
# Thread-local holder used as the default cache of get_autoconfig_client():
# the client is stored on its ``client`` attribute, so every thread sees
# (and lazily creates) its own client instance.
_AUTOCONFIG_CLIENT = threading.local()
# [docs]
def get_autoconfig_client(client_cache=_AUTOCONFIG_CLIENT):
    """
    Return the hdfs client selected in the `luigi.cfg` configuration.

    The client is cached on ``client_cache.client``. When that attribute is
    missing, the name returned by ``hdfs_config.get_configured_hdfs_client()``
    decides which client is built, and the new client is stored on
    ``client_cache`` so later calls reuse it.

    :param client_cache: object whose ``client`` attribute holds the cached
                         client. Defaults to a module-level
                         ``threading.local()``.
    :return: the cached or newly created client.
    :raises Exception: if the configured client name is neither ``"webhdfs"``
                       nor ``"hadoopcli"``.
    """
    try:
        cached = client_cache.client
    except AttributeError:
        # Nothing cached yet on this holder: build the configured client.
        backend = hdfs_config.get_configured_hdfs_client()
        if backend == "webhdfs":
            fresh = hdfs_webhdfs_client.WebHdfsClient()
        elif backend == "hadoopcli":
            fresh = hdfs_hadoopcli_clients.create_hadoopcli_client()
        else:
            raise Exception("Unknown hdfs client " + backend)
        client_cache.client = fresh
        return client_cache.client
    else:
        return cached
def _with_ac(method_name):
def result(*args, **kwargs):
return getattr(get_autoconfig_client(), method_name)(*args, **kwargs)
return result
# Module-level shortcuts: each name forwards to the method of the same name
# on the client returned by get_autoconfig_client().
exists, rename, remove, mkdir, listdir = [
    _with_ac(client_method)
    for client_method in ('exists', 'rename', 'remove', 'mkdir', 'listdir')
]