luigi.contrib.redshift module

class luigi.contrib.redshift.RedshiftTarget(host, database, user, password, table, update_id, port=None)[source]

Bases: luigi.contrib.postgres.PostgresTarget

Target for a resource in Redshift.

Redshift is similar to postgres with a few adjustments required by redshift.

Args:
host (str): Postgres server address. Possibly a host:port string. database (str): Database name user (str): Database user password (str): Password for specified user update_id (str): An identifier for this data set port (int): Postgres server port.
marker_table = 'table_updates'
DEFAULT_DB_PORT = 5439
use_db_timestamps = False
class luigi.contrib.redshift.S3CopyToTable(*args, **kwargs)[source]

Bases: luigi.contrib.rdbms.CopyToTable, luigi.contrib.redshift._CredentialsMixin

Template task for inserting a data set into Redshift from s3.

Usage:

  • Subclass and override the required attributes:
    • host,
    • database,
    • user,
    • password,
    • table,
    • columns,
    • s3_load_path.
  • You can also override the attributes provided by the CredentialsMixin if they are not supplied by your configuration or environment variables.
s3_load_path()[source]

Override to return the load path.

copy_options

Add extra copy options, for example:

  • TIMEFORMAT ‘auto’
  • IGNOREHEADER 1
  • TRUNCATECOLUMNS
  • IGNOREBLANKLINES
  • DELIMITER ‘ ‘
prune_table

Override to set equal to the name of the table which is to be pruned. Intended to be used in conjunction with prune_column and prune_date i.e. copy to temp table, prune production table to prune_column with a date greater than prune_date, then insert into production table from temp table

prune_column

Override to set equal to the column of the prune_table which is to be compared Intended to be used in conjunction with prune_table and prune_date i.e. copy to temp table, prune production table to prune_column with a date greater than prune_date, then insert into production table from temp table

prune_date

Override to set equal to the date by which prune_column is to be compared Intended to be used in conjunction with prune_table and prune_column i.e. copy to temp table, prune production table to prune_column with a date greater than prune_date, then insert into production table from temp table

table_attributes

Add extra table attributes, for example:

DISTSTYLE KEY DISTKEY (MY_FIELD) SORTKEY (MY_FIELD_2, MY_FIELD_3)

table_constraints

Add extra table constraints, for example:

PRIMARY KEY (MY_FIELD, MY_FIELD_2) UNIQUE KEY (MY_FIELD_3)

do_truncate_table

Return True if table should be truncated before copying new data in.

do_prune()[source]

Return True if prune_table, prune_column, and prune_date are implemented. If only a subset of prune variables are override, an exception is raised to remind the user to implement all or none. Prune (data newer than prune_date deleted) before copying new data in.

table_type

Return table type (i.e. ‘temp’).

queries

Override to return a list of queries to be executed in order.

truncate_table(connection)[source]
prune(connection)[source]
create_schema(connection)[source]

Will create the schema in the database

create_table(connection)[source]

Override to provide code for creating the target table.

By default it will be created using types (optionally) specified in columns.

If overridden, use the provided connection object for setting up the table in order to create the table and insert data using the same transaction.

run()[source]

If the target table doesn’t exist, self.create_table will be called to attempt to create the table.

copy(cursor, f)[source]

Defines copying from s3 into redshift.

If both key-based and role-based credentials are provided, role-based will be used.

output()[source]

Returns a RedshiftTarget representing the inserted dataset.

Normally you don’t override this.

does_schema_exist(connection)[source]

Determine whether the schema already exists.

does_table_exist(connection)[source]

Determine whether the table already exists.

init_copy(connection)[source]

Perform pre-copy sql - such as creating table, truncating, or removing data older than x.

post_copy(cursor)[source]

Performs post-copy sql - such as cleansing data, inserting into production table (if copied to temp table), etc.

post_copy_metacolums(cursor)[source]

Performs post-copy to fill metadata columns.

class luigi.contrib.redshift.S3CopyJSONToTable(*args, **kwargs)[source]

Bases: luigi.contrib.redshift.S3CopyToTable, luigi.contrib.redshift._CredentialsMixin

Template task for inserting a JSON data set into Redshift from s3.

Usage:

  • Subclass and override the required attributes:

    • host,
    • database,
    • user,
    • password,
    • table,
    • columns,
    • s3_load_path,
    • jsonpath,
    • copy_json_options.
  • You can also override the attributes provided by the CredentialsMixin if they are not supplied by your configuration or environment variables.
jsonpath

Override the jsonpath schema location for the table.

copy_json_options

Add extra copy options, for example:

  • GZIP
  • LZOP
copy(cursor, f)[source]

Defines copying JSON from s3 into redshift.

class luigi.contrib.redshift.RedshiftManifestTask(*args, **kwargs)[source]

Bases: luigi.contrib.s3.S3PathTask

Generic task to generate a manifest file that can be used in S3CopyToTable in order to copy multiple files from your s3 folder into a redshift table at once.

For full description on how to use the manifest file see http://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html

Usage:

  • requires parameters
    • path - s3 path to the generated manifest file, including the
      name of the generated file to be copied into a redshift table
    • folder_paths - s3 paths to the folders containing files you wish to be copied

Output:

  • generated manifest file
folder_paths = Parameter
text_target = True
run()[source]

The task run method, to be overridden in a subclass.

See Task.run

class luigi.contrib.redshift.KillOpenRedshiftSessions(*args, **kwargs)[source]

Bases: luigi.task.Task

An task for killing any open Redshift sessions in a given database. This is necessary to prevent open user sessions with transactions against the table from blocking drop or truncate table commands.

Usage:

Subclass and override the required host, database, user, and password attributes.

connection_reset_wait_seconds = IntParameter (defaults to 60)
host
database
user
password
update_id

This update id will be a unique identifier for this insert on this table.

output()[source]

Returns a RedshiftTarget representing the inserted dataset.

Normally you don’t override this.

run()[source]

Kill any open Redshift sessions for the given database.

class luigi.contrib.redshift.RedshiftQuery(*args, **kwargs)[source]

Bases: luigi.contrib.postgres.PostgresQuery

Template task for querying an Amazon Redshift database

Usage: Subclass and override the required host, database, user, password, table, and query attributes.

Override the run method if your use case requires some action with the query result.

Task instances require a dynamic update_id, e.g. via parameter(s), otherwise the query will only execute once

To customize the query signature as recorded in the database marker table, override the update_id property.

output()[source]

Returns a RedshiftTarget representing the executed query.

Normally you don’t override this.

class luigi.contrib.redshift.RedshiftUnloadTask(*args, **kwargs)[source]

Bases: luigi.contrib.postgres.PostgresQuery, luigi.contrib.redshift._CredentialsMixin

Template task for running UNLOAD on an Amazon Redshift database

Usage: Subclass and override the required host, database, user, password, table, and query attributes. Optionally, override the autocommit atribute to run the query in autocommit mode - this is necessary to run VACUUM for example. Override the run method if your use case requires some action with the query result. Task instances require a dynamic update_id, e.g. via parameter(s), otherwise the query will only execute once To customize the query signature as recorded in the database marker table, override the update_id property. You can also override the attributes provided by the CredentialsMixin if they are not supplied by your configuration or environment variables.

s3_unload_path

Override to return the load path.

unload_options

Add extra or override default unload options:

unload_query

Default UNLOAD command

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

output()[source]

Returns a RedshiftTarget representing the executed query.

Normally you don’t override this.