
Source code for airflow.providers.google.cloud.transfers.bigquery_to_mssql

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google BigQuery to MSSQL operator."""

from __future__ import annotations

import warnings
from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

if TYPE_CHECKING:
    from airflow.providers.openlineage.extractors import OperatorLineage
    from airflow.utils.context import Context


class BigQueryToMsSqlOperator(BigQueryToSqlBaseOperator):
    """
    Fetch data from a BigQuery table (alternatively fetch selected columns) and insert it into an MSSQL table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BigQueryToMsSqlOperator`

    :param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
        the BigQuery table of origin
    :param mssql_table: target MsSQL table. It is deprecated: use target_table_name instead. (templated)
    :param target_table_name: target MsSQL table. It takes precedence over mssql_table. (templated)
    :param mssql_conn_id: reference to a specific mssql hook

    .. warning::
        The `mssql_table` parameter has been deprecated. Use `target_table_name` instead.
    """

    template_fields: Sequence[str] = (
        *BigQueryToSqlBaseOperator.template_fields,
        "source_project_dataset_table",
    )

    def __init__(
        self,
        *,
        source_project_dataset_table: str,
        mssql_table: str | None = None,
        target_table_name: str | None = None,
        mssql_conn_id: str = "mssql_default",
        **kwargs,
    ) -> None:
        if mssql_table is not None:
            warnings.warn(
                "The `mssql_table` parameter has been deprecated. Use `target_table_name` instead.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            if target_table_name is not None:
                raise ValueError(
                    f"Cannot set both arguments: mssql_table={mssql_table!r} and "
                    f"target_table_name={target_table_name!r}."
                )
            target_table_name = mssql_table

        try:
            _, dataset_id, table_id = source_project_dataset_table.split(".")
        except ValueError:
            raise ValueError(
                f"Could not parse {source_project_dataset_table} as <project>.<dataset>.<table>"
            ) from None

        super().__init__(
            target_table_name=target_table_name,
            dataset_table=f"{dataset_id}.{table_id}",
            **kwargs,
        )
        self.mssql_conn_id = mssql_conn_id
        self.source_project_dataset_table = source_project_dataset_table
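
    # For example (hypothetical value): source_project_dataset_table="my-project.my_dataset.my_table"
    # is forwarded to BigQueryToSqlBaseOperator as dataset_table="my_dataset.my_table".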

    @cached_property
    def mssql_hook(self) -> MsSqlHook:
        return MsSqlHook(schema=self.database, mssql_conn_id=self.mssql_conn_id)

    def get_sql_hook(self) -> MsSqlHook:
        return self.mssql_hook

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
        from airflow.providers.common.compat.openlineage.facet import Dataset
        from airflow.providers.google.cloud.openlineage.utils import (
            BIGQUERY_NAMESPACE,
            get_facets_from_bq_table_for_given_fields,
            get_identity_column_lineage_facet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        if not self.bigquery_hook:
            self.bigquery_hook = BigQueryHook(
                gcp_conn_id=self.gcp_conn_id,
                location=self.location,
                impersonation_chain=self.impersonation_chain,
            )

        try:
            table_obj = self.bigquery_hook.get_client().get_table(self.source_project_dataset_table)
        except Exception:
            self.log.debug(
                "OpenLineage: could not fetch BigQuery table %s",
                self.source_project_dataset_table,
                exc_info=True,
            )
            return OperatorLineage()

        # Resolve the column names used for lineage; a comma-separated string of
        # selected fields is split into individual names.
        if self.selected_fields:
            if isinstance(self.selected_fields, str):
                bigquery_field_names = self.selected_fields.split(",")
            else:
                bigquery_field_names = list(self.selected_fields)
        else:
            bigquery_field_names = [f.name for f in getattr(table_obj, "schema", [])]

        input_dataset = Dataset(
            namespace=BIGQUERY_NAMESPACE,
            name=self.source_project_dataset_table,
            facets=get_facets_from_bq_table_for_given_fields(table_obj, bigquery_field_names),
        )

        db_info = self.mssql_hook.get_openlineage_database_info(self.mssql_hook.get_conn())
        default_schema = self.mssql_hook.get_openlineage_default_schema()
        namespace = f"{db_info.scheme}://{db_info.authority}"

        # Derive the fully qualified output table name, falling back to the
        # connection's default schema when target_table_name is not schema-qualified.
        if self.target_table_name and "." in self.target_table_name:
            schema_name, table_name = self.target_table_name.split(".", 1)
        else:
            schema_name = default_schema or ""
            table_name = self.target_table_name or ""

        if self.database:
            output_name = f"{self.database}.{schema_name}.{table_name}"
        else:
            output_name = f"{schema_name}.{table_name}"

        column_lineage_facet = get_identity_column_lineage_facet(
            bigquery_field_names, input_datasets=[input_dataset]
        )
        output_facets = column_lineage_facet or {}
        output_dataset = Dataset(namespace=namespace, name=output_name, facets=output_facets)

        return OperatorLineage(inputs=[input_dataset], outputs=[output_dataset])
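
A minimal usage sketch for the operator above (not part of the module). All project, dataset, table, and connection names are hypothetical placeholders; selected_fields and the other keyword arguments not defined in this module come from BigQueryToSqlBaseOperator.

from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

with DAG(dag_id="example_bigquery_to_mssql", schedule=None) as dag:
    bq_to_mssql = BigQueryToMsSqlOperator(
        task_id="bq_to_mssql",
        source_project_dataset_table="my-project.my_dataset.my_table",  # hypothetical <project>.<dataset>.<table>
        target_table_name="dbo.my_target_table",  # preferred over the deprecated mssql_table
        mssql_conn_id="mssql_default",
        selected_fields="id,name",  # optional; omit to copy all columns
    )

Passing the deprecated mssql_table argument still works but emits an AirflowProviderDeprecationWarning, and it cannot be combined with target_table_name.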
