luigi.contrib.postgres module

Implements a subclass of Target that writes data to Postgres. Also provides a helper task to copy data into a Postgres table.

luigi.contrib.postgres.db_error_code(exception)[source]

class luigi.contrib.postgres.MultiReplacer(replace_pairs)[source]

Bases: object

Object for one-pass replacement of multiple words.

Substituted parts will not be matched against other replace patterns, as opposed to when using multipass replace. The order of the items in the replace_pairs input will dictate replacement precedence.

Constructor arguments: replace_pairs – list of 2-tuples, each holding a string to be replaced and its replacement string.

Usage:

>>> replace_pairs = [("a", "b"), ("b", "c")]
>>> MultiReplacer(replace_pairs)("abcd")
'bccd'
>>> replace_pairs = [("ab", "x"), ("a", "x")]
>>> MultiReplacer(replace_pairs)("ab")
'x'
>>> replace_pairs.reverse()
>>> MultiReplacer(replace_pairs)("ab")
'xb'

Initializes a MultiReplacer instance.

Parameters:

replace_pairs (list of tuple) – list of 2-tuples, each holding a string to be replaced and its replacement string.
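
The difference between one-pass and multipass replacement can be sketched without Luigi. The helper names below (one_pass_replace, multipass_replace) are hypothetical and are not part of luigi.contrib.postgres; this is a minimal illustration of the behaviour described above, assuming a regular-expression alternation is an acceptable way to scan the input once.

```python
import re


def one_pass_replace(replace_pairs, text):
    """Replace every match in a single scan; earlier pairs take precedence."""
    pairs = list(replace_pairs)
    mapping = dict(pairs)
    # Alternation tries the alternatives left to right, so list order
    # decides which pattern wins when several match at the same position.
    pattern = re.compile("|".join(re.escape(old) for old, _ in pairs))
    return pattern.sub(lambda match: mapping[match.group()], text)


def multipass_replace(replace_pairs, text):
    """Naive alternative: each pass can re-match the output of earlier passes."""
    for old, new in replace_pairs:
        text = text.replace(old, new)
    return text


pairs = [("a", "b"), ("b", "c")]
print(one_pass_replace(pairs, "abcd"))   # bccd
print(multipass_replace(pairs, "abcd"))  # cccd
```

In the multipass version the "b" produced by the first pass is rewritten again by the second pass, which is exactly what a one-pass replacer avoids.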

class luigi.contrib.postgres.PostgresTarget(host, database, user, password, table, update_id, port=None)[source]

Bases: Target

Target for a resource in Postgres.

This will rarely have to be directly instantiated by the user.

Args:

host (str): Postgres server address. Possibly a host:port string.
database (str): Database name.
user (str): Database user.
password (str): Password for the specified user.
table (str): Name of the target table.
update_id (str): An identifier for this data set.
port (int): Postgres server port.

marker_table = 'table_updates'

DEFAULT_DB_PORT = 5432

use_db_timestamps = True

touch(connection=None)[source]

Mark this update as complete.

Important: If the marker table doesn’t exist, the connection transaction will be aborted and the connection reset. Then the marker table will be created.

exists(connection=None)[source]

Returns True if the Target exists and False otherwise.
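
The marker-table idea behind touch and exists can be illustrated with a small stand-alone sketch. It uses the standard-library sqlite3 module rather than Postgres so that it runs anywhere. The column layout and the helper functions are assumptions made for illustration and do not reproduce PostgresTarget's actual SQL or error handling.

```python
import sqlite3

MARKER_TABLE = "table_updates"  # same name as PostgresTarget.marker_table


def create_marker_table(conn):
    # Assumed layout: one row per completed update, keyed by update_id.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {MARKER_TABLE} ("
        "update_id TEXT PRIMARY KEY, "
        "target_table TEXT, "
        "inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )


def touch(conn, update_id, target_table):
    """Record that the update identified by update_id is complete."""
    create_marker_table(conn)
    conn.execute(
        f"INSERT OR REPLACE INTO {MARKER_TABLE} (update_id, target_table) "
        "VALUES (?, ?)",
        (update_id, target_table),
    )
    conn.commit()


def exists(conn, update_id):
    """Return True if a marker row for update_id has been written."""
    try:
        row = conn.execute(
            f"SELECT 1 FROM {MARKER_TABLE} WHERE update_id = ? LIMIT 1",
            (update_id,),
        ).fetchone()
    except sqlite3.OperationalError:
        # The marker table is missing, so nothing has been recorded yet.
        return False
    return row is not None


conn = sqlite3.connect(":memory:")
print(exists(conn, "load_2024_01_01"))
touch(conn, "load_2024_01_01", "events")
print(exists(conn, "load_2024_01_01"))
```

The first call reports that the update is missing because no marker table exists yet; after touch, the same update_id is reported as present.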

connect()[source]

Get a DBAPI 2.0 connection object to the database where the table is.

create_marker_table()[source]

Create marker table if it doesn’t exist.

Using a separate connection since the transaction might have to be reset.

open(mode)[source]

class luigi.contrib.postgres.CopyToTable(*args, **kwargs)[source]

Bases: luigi.contrib.rdbms.CopyToTable

Template task for inserting a data set into Postgres.

Usage: Subclass and override the required host, database, user, password, table and columns attributes.

To customize how to access data from an input task, override the rows method with a generator that yields each row as a tuple with fields ordered according to columns.

rows()[source]

Return/yield tuples or lists corresponding to each row to be inserted.

map_column(value)[source]

Applied to each column of every row returned by rows.

Default behaviour is to escape special characters and identify any self.null_values.

output()[source]

Returns a PostgresTarget representing the inserted dataset.

Normally you don’t override this.

copy(cursor, file)[source]

run()[source]

Inserts data generated by rows() into target table.

If the target table doesn’t exist, self.create_table will be called to attempt to create the table.

Normally you don’t want to override this.

class luigi.contrib.postgres.PostgresQuery(*args, **kwargs)[source]

Bases: luigi.contrib.rdbms.Query

Template task for querying a Postgres-compatible database.

Usage: Subclass and override the required host, database, user, password, table, and query attributes. Optionally one can override the autocommit attribute to put the connection for the query in autocommit mode.

Override the run method if your use case requires some action with the query result.

Task instances require a dynamic update_id, e.g. via parameter(s); otherwise the query will execute only once.

To customize the query signature as recorded in the database marker table, override the update_id property.

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

output()[source]

Returns a PostgresTarget representing the executed query.

Normally you don’t override this.