airflow.providers.postgres.hooks.postgres¶
Attributes¶
- CursorType
- CursorRow
Classes¶
- CompatConnection – Protocol for type hinting psycopg2 and psycopg3 connection objects.
- PostgresHook – Interact with Postgres.
Module Contents¶
- type airflow.providers.postgres.hooks.postgres.CursorType = DictCursor | RealDictCursor | NamedTupleCursor[source]¶
- type airflow.providers.postgres.hooks.postgres.CursorRow = dict[str, Any] | tuple[Any, ...][source]¶
- class airflow.providers.postgres.hooks.postgres.CompatConnection[source]¶
Bases: Protocol
Protocol for type hinting psycopg2 and psycopg3 connection objects.
- class airflow.providers.postgres.hooks.postgres.PostgresHook(*args, options=None, enable_log_db_messages=False, **kwargs)[source]¶
Bases: airflow.providers.common.sql.hooks.sql.DbApiHook
Interact with Postgres.
You can specify ssl parameters in the extra field of your connection as {"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}. You can also choose the cursor type with {"cursor": "dictcursor"}. Refer to psycopg2.extras or psycopg.rows for more details.
Note: For Redshift, use keepalives_idle in the extra connection parameters and set it to less than 300 seconds.
Note: For AWS IAM authentication, use iam in the extra connection parameters and set it to true. Leave the password field empty. This will use the "aws_default" connection to get the temporary token unless you override it in extras. extras example: {"iam":true, "aws_conn_id":"my_aws_conn"}
For Redshift, also use redshift in the extra connection parameters and set it to true. The cluster-identifier is extracted from the beginning of the host field, so it is optional. It can, however, be overridden in the extra field. extras example: {"iam":true, "redshift":true, "cluster-identifier": "my_cluster_id"}
For Redshift Serverless, use redshift-serverless in the extra connection parameters and set it to true. The workgroup-name is extracted from the beginning of the host field, so it is optional. It can, however, be overridden in the extra field. extras example: {"iam":true, "redshift-serverless":true, "workgroup-name": "my_serverless_workgroup"}
- Parameters:
postgres_conn_id – The postgres conn id reference to a specific postgres database.
options (str | None) – Optional. Specifies command-line options to send to the server at connection start. For example, setting this to -c search_path=myschema sets the session’s value of search_path to myschema; a short instantiation sketch follows this parameter list.
enable_log_db_messages (bool) – Optional. If enabled, logs database messages sent to the client during the session. To avoid a memory leak, psycopg2 only saves the last 50 messages. For details, see: PostgreSQL logging configuration parameters
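For illustration, a minimal sketch of instantiating the hook with these parameters. The connection ID my_postgres_conn and the schema myschema are assumptions for the example, not values shipped with the provider:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# "my_postgres_conn" is a hypothetical Airflow connection ID; its extra field
# could carry options such as {"cursor": "dictcursor", "sslmode": "require"}.
hook = PostgresHook(
    postgres_conn_id="my_postgres_conn",
    options="-c search_path=myschema",  # session-level search_path
    enable_log_db_messages=True,        # surface server notices in the task log
)

conn = hook.get_conn()  # psycopg connection built from the Airflow connection
```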
- conn: CompatConnection | None = None[source]¶
- property sqlalchemy_url: sqlalchemy.engine.URL[source]¶
Return a Sqlalchemy.engine.URL object from the connection.
Needs to be implemented in the provider subclass to return the sqlalchemy.engine.URL object.
- Returns:
the extracted sqlalchemy.engine.URL object.
- Return type:
sqlalchemy.engine.URL
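As a hedged sketch, this URL can be handed straight to SQLAlchemy; the connection ID below is an assumption for the example:

```python
from sqlalchemy import create_engine

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical conn id

# sqlalchemy_url is a sqlalchemy.engine.URL, which create_engine accepts directly.
engine = create_engine(hook.sqlalchemy_url)
```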
- property dialect: airflow.providers.common.sql.dialects.dialect.Dialect[source]¶
- get_df(sql: str | list[str], parameters: list | tuple | collections.abc.Mapping[str, Any] | None = None, *, df_type: Literal['pandas'] = 'pandas', **kwargs: Any) pandas.DataFrame[source]¶
- get_df(sql: str | list[str], parameters: list | tuple | collections.abc.Mapping[str, Any] | None = None, *, df_type: Literal['polars'] = ..., **kwargs: Any) polars.DataFrame
Execute the sql and return a dataframe; a usage sketch follows the parameter list below.
- Parameters:
sql – the sql statement to be executed (str) or a list of sql statements to execute
parameters – The parameters to render the SQL query with.
df_type – Type of dataframe to return, either “pandas” or “polars”
kwargs – (optional) passed into pandas.io.sql.read_sql or polars.read_database method
- Returns:
A pandas or polars DataFrame containing the query results.
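A minimal usage sketch, assuming a connection ID my_postgres_conn and a table my_table that exist in your environment:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical conn id

# Named parameters are passed through to the underlying driver.
df = hook.get_df(
    "SELECT * FROM my_table WHERE created_at >= %(since)s",
    parameters={"since": "2024-01-01"},
    df_type="pandas",  # or "polars"
)
print(df.head())
```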
- copy_expert(sql, filename)[source]¶
Execute SQL using psycopg’s copy_expert method.
Necessary to execute COPY command without access to a superuser.
Note: if this method is called with a “COPY FROM” statement and the specified input file does not exist, an empty file is created and no data is loaded, but the operation still succeeds. Users who need to detect a missing input file must therefore check for its existence themselves.
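For illustration, a sketch of exporting and re-importing a table through a local file; the connection ID, table name, and file path are assumptions:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical conn id

# Export a table to a local CSV without needing superuser privileges.
hook.copy_expert("COPY my_table TO STDOUT WITH CSV HEADER", "/tmp/my_table.csv")

# Load the CSV back in. Per the note above, a missing input file is silently
# created empty and the COPY succeeds without loading any rows.
hook.copy_expert("COPY my_table FROM STDIN WITH CSV HEADER", "/tmp/my_table.csv")
```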
- get_uri()[source]¶
Extract the URI from the connection.
- Returns:
the extracted URI in Sqlalchemy URI format.
- Return type:
str
- get_aws_iam_token(conn)[source]¶
Get the AWS IAM token.
This uses AWSHook to retrieve a temporary password to connect to Postgres or Redshift. A port is required; if none is provided, the default 5432 is used.
- get_azure_iam_token(conn)[source]¶
Get the Azure IAM token.
This uses AzureBaseHook to retrieve an OAUTH token to connect to Postgres. Scope for the OAuth token can be set in the config option azure_oauth_scope under the section [postgres].
- get_openlineage_database_info(connection)[source]¶
Return Postgres/Redshift specific information for OpenLineage.
- get_openlineage_default_schema()[source]¶
Return the current schema. This is usually changed with the SEARCH_PATH parameter.
- insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, *, executemany=False, fast_executemany=False, autocommit=False, **kwargs)[source]¶
Insert a collection of tuples into a table.
Rows are inserted in chunks; each chunk (of size commit_every) is done in a new transaction. A usage sketch follows the parameter list below.
- Parameters:
table – Name of the target table
rows – The rows to insert into the table
target_fields – The names of the columns to fill in the table
commit_every – The maximum number of rows to insert in one transaction. Set to 0 to insert all rows in one transaction.
replace – Whether to replace instead of insert
executemany – If True, all rows are inserted at once in chunks defined by the commit_every parameter. This only works if all rows have the same number of column names, but it leads to better performance.
fast_executemany – If True, rows will be inserted using an optimized bulk execution strategy (psycopg2.extras.execute_batch). This can significantly improve performance for large inserts. If set to False, the method falls back to the default implementation from DbApiHook.insert_rows.
autocommit – What to set the connection’s autocommit setting to before executing the query.
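A short usage sketch under assumed names; the connection ID, table, and columns are illustrative only:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical conn id

rows = [
    (1, "alice"),
    (2, "bob"),
]

# Insert in transactions of up to 500 rows; executemany batches each chunk
# instead of issuing one INSERT per row.
hook.insert_rows(
    table="my_table",               # hypothetical target table
    rows=rows,
    target_fields=["id", "name"],
    commit_every=500,
    executemany=True,
)
```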