Source code for airflow.providers.google.cloud.openlineage.mixins
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportcopyimportjsonimporttracebackfromtypingimportTYPE_CHECKING,castifTYPE_CHECKING:fromairflow.providers.common.compat.openlineage.facetimport(Dataset,InputDataset,OutputDataset,OutputStatisticsOutputDatasetFacet,RunFacet,SchemaDatasetFacet,)fromairflow.providers.google.cloud.openlineage.utilsimportBigQueryJobRunFacet
class_BigQueryOpenLineageMixin:defget_openlineage_facets_on_complete(self,_):""" Retrieve OpenLineage data for a COMPLETE BigQuery job. This method retrieves statistics for the specified job_ids using the BigQueryDatasetsProvider. It calls BigQuery API, retrieving input and output dataset info from it, as well as run-level usage statistics. Run facets should contain: - ExternalQueryRunFacet - BigQueryJobRunFacet Run facets may contain: - ErrorMessageRunFacet Job facets should contain: - SqlJobFacet if operator has self.sql Input datasets should contain facets: - DataSourceDatasetFacet - SchemaDatasetFacet Output datasets should contain facets: - DataSourceDatasetFacet - SchemaDatasetFacet - OutputStatisticsOutputDatasetFacet """fromairflow.providers.common.compat.openlineage.facetimportExternalQueryRunFacet,SQLJobFacetfromairflow.providers.openlineage.extractorsimportOperatorLineagefromairflow.providers.openlineage.sqlparserimportSQLParserifnotself.job_id:ifhasattr(self,"log"):self.log.warning("No BigQuery job_id was found by OpenLineage.")returnOperatorLineage()ifnotself.hook:fromairflow.providers.google.cloud.hooks.bigqueryimportBigQueryHookself.hook=BigQueryHook(gcp_conn_id=self.gcp_conn_id,impersonation_chain=self.impersonation_chain,)run_facets:dict[str,RunFacet]={"externalQuery":ExternalQueryRunFacet(externalQueryId=self.job_id,source="bigquery")}job_facets={"sql":SQLJobFacet(query=SQLParser.normalize_sql(self.sql))}self.client=self.hook.get_client(project_id=self.hook.project_id)job_ids=self.job_idifisinstance(self.job_id,str):job_ids=[self.job_id]inputs,outputs=[],[]forjob_idinjob_ids:inner_inputs,inner_outputs,inner_run_facets=self.get_facets(job_id=job_id)inputs.extend(inner_inputs)outputs.extend(inner_outputs)run_facets.update(inner_run_facets)returnOperatorLineage(inputs=inputs,outputs=outputs,run_facets=run_facets,job_facets=job_facets,)defget_facets(self,job_id:str):fromairflow.providers.common.compat.openlineage.facetimportErrorMessageRunFacetfromairflow.providers.google.cloud.openlineage.utilsimport(BigQueryErrorRunFacet,get_from_nullable_chain,)inputs=[]outputs=[]run_facets:dict[str,RunFacet]={}ifhasattr(self,"log"):self.log.debug("Extracting data from bigquery job: `%s`",job_id)try:job=self.client.get_job(job_id=job_id)# type: ignoreprops=job._propertiesifget_from_nullable_chain(props,["status","state"])!="DONE":raiseValueError(f"Trying to extract data from running bigquery job: `{job_id}`")# TODO: remove bigQuery_job in next releaserun_facets["bigQuery_job"]=run_facets["bigQueryJob"]=self._get_bigquery_job_run_facet(props)ifget_from_nullable_chain(props,["statistics","numChildJobs"]):ifhasattr(self,"log"):self.log.debug("Found SCRIPT job. Extracting lineage from child jobs instead.")# SCRIPT job type has no input / output information but spawns child jobs that have one# https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_jobforchild_job_idinself.client.list_jobs(parent_job=job_id):child_job=self.client.get_job(job_id=child_job_id)# type: ignorechild_inputs,child_output=self._get_inputs_outputs_from_job(child_job._properties)inputs.extend(child_inputs)outputs.append(child_output)else:inputs,_output=self._get_inputs_outputs_from_job(props)outputs.append(_output)exceptExceptionase:ifhasattr(self,"log"):self.log.warning("Cannot retrieve job details from BigQuery.Client. %s",e,exc_info=True)exception_msg=traceback.format_exc()# TODO: remove BigQueryErrorRunFacet in next releaserun_facets.update({"errorMessage":ErrorMessageRunFacet(message=f"{e}: {exception_msg}",programmingLanguage="python",),"bigQuery_error":BigQueryErrorRunFacet(clientError=f"{e}: {exception_msg}",),})deduplicated_outputs=self._deduplicate_outputs(outputs)returninputs,deduplicated_outputs,run_facetsdef_deduplicate_outputs(self,outputs:list[OutputDataset|None])->list[OutputDataset]:# Sources are the same so we can compare only namesfinal_outputs={}forsingle_outputinoutputs:ifnotsingle_output:continuekey=single_output.nameifkeynotinfinal_outputs:final_outputs[key]=single_outputcontinue# No OutputStatisticsOutputDatasetFacet is added to duplicated outputs as we can not determine# if the rowCount or size can be summed together.ifsingle_output.outputFacets:single_output.outputFacets.pop("outputStatistics",None)final_outputs[key]=single_outputreturnlist(final_outputs.values())def_get_inputs_outputs_from_job(self,properties:dict)->tuple[list[InputDataset],OutputDataset|None]:fromairflow.providers.google.cloud.openlineage.utilsimportget_from_nullable_chaininput_tables=get_from_nullable_chain(properties,["statistics","query","referencedTables"])or[]output_table=get_from_nullable_chain(properties,["configuration","query","destinationTable"])inputs=[(self._get_input_dataset(input_table))forinput_tableininput_tables]ifoutput_table:output=self._get_output_dataset(output_table)dataset_stat_facet=self._get_statistics_dataset_facet(properties)output.outputFacets=output.outputFacetsor{}ifdataset_stat_facet:output.outputFacets["outputStatistics"]=dataset_stat_facetreturninputs,output@staticmethoddef_get_bigquery_job_run_facet(properties:dict)->BigQueryJobRunFacet:fromairflow.providers.google.cloud.openlineage.utilsimport(BigQueryJobRunFacet,get_from_nullable_chain,)ifget_from_nullable_chain(properties,["configuration","query","query"]):# Exclude the query to avoid event size issues and duplicating SqlJobFacet information.properties=copy.deepcopy(properties)properties["configuration"]["query"].pop("query")cache_hit=get_from_nullable_chain(properties,["statistics","query","cacheHit"])billed_bytes=get_from_nullable_chain(properties,["statistics","query","totalBytesBilled"])returnBigQueryJobRunFacet(cached=str(cache_hit).lower()=="true",billedBytes=int(billed_bytes)ifbilled_byteselseNone,properties=json.dumps(properties),)@staticmethoddef_get_statistics_dataset_facet(properties,)->OutputStatisticsOutputDatasetFacet|None:fromairflow.providers.common.compat.openlineage.facetimportOutputStatisticsOutputDatasetFacetfromairflow.providers.google.cloud.openlineage.utilsimportget_from_nullable_chainquery_plan=get_from_nullable_chain(properties,chain=["statistics","query","queryPlan"])ifnotquery_plan:returnNoneout_stage=query_plan[-1]out_rows=out_stage.get("recordsWritten",None)out_bytes=out_stage.get("shuffleOutputBytes",None)ifout_bytesandout_rows:returnOutputStatisticsOutputDatasetFacet(rowCount=int(out_rows),size=int(out_bytes))returnNonedef_get_input_dataset(self,table:dict)->InputDataset:fromairflow.providers.common.compat.openlineage.facetimportInputDatasetreturncast(InputDataset,self._get_dataset(table,"input"))def_get_output_dataset(self,table:dict)->OutputDataset:fromairflow.providers.common.compat.openlineage.facetimportOutputDatasetreturncast(OutputDataset,self._get_dataset(table,"output"))def_get_dataset(self,table:dict,dataset_type:str)->Dataset:fromairflow.providers.common.compat.openlineage.facetimportInputDataset,OutputDatasetproject=table.get("projectId")dataset=table.get("datasetId")table_name=table.get("tableId")dataset_name=f"{project}.{dataset}.{table_name}"dataset_schema=self._get_table_schema_safely(dataset_name)ifdataset_type=="input":# Logic specific to creating InputDataset (if needed)returnInputDataset(namespace=BIGQUERY_NAMESPACE,name=dataset_name,facets={"schema":dataset_schema,}ifdataset_schemaelse{},)elifdataset_type=="output":# Logic specific to creating OutputDataset (if needed)returnOutputDataset(namespace=BIGQUERY_NAMESPACE,name=dataset_name,facets={"schema":dataset_schema,}ifdataset_schemaelse{},)else:raiseValueError("Invalid dataset_type. Must be 'input' or 'output'")def_get_table_schema_safely(self,table_name:str)->SchemaDatasetFacet|None:try:returnself._get_table_schema(table_name)exceptExceptionase:ifhasattr(self,"log"):self.log.warning("Could not extract output schema from bigquery. %s",e)returnNonedef_get_table_schema(self,table:str)->SchemaDatasetFacet|None:fromairflow.providers.common.compat.openlineage.facetimport(SchemaDatasetFacet,SchemaDatasetFacetFields,)fromairflow.providers.google.cloud.openlineage.utilsimportget_from_nullable_chainbq_table=self.client.get_table(table)ifnotbq_table._properties:returnNonefields=get_from_nullable_chain(bq_table._properties,["schema","fields"])ifnotfields:returnNonereturnSchemaDatasetFacet(fields=[SchemaDatasetFacetFields(name=field.get("name"),type=field.get("type"),description=field.get("description"),)forfieldinfields])