airflow.providers.postgres.hooks.postgres

Attributes

USE_PSYCOPG3

sqlalchemy_version

CursorType

CursorRow

Classes

CompatConnection

Protocol for type hinting psycopg2 and psycopg3 connection objects.

PostgresHook

Interact with Postgres.

Module Contents

airflow.providers.postgres.hooks.postgres.USE_PSYCOPG3: bool[source]
airflow.providers.postgres.hooks.postgres.sqlalchemy_version[source]
type airflow.providers.postgres.hooks.postgres.CursorType = DictCursor | RealDictCursor | NamedTupleCursor[source]
type airflow.providers.postgres.hooks.postgres.CursorRow = dict[str, Any] | tuple[Any, ...][source]
class airflow.providers.postgres.hooks.postgres.CompatConnection[source]

Bases: Protocol

Protocol for type hinting psycopg2 and psycopg3 connection objects.

cursor(*args, **kwargs)[source]
commit()[source]
close()[source]
__enter__()[source]
__exit__(exc_type, exc_val, exc_tb)[source]
property notices: list[Any][source]
property adapters: Any[source]
property row_factory: Any[source]
add_notice_handler(handler)[source]
class airflow.providers.postgres.hooks.postgres.PostgresHook(*args, options=None, enable_log_db_messages=False, **kwargs)[source]

Bases: airflow.providers.common.sql.hooks.sql.DbApiHook

Interact with Postgres.

You can specify SSL parameters in the extra field of your connection, e.g. {"sslmode": "require", "sslcert": "/path/to/cert.pem"}. You can also choose the cursor type there, e.g. {"cursor": "dictcursor"}. Refer to psycopg2.extras or psycopg.rows for more details.
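
A minimal sketch of such a connection defined in code; the conn id, host, and certificate path are hypothetical:

    from airflow.models.connection import Connection

    # "my_postgres" and the paths below are placeholders.
    conn = Connection(
        conn_id="my_postgres",
        conn_type="postgres",
        host="db.example.com",
        schema="mydb",
        login="user",
        password="secret",
        extra='{"sslmode": "require", "sslcert": "/path/to/cert.pem", "cursor": "dictcursor"}',
    )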

Note: For Redshift, use keepalives_idle in the extra connection parameters and set it to less than 300 seconds.

Note: For AWS IAM authentication, use iam in the extra connection parameters and set it to true. Leave the password field empty. This will use the “aws_default” connection to get the temporary token unless you override it in the extras. Example extras: {"iam":true, "aws_conn_id":"my_aws_conn"}

For Redshift, also use redshift in the extra connection parameters and set it to true. The cluster-identifier is extracted from the beginning of the host field, so it is optional; it can, however, be overridden in the extra field. Example extras: {"iam":true, "redshift":true, "cluster-identifier": "my_cluster_id"}

For Redshift Serverless, use redshift-serverless in the extra connection parameters and set it to true. The workgroup-name is extracted from the beginning of the host field, so it is optional; it can, however, be overridden in the extra field. Example extras: {"iam":true, "redshift-serverless":true, "workgroup-name": "my_serverless_workgroup"}

Parameters:
  • postgres_conn_id – The postgres conn id reference to a specific postgres database.

  • options (str | None) – Optional. Specifies command-line options to send to the server at connection start. For example, setting this to -c search_path=myschema sets the session’s value of search_path to myschema. A usage sketch follows this parameter list.

  • enable_log_db_messages (bool) – Optional. If enabled, logs database messages sent to the client during the session. To avoid a memory leak, psycopg2 only saves the last 50 messages. For details, see: PostgreSQL logging configuration parameters
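
A minimal usage sketch for these parameters; the connection id is hypothetical:

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    # "my_postgres" is a placeholder connection id.
    hook = PostgresHook(
        postgres_conn_id="my_postgres",
        options="-c search_path=myschema",
        enable_log_db_messages=True,
    )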

conn_name_attr = 'postgres_conn_id'[source]
default_conn_name = 'postgres_default'[source]
default_client_log_level = 'warning'[source]
default_connector_version: int = 2[source]
conn_type = 'postgres'[source]
hook_name = 'Postgres'[source]
supports_autocommit = True[source]
supports_executemany = True[source]
ignored_extra_options[source]
default_azure_oauth_scope = 'https://ossrdbms-aad.database.windows.net/.default'[source]
conn: CompatConnection | None = None[source]
database: str | None[source]
options = None[source]
enable_log_db_messages = False[source]
property sqlalchemy_url: sqlalchemy.engine.URL[source]

Return a sqlalchemy.engine.URL object from the connection.

Needs to be implemented in the provider subclass to return the sqlalchemy.engine.URL object.

Returns:

the extracted sqlalchemy.engine.URL object.

Return type:

sqlalchemy.engine.URL
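
A sketch of feeding this property to SQLAlchemy; the connection id is hypothetical:

    from sqlalchemy import create_engine

    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn id
    engine = create_engine(hook.sqlalchemy_url)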

property dialect_name: str[source]
property dialect: airflow.providers.common.sql.dialects.dialect.Dialect[source]
get_conn()[source]

Establish a connection to a postgres database.
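
A minimal sketch, assuming a connection id my_postgres exists:

    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn id
    conn = hook.get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
            print(cur.fetchone())
    finally:
        conn.close()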

get_df(sql: str | list[str], parameters: list | tuple | collections.abc.Mapping[str, Any] | None = None, *, df_type: Literal['pandas'] = 'pandas', **kwargs: Any) pandas.DataFrame[source]
get_df(sql: str | list[str], parameters: list | tuple | collections.abc.Mapping[str, Any] | None = None, *, df_type: Literal['polars'] = ..., **kwargs: Any) polars.DataFrame

Execute the SQL and return a dataframe.

Parameters:
  • sql – the sql statement to be executed (str) or a list of sql statements to execute

  • parameters – The parameters to render the SQL query with.

  • df_type – Type of dataframe to return, either “pandas” or “polars”

  • kwargs – (optional) passed into pandas.io.sql.read_sql or polars.read_database method

Returns:

A pandas or polars DataFrame containing the query results.
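
A usage sketch; the connection id and table are hypothetical:

    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn id
    df = hook.get_df(
        "SELECT id, name FROM my_table WHERE id > %(min_id)s",  # placeholder table
        parameters={"min_id": 100},
        df_type="pandas",
    )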

copy_expert(sql, filename)[source]

Execute SQL using psycopg’s copy_expert method.

Necessary to execute a COPY command without superuser access.

Note: if this method is called with a “COPY FROM” statement and the specified input file does not exist, it creates an empty file and loads no data, but the operation still succeeds. Users who need to know when the input file does not exist must check for it themselves.
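
A usage sketch; the connection id, table, and file path are hypothetical:

    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn id
    # Export a table to a local CSV file without superuser privileges.
    hook.copy_expert("COPY my_table TO STDOUT WITH CSV HEADER", "/tmp/my_table.csv")
    # Load it back; see the note above about non-existent input files.
    hook.copy_expert("COPY my_table FROM STDIN WITH CSV HEADER", "/tmp/my_table.csv")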

get_uri()[source]

Extract the URI from the connection.

Returns:

the extracted URI in SQLAlchemy URI format.

Return type:

str

bulk_load(table, tmp_file)[source]

Load a tab-delimited file into a database table.

bulk_dump(table, tmp_file)[source]

Dump a database table into a tab-delimited file.
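
A sketch of the two methods together; names and paths are hypothetical:

    hook = PostgresHook(postgres_conn_id="my_postgres")   # placeholder conn id
    hook.bulk_dump("my_table", "/tmp/my_table.tsv")       # table -> tab-delimited file
    hook.bulk_load("my_table_copy", "/tmp/my_table.tsv")  # tab-delimited file -> table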

get_iam_token(conn)[source]

Get the IAM token from different identity providers.

get_aws_iam_token(conn)[source]

Get the AWS IAM token.

This uses AWSHook to retrieve a temporary password to connect to Postgres or Redshift. A port is required; if none is provided, the default 5432 is used.

get_azure_iam_token(conn)[source]

Get the Azure IAM token.

This uses AzureBaseHook to retrieve an OAUTH token to connect to Postgres. Scope for the OAuth token can be set in the config option azure_oauth_scope under the section [postgres].

get_table_primary_key(table, schema='public')[source]

Get the table’s primary key.

Parameters:
  • table (str) – Name of the target table

  • schema (str | None) – Name of the target schema, public by default

Returns:

Primary key columns list

Return type:

list[str] | None
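
A usage sketch; the connection id and table are hypothetical:

    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn id
    pk = hook.get_table_primary_key("orders", schema="public")  # placeholder table
    # e.g. ["order_id"], or None if the table defines no primary key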

get_openlineage_database_info(connection)[source]

Return Postgres/Redshift specific information for OpenLineage.

get_openlineage_database_dialect(connection)[source]

Return postgres/redshift dialect.

get_openlineage_default_schema()[source]

Return the current schema. This is usually changed with the SEARCH_PATH parameter.

classmethod get_ui_field_behaviour()[source]
get_db_log_messages(conn)[source]

Log database messages.

insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, *, executemany=False, fast_executemany=False, autocommit=False, **kwargs)[source]

Insert a collection of tuples into a table.

Rows are inserted in chunks; each chunk (of size commit_every) is done in a new transaction. A usage sketch follows the parameter list below.

Parameters:
  • table – Name of the target table

  • rows – The rows to insert into the table

  • target_fields – The names of the columns to fill in the table

  • commit_every – The maximum number of rows to insert in one transaction. Set to 0 to insert all rows in one transaction.

  • replace – Whether to replace instead of insert

  • executemany – If True, all rows are inserted at once in chunks defined by the commit_every parameter. This only works if all rows have the same number of columns, but it leads to better performance.

  • fast_executemany – If True, rows will be inserted using an optimized bulk execution strategy (psycopg2.extras.execute_batch). This can significantly improve performance for large inserts. If set to False, the method falls back to the default implementation from DbApiHook.insert_rows.

  • autocommit – What to set the connection’s autocommit setting to before executing the query.
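
A usage sketch; the connection id, table, and data are hypothetical:

    hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn id
    hook.insert_rows(
        table="users",  # placeholder table
        rows=[(1, "alice"), (2, "bob")],
        target_fields=["id", "name"],
        commit_every=1000,
        fast_executemany=True,  # batched inserts via psycopg2.extras.execute_batch
    )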
