# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""OpenerTarget support, allows easier testing and configuration by abstracting
out the LocalTarget, S3Target, and MockTarget types.

Example:

.. code-block:: python

    from luigi.contrib.opener import OpenerTarget

    OpenerTarget('/local/path.txt')
    OpenerTarget('s3://zefr/remote/path.txt')

"""
from __future__ import annotations
import json
from urllib.parse import parse_qs, urlsplit
from luigi.contrib.s3 import S3Target
from luigi.local_target import LocalTarget
from luigi.mock import MockTarget
from luigi.target import FileSystemException
__all__ = ["OpenerError", "NoOpenerError", "InvalidQuery", "OpenerRegistry", "Opener", "MockOpener", "LocalOpener", "S3Opener", "opener", "OpenerTarget"]
class OpenerError(FileSystemException):
    """Root of the exception hierarchy raised by openers."""
class NoOpenerError(OpenerError):
    """Raised when no opener is registered for the requested protocol."""
class InvalidQuery(OpenerError):
    """Raised when an opener receives arguments it does not expect."""
class OpenerRegistry:
    """Registry of opener objects used to turn Target URIs into targets.

    ``openers`` holds the registered opener objects keyed by insertion index;
    ``registry`` maps each name an opener declares to that index.
    """

    def __init__(self, openers=None):
        """Create a registry, optionally pre-populated with openers.

        :param openers: A list of objects inherited from the Opener class;
            ``None`` registers nothing.
        :type openers: list
        """
        self.registry = {}
        self.openers = {}
        # Scheme assumed by ``open`` when the URI does not carry one.
        self.default_opener = "file"
        for entry in [] if openers is None else openers:
            self.add(entry)

    def get_opener(self, name):
        """Look up the opener registered under a protocol name.

        :param name: name of the opener to retrieve
        :type name: string
        :returns: the opener object registered for ``name``
        :raises NoOpenerError: if no opener has been registered of that name
        """
        if name in self.registry:
            return self.openers[self.registry[name]]
        raise NoOpenerError("No opener for %s" % name)

    def add(self, opener):
        """Register an opener under every name listed in ``opener.names``.

        A name that is already registered is re-pointed at the new opener.

        :param opener: Opener object
        :type opener: Opener inherited object
        """
        slot = len(self.openers)
        self.openers[slot] = opener
        self.registry.update({alias: slot for alias in opener.names})

    def open(self, target_uri, **kwargs):
        """Build a target from a URI using the opener matching its scheme.

        The URI is split with ``urlsplit``; a URI without a scheme falls back
        to ``self.default_opener``. The query string is passed through the
        opener's ``conform_query`` and the result, together with the other
        URI components and ``kwargs``, is handed to the opener's
        ``get_target``. The original URI is stored on the returned object as
        ``opener_path``.

        :param target_uri: Uri to open
        :type target_uri: string
        :returns: Target object
        :raises NoOpenerError: if no opener is registered for the scheme
        """
        parts = urlsplit(target_uri, scheme=self.default_opener)
        handler = self.get_opener(parts.scheme)
        # Conform the query before touching the remaining URI attributes so
        # failures surface in the same order as they always have.
        conformed_query = handler.conform_query(parts.query)
        result = handler.get_target(
            parts.scheme,
            parts.path,
            parts.fragment,
            parts.username,
            parts.password,
            parts.hostname,
            parts.port,
            conformed_query,
            **kwargs,
        )
        result.opener_path = target_uri
        return result
class Opener:
    """Base class for Opener objects.

    Subclasses override :meth:`get_target` and tune query-string handling
    through ``allowed_kwargs`` and ``filter_kwargs``. The registry also reads
    a ``names`` attribute (the list of URI schemes handled) from each opener
    it registers; the concrete openers in this module define it.
    """

    # Dictionary of expected kwargs and flag for json loading values (bool/int)
    allowed_kwargs: dict[str, bool] = {}
    # Flag to filter out unexpected kwargs
    filter_kwargs = True

    @classmethod
    def conform_query(cls, query):
        """Convert a URI query string into keyword arguments for the target.

        ``OpenerRegistry.open`` calls this on every opener before
        ``get_target``, so it must exist on the base class.

        :param query: Unparsed query string (the ``query`` part of a split URI)
        :type query: string
        :returns: Dictionary of parsed values. Keys missing from
            ``cls.allowed_kwargs`` are dropped when ``cls.filter_kwargs`` is
            true. Values whose ``allowed_kwargs`` flag is true are decoded
            with ``json.loads``; all others stay strings.
        :raises ValueError: if a value flagged for json decoding is not
            valid json (raised by ``json.loads``)
        """
        parsed = parse_qs(query, keep_blank_values=True)
        conformed = {}
        for key, values in parsed.items():
            # Remove any unexpected keywords from the query string.
            if cls.filter_kwargs and key not in cls.allowed_kwargs:
                continue
            # A name may be repeated; only the first value is used. A name
            # given without a value is treated as a true value.
            raw = values[0] or "true"
            conformed[key] = json.loads(raw) if cls.allowed_kwargs.get(key, False) else raw
        return conformed

    @classmethod
    def get_target(cls, scheme, path, fragment, username, password, hostname, port, query, **kwargs):
        """Override this method to use values from the parsed uri to initialize
        the expected target.

        :param query: dictionary produced by :meth:`conform_query`
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError("get_target must be overridden")
class MockOpener(Opener):
    """Mock target opener, works like LocalTarget but files are all in
    memory.

    example:

    * mock://foo/bar.txt
    """

    names = ["mock"]
    allowed_kwargs = {"is_tmp": True, "mirror_on_stderr": True, "format": False}

    @classmethod
    def get_target(cls, scheme, path, fragment, username, password, hostname, port, query, **kwargs):
        """Return a ``MockTarget`` for the hostname and path of the URI.

        The target path is the hostname (when present) joined directly with
        ``path``. ``kwargs`` are merged into ``query`` in place, overriding
        query-string values, and the merged mapping is passed to
        ``MockTarget`` as keyword arguments.
        """
        host_prefix = hostname if hostname else ""
        location = host_prefix + path
        query.update(kwargs)
        return MockTarget(location, **query)
class LocalOpener(Opener):
    """Local filesystem opener, works with any valid system path. This
    is the default opener and will be used if you don't indicate which opener.

    examples:

    * file://relative/foo/bar/baz.txt (opens a relative file)
    * file:///home/user (opens a directory from an absolute path)
    * foo/bar.baz (file:// is the default opener)
    """

    names = ["file"]
    allowed_kwargs = {"is_tmp": True, "format": False}

    @classmethod
    def get_target(cls, scheme, path, fragment, username, password, hostname, port, query, **kwargs):
        """Return a ``LocalTarget`` for the hostname and path of the URI.

        The target path is the hostname (when present) joined directly with
        ``path``, which is how ``file://relative/foo`` becomes the relative
        path ``relative/foo``. ``kwargs`` are merged into ``query`` in place,
        overriding query-string values, and the merged mapping is passed to
        ``LocalTarget`` as keyword arguments.
        """
        host_prefix = hostname if hostname else ""
        location = host_prefix + path
        query.update(kwargs)
        return LocalTarget(location, **query)
class S3Opener(Opener):
    """Opens a target stored on Amazon S3 storage

    examples:

    * s3://bucket/foo/bar.txt
    * s3://bucket/foo/bar.txt?aws_access_key_id=xxx&aws_secret_access_key=yyy
    """

    names = ["s3", "s3n"]
    allowed_kwargs = {"format": False, "client": True}
    # Query parameters outside ``allowed_kwargs`` are not filtered out for
    # this opener, so they reach ``S3Target`` as keyword arguments.
    filter_kwargs = False

    @classmethod
    def get_target(cls, scheme, path, fragment, username, password, hostname, port, query, **kwargs):
        """Return an ``S3Target`` for ``<scheme>://<hostname><path>``.

        ``kwargs`` are merged into ``query`` in place, overriding query-string
        values, and the merged mapping is passed to ``S3Target`` as keyword
        arguments.
        """
        query.update(kwargs)
        s3_uri = f"{scheme}://{hostname}{path}"
        return S3Target(s3_uri, **query)
# Module-level registry pre-loaded with the openers defined in this module.
opener = OpenerRegistry([MockOpener, LocalOpener, S3Opener])

# ``OpenerTarget(uri, **kwargs)`` is the bound ``open`` method of that registry.
OpenerTarget = opener.open