Source code for luigi.contrib.redis_store

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import logging

from luigi.target import Target
from luigi.parameter import Parameter

logger = logging.getLogger('luigi-interface')

try:
    import redis

except ImportError:
    logger.warning("Loading redis_store module without redis installed. "
                   "Will crash at runtime if redis_store functionality is used.")


[docs] class RedisTarget(Target): """ Target for a resource in Redis.""" marker_prefix = Parameter(default='luigi', config_path=dict(section='redis', name='marker-prefix')) def __init__(self, host, port, db, update_id, password=None, socket_timeout=None, expire=None): """ :param host: Redis server host :type host: str :param port: Redis server port :type port: int :param db: database index :type db: int :param update_id: an identifier for this data hash :type update_id: str :param password: a password to connect to the redis server :type password: str :param socket_timeout: client socket timeout :type socket_timeout: int :param expire: timeout before the target is deleted :type expire: int """ self.host = host self.port = port self.db = db self.password = password self.socket_timeout = socket_timeout self.update_id = update_id self.expire = expire self.redis_client = redis.StrictRedis( host=self.host, port=self.port, password=self.password, db=self.db, socket_timeout=self.socket_timeout, ) def __str__(self): return self.marker_key()
[docs] def marker_key(self): """ Generate a key for the indicator hash. """ return '%s:%s' % (self.marker_prefix, self.update_id)
[docs] def touch(self): """ Mark this update as complete. We index the parameters `update_id` and `date`. """ marker_key = self.marker_key() self.redis_client.hset(marker_key, 'update_id', self.update_id) self.redis_client.hset(marker_key, 'date', datetime.datetime.now().isoformat()) if self.expire is not None: self.redis_client.expire(marker_key, self.expire)
[docs] def exists(self): """ Test, if this task has been run. """ return self.redis_client.exists(self.marker_key()) == 1