# Source code for airflow.providers.openlineage.sqlparser
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Callable, TypedDict

import sqlparse
from attrs import define
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import column_lineage_dataset, extraction_error_run, sql_job
from openlineage.common.sql import DbTableMeta, SqlMeta, parse

from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.utils.sql import (
    TablesHierarchy,
    create_information_schema_query,
    get_table_schemas,
)
from airflow.providers.openlineage.utils.utils import should_use_external_connection
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
    from openlineage.client.facet_v2 import JobFacet, RunFacet
    from sqlalchemy.engine import Engine

    from airflow.hooks.base import BaseHook
    from airflow.providers.common.sql.hooks.sql import DbApiHook
class DatabaseInfo:
    """
    Contains database specific information needed to process SQL statement parse result.

    :param scheme: Scheme part of URI in OpenLineage namespace.
    :param authority: Authority part of URI in OpenLineage namespace.
        For most cases it should return `{host}:{port}` part of Airflow connection.
        See: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
    :param database: Takes precedence over parsed database name.
    :param information_schema_columns: List of columns names from information schema table.
    :param information_schema_table_name: Information schema table name.
    :param use_flat_cross_db_query: Specifies whether a single, "global" information schema
        table should be used for cross-database queries (e.g., in Redshift), or if multiple,
        per-database "local" information schema tables should be queried individually.

        If True, assumes a single, universal information schema table is available
        (for example, in Redshift, the `SVV_REDSHIFT_COLUMNS` view)
        [https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_REDSHIFT_COLUMNS.html].
        In this mode, we query only `information_schema_table_name` directly.
        Depending on the `is_information_schema_cross_db` argument, you can also filter
        by database name in the WHERE clause.

        If False, treats each database as having its own local information schema table
        containing metadata for that database only. As a result, one query per database
        may be generated and then combined (often via `UNION ALL`). This approach is
        necessary for dialects that do not maintain a single global view of all metadata
        or that require per-database queries. Depending on the
        `is_information_schema_cross_db` argument, queries can include or omit database
        information in both identifiers and filters.

        See `is_information_schema_cross_db` which also affects how final queries are
        constructed.
    :param is_information_schema_cross_db: Specifies whether database information should be
        tracked and included in queries that retrieve schema information from the
        information_schema_table. In short, this determines whether queries are capable of
        spanning multiple databases.

        If True, database identifiers are included wherever applicable, allowing retrieval
        of metadata from more than one database. For instance, in Snowflake or MS SQL
        (where each database is treated as a top-level namespace), you might have a query
        like:

        ```
        SELECT ...
        FROM db1.information_schema.columns WHERE ...
        UNION ALL
        SELECT ...
        FROM db2.information_schema.columns WHERE ...
        ```

        In Redshift, setting this to True together with `use_flat_cross_db_query=True`
        allows adding database filters to the query, for example:

        ```
        SELECT ...
        FROM SVV_REDSHIFT_COLUMNS
        WHERE
        SVV_REDSHIFT_COLUMNS.database == db1  # This is skipped when False
        AND SVV_REDSHIFT_COLUMNS.schema == schema1
        AND SVV_REDSHIFT_COLUMNS.table IN (table1, table2)
        OR ...
        ```

        However, certain databases (e.g., PostgreSQL) do not permit true cross-database
        queries. In such dialects, enabling cross-database support may lead to errors or
        be unnecessary. Always consult your dialect's documentation or test sample queries
        to confirm if cross-database querying is supported.

        If False, database qualifiers are ignored, effectively restricting queries to a
        single database (or making the database-level qualifier optional). This is
        typically safer for databases that do not support cross-database operations or
        only provide a two-level namespace (schema + table) instead of a three-level one
        (database + schema + table). For example, some MySQL or PostgreSQL contexts might
        not need or permit cross-database queries at all.

        See `use_flat_cross_db_query` which also affects how final queries are constructed.
    :param is_uppercase_names: Specifies if database accepts only uppercase names
        (e.g. Snowflake).
    :param normalize_name_method: Method to normalize database, schema and table names.
        Defaults to `name.lower()`.
    """

    # NOTE(review): the attribute definitions documented above (scheme, authority,
    # database, information_schema_columns, ...) are not visible in this extraction —
    # they appear to have been lost. `attrs.define` is imported at the top of the file
    # and `SQLParser.parse_table_schemas` reads these names as instance attributes, so
    # presumably this class is an attrs-decorated class with one field per documented
    # parameter. TODO: confirm against the original module before relying on this stub.
class SQLParser(LoggingMixin):
    """
    Interface for openlineage-sql.

    :param dialect: dialect specific to the database
    :param default_schema: schema applied to each table with no schema parsed
    """

    def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
        super().__init__()
        # FIX(review): the constructor arguments were never stored, yet `parse()` and
        # `parse_table_schemas()` read `self.dialect` / `self.default_schema`.
        self.dialect = dialect
        self.default_schema = default_schema

    def parse(self, sql: list[str] | str) -> SqlMeta | None:
        """Parse a single or a list of SQL statements."""
        self.log.debug(
            "OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
            sql,
            self.dialect,
            self.default_schema,
        )
        # `parse` here is the openlineage-sql parser imported at module level.
        return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)

    def parse_table_schemas(
        self,
        hook: BaseHook,
        inputs: list[DbTableMeta],
        outputs: list[DbTableMeta],
        database_info: DatabaseInfo,
        # NOTE(review): DEFAULT_NAMESPACE is a module-level constant not visible in
        # this extraction — confirm it is defined in the original module.
        namespace: str = DEFAULT_NAMESPACE,
        database: str | None = None,
        sqlalchemy_engine: Engine | None = None,
    ) -> tuple[list[Dataset], ...]:
        """
        Parse schemas for input and output tables.

        :param hook: Airflow Hook used to query the information schema.
        :param inputs: tables read by the statement(s).
        :param outputs: tables written by the statement(s).
        :param database_info: dialect-specific settings driving query construction.
        :param namespace: OpenLineage namespace for the produced datasets.
        :param database: takes precedence over ``database_info.database`` when given.
        :param sqlalchemy_engine: when passed, engine's dialect is used to compile queries.
        """
        # GetTableSchemasParams is a TypedDict declared elsewhere in this module
        # (not visible in this extraction).
        database_kwargs: GetTableSchemasParams = {
            "normalize_name": database_info.normalize_name_method,
            "is_cross_db": database_info.is_information_schema_cross_db,
            "information_schema_columns": database_info.information_schema_columns,
            "information_schema_table": database_info.information_schema_table_name,
            "is_uppercase_names": database_info.is_uppercase_names,
            "database": database or database_info.database,
            "use_flat_cross_db_query": database_info.use_flat_cross_db_query,
        }
        # Only build an information-schema query for a side that actually has tables.
        return get_table_schemas(
            hook,
            namespace,
            self.default_schema,
            database or database_info.database,
            self.create_information_schema_query(
                tables=inputs, sqlalchemy_engine=sqlalchemy_engine, **database_kwargs
            )
            if inputs
            else None,
            self.create_information_schema_query(
                tables=outputs, sqlalchemy_engine=sqlalchemy_engine, **database_kwargs
            )
            if outputs
            else None,
        )

    def attach_column_lineage(
        self, datasets: list[Dataset], database: str | None, parse_result: SqlMeta
    ) -> None:
        """
        Attaches column lineage facet to the list of datasets.

        Note that currently each dataset has the same column lineage information set.
        This would be a matter of change after OpenLineage SQL Parser improvements.
        """
        # Idiom fix: test the sequence's truthiness instead of `len(...)`.
        if not parse_result.column_lineage:
            return
        for dataset in datasets:
            dataset.facets = dataset.facets or {}
            dataset.facets["columnLineage"] = column_lineage_dataset.ColumnLineageDatasetFacet(
                fields={
                    column_lineage.descendant.name: column_lineage_dataset.Fields(
                        inputFields=[
                            column_lineage_dataset.InputField(
                                namespace=dataset.namespace,
                                # Fully qualified `db.schema.table`, dropping parts the
                                # parser could not resolve; empty when origin is unknown.
                                name=".".join(
                                    filter(
                                        None,
                                        (
                                            column_meta.origin.database or database,
                                            column_meta.origin.schema or self.default_schema,
                                            column_meta.origin.name,
                                        ),
                                    )
                                )
                                if column_meta.origin
                                else "",
                                field=column_meta.name,
                            )
                            for column_meta in column_lineage.lineage
                        ],
                        transformationType="",
                        transformationDescription="",
                    )
                    for column_lineage in parse_result.column_lineage
                }
            )

    def generate_openlineage_metadata_from_sql(
        self,
        sql: list[str] | str,
        hook: BaseHook,
        database_info: DatabaseInfo,
        database: str | None = None,
        sqlalchemy_engine: Engine | None = None,
        use_connection: bool = True,
    ) -> OperatorLineage:
        """
        Parse SQL statement(s) and generate OpenLineage metadata.

        Generated OpenLineage metadata contains:

        * input tables with schemas parsed
        * output tables with schemas parsed
        * run facets
        * job facets.

        :param sql: a SQL statement or list of SQL statement to be parsed
        :param hook: Airflow Hook used to connect to the database
        :param database_info: database specific information
        :param database: when passed it takes precedence over parsed database name
        :param sqlalchemy_engine: when passed, engine's dialect is used to compile SQL queries
        :param use_connection: when True, table schemas are resolved by querying the
            database through ``hook``; otherwise only parser output is used.
        """
        job_facets: dict[str, JobFacet] = {
            "sql": sql_job.SQLJobFacet(query=self.normalize_sql(sql))
        }
        parse_result = self.parse(sql=self.split_sql_string(sql))
        if not parse_result:
            # Parser produced nothing usable — still report the SQL job facet.
            return OperatorLineage(job_facets=job_facets)

        run_facets: dict[str, RunFacet] = {}
        if parse_result.errors:
            run_facets["extractionError"] = extraction_error_run.ExtractionErrorRunFacet(
                totalTasks=len(sql) if isinstance(sql, list) else 1,
                failedTasks=len(parse_result.errors),
                errors=[
                    extraction_error_run.Error(
                        errorMessage=error.message,
                        stackTrace=None,
                        task=error.origin_statement,
                        taskNumber=error.index,
                    )
                    for error in parse_result.errors
                ],
            )

        # NOTE(review): `create_namespace` and `get_metadata_from_parser` are referenced
        # here but their definitions are not visible in this extraction — presumably
        # defined on this class in the original module; confirm.
        namespace = self.create_namespace(database_info=database_info)
        if use_connection:
            inputs, outputs = self.parse_table_schemas(
                hook=hook,
                inputs=parse_result.in_tables,
                outputs=parse_result.out_tables,
                namespace=namespace,
                database=database,
                database_info=database_info,
                sqlalchemy_engine=sqlalchemy_engine,
            )
        else:
            inputs, outputs = self.get_metadata_from_parser(
                inputs=parse_result.in_tables,
                outputs=parse_result.out_tables,
                namespace=namespace,
                database=database,
                database_info=database_info,
            )

        self.attach_column_lineage(outputs, database or database_info.database, parse_result)

        return OperatorLineage(
            inputs=inputs,
            outputs=outputs,
            run_facets=run_facets,
            job_facets=job_facets,
        )

    # FIX(review): the method takes `cls` but carried no @classmethod decorator in the
    # source shown here (the stray decorator above `split_sql_string` covered only that
    # method); without it, `SQLParser.normalize_sql(sql)` would bind `sql` to `cls`.
    @classmethod
    def normalize_sql(cls, sql: list[str] | str) -> str:
        """Make sure to return a semicolon-separated SQL statement."""
        return ";\n".join(stmt.rstrip(" ;\r\n") for stmt in cls.split_sql_string(sql))

    @classmethod
    def split_sql_string(cls, sql: list[str] | str) -> list[str]:
        """
        Split SQL string into list of statements.

        Tries to use `DbApiHook.split_sql_string` if available.
        Otherwise, uses the same logic.
        """
        try:
            from airflow.providers.common.sql.hooks.sql import DbApiHook

            split_statement = DbApiHook.split_sql_string
        except (ImportError, AttributeError):
            # No common.sql Airflow provider available or version is too old.
            def split_statement(sql: str, strip_semicolon: bool = False) -> list[str]:
                splits = sqlparse.split(
                    sql=sqlparse.format(sql, strip_comments=True),
                    strip_semicolon=strip_semicolon,
                )
                return [s for s in splits if s]

        if isinstance(sql, str):
            return split_statement(sql)
        # A list input is flattened by splitting each element recursively.
        return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]

    def create_information_schema_query(
        self,
        tables: list[DbTableMeta],
        normalize_name: Callable[[str], str],
        is_cross_db: bool,
        information_schema_columns: list[str],
        information_schema_table: str,
        is_uppercase_names: bool,
        use_flat_cross_db_query: bool,
        database: str | None = None,
        sqlalchemy_engine: Engine | None = None,
    ) -> str:
        """Create SELECT statement to query information schema table."""
        tables_hierarchy = self._get_tables_hierarchy(
            tables,
            normalize_name=normalize_name,
            database=database,
            is_cross_db=is_cross_db,
        )
        # Delegates to the module-level helper imported from utils.sql.
        return create_information_schema_query(
            columns=information_schema_columns,
            information_schema_table_name=information_schema_table,
            tables_hierarchy=tables_hierarchy,
            use_flat_cross_db_query=use_flat_cross_db_query,
            uppercase_names=is_uppercase_names,
            sqlalchemy_engine=sqlalchemy_engine,
        )

    @staticmethod
    def _get_tables_hierarchy(
        tables: list[DbTableMeta],
        normalize_name: Callable[[str], str],
        database: str | None = None,
        is_cross_db: bool = False,
    ) -> TablesHierarchy:
        """
        Create a hierarchy of database -> schema -> table name.

        This helps to create simpler information schema query grouped by
        database and schema.

        :param tables: List of tables.
        :param normalize_name: A method to normalize all names.
        :param is_cross_db: If false, set top (database) level to None when creating hierarchy.
        """
        hierarchy: TablesHierarchy = {}
        for table in tables:
            if is_cross_db:
                db = table.database or database
            else:
                db = None
            schemas = hierarchy.setdefault(normalize_name(db) if db else db, {})
            # FIX(review): renamed from `tables`, which shadowed the parameter of the
            # same name inside the loop.
            table_names = schemas.setdefault(
                normalize_name(table.schema) if table.schema else None, []
            )
            table_names.append(table.name)
        return hierarchy
def get_openlineage_facets_with_sql(
    hook: DbApiHook, sql: str | list[str], conn_id: str, database: str | None
) -> OperatorLineage | None:
    """
    Parse SQL statement(s) via the hook's OpenLineage integration and return lineage metadata.

    Returns None when the hook does not implement ``get_openlineage_database_info``
    (or returns no info) or does not provide a dialect — i.e. lineage cannot be built.

    :param hook: DB-API hook providing the OpenLineage ``get_openlineage_*`` methods.
    :param sql: a SQL statement or list of SQL statements to be parsed.
    :param conn_id: Airflow connection id used to look up the connection.
    :param database: when passed it takes precedence over parsed database name.
    """
    # FIX(review): a module-level `log` is not defined anywhere in the visible source;
    # bind the module logger locally (logging.getLogger returns the same cached logger
    # per name, so this is equivalent to a module-level `log`).
    log = logging.getLogger(__name__)

    connection = hook.get_connection(conn_id)

    try:
        database_info = hook.get_openlineage_database_info(connection)
    except AttributeError:
        # Hook does not implement the OpenLineage database-info API.
        database_info = None

    if database_info is None:
        log.debug("%s has no database info provided", hook)
        return None

    try:
        sql_parser = SQLParser(
            dialect=hook.get_openlineage_database_dialect(connection),
            default_schema=hook.get_openlineage_default_schema(),
        )
    except AttributeError:
        log.debug("%s failed to get database dialect", hook)
        return None

    try:
        sqlalchemy_engine = hook.get_sqlalchemy_engine()
    except Exception as e:
        # Best-effort: metadata generation can proceed without an engine.
        log.debug("Failed to get sql alchemy engine: %s", e)
        sqlalchemy_engine = None

    return sql_parser.generate_openlineage_metadata_from_sql(
        sql=sql,
        hook=hook,
        database_info=database_info,
        database=database,
        sqlalchemy_engine=sqlalchemy_engine,
        use_connection=should_use_external_connection(hook),
    )