Source code for airflow.providers.google.cloud.hooks.bigquery

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
This module contains a BigQuery Hook, as well as a very basic PEP 249
implementation for BigQuery.
"""
import hashlib
import json
import logging
import time
import warnings
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union

from google.api_core.retry import Retry
from google.cloud.bigquery import (
    DEFAULT_RETRY,
    Client,
    CopyJob,
    ExternalConfig,
    ExtractJob,
    LoadJob,
    QueryJob,
    SchemaField,
)
from google.cloud.bigquery.dataset import AccessEntry, Dataset, DatasetListItem, DatasetReference
from google.cloud.bigquery.table import EncryptionConfiguration, Row, Table, TableReference
from google.cloud.exceptions import NotFound
from googleapiclient.discovery import Resource, build
from pandas import DataFrame
from pandas_gbq import read_gbq
from pandas_gbq.gbq import (
    GbqConnector,
    _check_google_client_version as gbq_check_google_client_version,
    _test_google_api_imports as gbq_test_google_api_imports,
)

from airflow.exceptions import AirflowException
from airflow.hooks.dbapi import DbApiHook
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin

[docs]log = logging.getLogger(__name__)
[docs]BigQueryJob = Union[CopyJob, QueryJob, LoadJob, ExtractJob]
[docs]class BigQueryHook(GoogleBaseHook, DbApiHook): """ Interact with BigQuery. This hook uses the Google Cloud connection. :param gcp_conn_id: The Airflow connection used for GCP credentials. :type gcp_conn_id: Optional[str] :param delegate_to: This performs a task on one host with reference to other hosts. :type delegate_to: Optional[str] :param use_legacy_sql: This specifies whether to use legacy SQL dialect. :type use_legacy_sql: bool :param location: The location of the BigQuery resource. :type location: Optional[str] :param bigquery_conn_id: The Airflow connection used for BigQuery credentials. :type bigquery_conn_id: Optional[str] :param api_resource_configs: This contains params configuration applied for Google BigQuery jobs. :type api_resource_configs: Optional[Dict] :param impersonation_chain: This is the optional service account to impersonate using short term credentials. :type impersonation_chain: Optional[Union[str, Sequence[str]]] :param labels: The BigQuery resource label. :type labels: Optional[Dict] """
[docs] conn_name_attr = 'gcp_conn_id'
[docs] default_conn_name = 'google_cloud_bigquery_default'
[docs] conn_type = 'gcpbigquery'
[docs] hook_name = 'Google Bigquery'
def __init__( self, gcp_conn_id: str = GoogleBaseHook.default_conn_name, delegate_to: Optional[str] = None, use_legacy_sql: bool = True, location: Optional[str] = None, bigquery_conn_id: Optional[str] = None, api_resource_configs: Optional[Dict] = None, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, labels: Optional[Dict] = None, ) -> None: # To preserve backward compatibility # TODO: remove one day if bigquery_conn_id: warnings.warn( "The bigquery_conn_id parameter has been deprecated. You should pass " "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2, ) gcp_conn_id = bigquery_conn_id super().__init__( gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain, ) self.use_legacy_sql = use_legacy_sql self.location = location self.running_job_id = None # type: Optional[str] self.api_resource_configs = api_resource_configs if api_resource_configs else {} # type Dict self.labels = labels
[docs] def get_conn(self) -> "BigQueryConnection": """Returns a BigQuery PEP 249 connection object.""" service = self.get_service() return BigQueryConnection( service=service, project_id=self.project_id, use_legacy_sql=self.use_legacy_sql, location=self.location, num_retries=self.num_retries, hook=self,
)
[docs] def get_service(self) -> Resource: """Returns a BigQuery service object.""" warnings.warn( "This method will be deprecated. Please use `BigQueryHook.get_client` method", DeprecationWarning ) http_authorized = self._authorize() return build('bigquery', 'v2', http=http_authorized, cache_discovery=False)
[docs] def get_client(self, project_id: Optional[str] = None, location: Optional[str] = None) -> Client: """ Returns authenticated BigQuery Client. :param project_id: Project ID for the project which the client acts on behalf of. :type project_id: str :param location: Default location for jobs / datasets / tables. :type location: str :return: """ return Client( client_info=self.client_info, project=project_id, location=location, credentials=self._get_credentials(),
) @staticmethod def _resolve_table_reference( table_resource: Dict[str, Any], project_id: Optional[str] = None, dataset_id: Optional[str] = None, table_id: Optional[str] = None, ) -> Dict[str, Any]: try: # Check if tableReference is present and is valid TableReference.from_api_repr(table_resource["tableReference"]) except KeyError: # Something is wrong so we try to build the reference table_resource["tableReference"] = table_resource.get("tableReference", {}) values = [("projectId", project_id), ("tableId", table_id), ("datasetId", dataset_id)] for key, value in values: # Check if value is already present if no use the provided one resolved_value = table_resource["tableReference"].get(key, value) if not resolved_value: # If there's no value in tableReference and provided one is None raise error raise AirflowException( f"Table resource is missing proper `tableReference` and `{key}` is None" ) table_resource["tableReference"][key] = resolved_value return table_resource
[docs] def insert_rows( self, table: Any, rows: Any, target_fields: Any = None, commit_every: Any = 1000, replace: Any = False, **kwargs, ) -> None: """ Insertion is currently unsupported. Theoretically, you could use BigQuery's streaming API to insert rows into a table, but this hasn't been implemented. """ raise NotImplementedError()
[docs] def get_pandas_df( self, sql: str, parameters: Optional[Union[Iterable, Mapping]] = None, dialect: Optional[str] = None, **kwargs, ) -> DataFrame: """ Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn't support PEP 249 connections, except for SQLite. See: https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900 :param sql: The BigQuery SQL to execute. :type sql: str :param parameters: The parameters to render the SQL query with (not used, leave to override superclass method) :type parameters: mapping or iterable :param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL defaults to use `self.use_legacy_sql` if not specified :type dialect: str in {'legacy', 'standard'} :param kwargs: (optional) passed into pandas_gbq.read_gbq method :type kwargs: dict """ if dialect is None: dialect = 'legacy' if self.use_legacy_sql else 'standard' credentials, project_id = self._get_credentials_and_project_id() return read_gbq( sql, project_id=project_id, dialect=dialect, verbose=False, credentials=credentials, **kwargs
) @GoogleBaseHook.fallback_to_default_project_id
[docs] def table_exists(self, dataset_id: str, table_id: str, project_id: str) -> bool: """ Checks for the existence of a table in Google BigQuery. :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project. :type project_id: str :param dataset_id: The name of the dataset in which to look for the table. :type dataset_id: str :param table_id: The name of the table to check the existence of. :type table_id: str """ table_reference = TableReference(DatasetReference(project_id, dataset_id), table_id) try: self.get_client(project_id=project_id).get_table(table_reference) return True except NotFound: return False
@GoogleBaseHook.fallback_to_default_project_id
[docs] def table_partition_exists( self, dataset_id: str, table_id: str, partition_id: str, project_id: str ) -> bool: """ Checks for the existence of a partition in a table in Google BigQuery. :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project. :type project_id: str :param dataset_id: The name of the dataset in which to look for the table. :type dataset_id: str :param table_id: The name of the table to check the existence of. :type table_id: str :param partition_id: The name of the partition to check the existence of. :type partition_id: str """ table_reference = TableReference(DatasetReference(project_id, dataset_id), table_id) try: return partition_id in self.get_client(project_id=project_id).list_partitions(table_reference) except NotFound: return False
@GoogleBaseHook.fallback_to_default_project_id
[docs] def create_empty_table( self, project_id: Optional[str] = None, dataset_id: Optional[str] = None, table_id: Optional[str] = None, table_resource: Optional[Dict[str, Any]] = None, schema_fields: Optional[List] = None, time_partitioning: Optional[Dict] = None, cluster_fields: Optional[List[str]] = None, labels: Optional[Dict] = None, view: Optional[Dict] = None, materialized_view: Optional[Dict] = None, encryption_configuration: Optional[Dict] = None, retry: Optional[Retry] = DEFAULT_RETRY, num_retries: Optional[int] = None, location: Optional[str] = None, exists_ok: bool = True, ) -> Table: """ Creates a new, empty table in the dataset. To create a view, which is defined by a SQL query, parse a dictionary to 'view' kwarg :param project_id: The project to create the table into. :type project_id: str :param dataset_id: The dataset to create the table into. :type dataset_id: str :param table_id: The Name of the table to be created. :type table_id: str :param table_resource: Table resource as described in documentation: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table If provided all other parameters are ignored. :type table_resource: Dict[str, Any] :param schema_fields: If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema :type schema_fields: list :param labels: a dictionary containing labels for the table, passed to BigQuery :type labels: dict :param retry: Optional. How to retry the RPC. :type retry: google.api_core.retry.Retry **Example**: :: schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}] :param time_partitioning: configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. .. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning :type time_partitioning: dict :param cluster_fields: [Optional] The fields used for clustering. BigQuery supports clustering for both partitioned and non-partitioned tables. https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields :type cluster_fields: list :param view: [Optional] A dictionary containing definition for the view. If set, it will create a view instead of a table: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition :type view: dict **Example**: :: view = { "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000", "useLegacySql": False } :param materialized_view: [Optional] The materialized view definition. :type materialized_view: dict :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). **Example**: :: encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" } :type encryption_configuration: dict :param num_retries: Maximum number of retries in case of connection problems. :type num_retries: int :param exists_ok: If ``True``, ignore "already exists" errors when creating the table. :type exists_ok: bool :return: Created table """ if num_retries: warnings.warn("Parameter `num_retries` is deprecated", DeprecationWarning) _table_resource: Dict[str, Any] = {} if self.location: _table_resource['location'] = self.location if schema_fields: _table_resource['schema'] = {'fields': schema_fields} if time_partitioning: _table_resource['timePartitioning'] = time_partitioning if cluster_fields: _table_resource['clustering'] = {'fields': cluster_fields} if labels: _table_resource['labels'] = labels if view: _table_resource['view'] = view if materialized_view: _table_resource['materializedView'] = materialized_view if encryption_configuration: _table_resource["encryptionConfiguration"] = encryption_configuration table_resource = table_resource or _table_resource table_resource = self._resolve_table_reference( table_resource=table_resource, project_id=project_id, dataset_id=dataset_id, table_id=table_id, ) table = Table.from_api_repr(table_resource) return self.get_client(project_id=project_id, location=location).create_table( table=table, exists_ok=exists_ok, retry=retry
) @GoogleBaseHook.fallback_to_default_project_id
[docs] def create_empty_dataset( self, dataset_id: Optional[str] = None, project_id: Optional[str] = None, location: Optional[str] = None, dataset_reference: Optional[Dict[str, Any]] = None, exists_ok: bool = True, ) -> None: """ Create a new empty dataset: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert :param project_id: The name of the project where we want to create an empty a dataset. Don't need to provide, if projectId in dataset_reference. :type project_id: str :param dataset_id: The id of dataset. Don't need to provide, if datasetId in dataset_reference. :type dataset_id: str :param location: (Optional) The geographic location where the dataset should reside. There is no default value but the dataset will be created in US if nothing is provided. :type location: str :param dataset_reference: Dataset reference that could be provided with request body. More info: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource :type dataset_reference: dict :param exists_ok: If ``True``, ignore "already exists" errors when creating the dataset. :type exists_ok: bool """ dataset_reference = dataset_reference or {"datasetReference": {}} for param, value in zip(["datasetId", "projectId"], [dataset_id, project_id]): specified_param = dataset_reference["datasetReference"].get(param) if specified_param: if value: self.log.info( "`%s` was provided in both `dataset_reference` and as `%s`. " "Using value from `dataset_reference`", param, convert_camel_to_snake(param), ) continue # use specified value if not value: raise ValueError( f"Please specify `{param}` either in `dataset_reference` " f"or by providing `{convert_camel_to_snake(param)}`", ) # dataset_reference has no param but we can fallback to default value self.log.info( "%s was not specified in `dataset_reference`. Will use default value %s.", param, value ) dataset_reference["datasetReference"][param] = value location = location or self.location if location: dataset_reference["location"] = dataset_reference.get("location", location) dataset: Dataset = Dataset.from_api_repr(dataset_reference) self.log.info('Creating dataset: %s in project: %s ', dataset.dataset_id, dataset.project) self.get_client(location=location).create_dataset(dataset=dataset, exists_ok=exists_ok) self.log.info('Dataset created successfully.')
@GoogleBaseHook.fallback_to_default_project_id
[docs] def get_dataset_tables( self, dataset_id: str, project_id: Optional[str] = None, max_results: Optional[int] = None, retry: Retry = DEFAULT_RETRY, ) -> List[Dict[str, Any]]: """ Get the list of tables for a given dataset. For more information, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list :param dataset_id: the dataset ID of the requested dataset. :type dataset_id: str :param project_id: (Optional) the project of the requested dataset. If None, self.project_id will be used. :type project_id: str :param max_results: (Optional) the maximum number of tables to return. :type max_results: int :param retry: How to retry the RPC. :type retry: google.api_core.retry.Retry :return: List of tables associated with the dataset. """ self.log.info('Start getting tables list from dataset: %s.%s', project_id, dataset_id) tables = self.get_client().list_tables( dataset=DatasetReference(project=project_id, dataset_id=dataset_id), max_results=max_results, retry=retry, ) # Convert to a list (consumes all values) return [t.reference.to_api_repr() for t in tables]
@GoogleBaseHook.fallback_to_default_project_id
[docs] def delete_dataset( self, dataset_id: str, project_id: Optional[str] = None, delete_contents: bool = False, retry: Retry = DEFAULT_RETRY, ) -> None: """ Delete a dataset of Big query in your project. :param project_id: The name of the project where we have the dataset. :type project_id: str :param dataset_id: The dataset to be delete. :type dataset_id: str :param delete_contents: If True, delete all the tables in the dataset. If False and the dataset contains tables, the request will fail. :type delete_contents: bool :param retry: How to retry the RPC. :type retry: google.api_core.retry.Retry """ self.log.info('Deleting from project: %s Dataset:%s', project_id, dataset_id) self.get_client(project_id=project_id).delete_dataset( dataset=DatasetReference(project=project_id, dataset_id=dataset_id), delete_contents=delete_contents, retry=retry, not_found_ok=True,
) @GoogleBaseHook.fallback_to_default_project_id
[docs] def create_external_table( self, external_project_dataset_table: str, schema_fields: List, source_uris: List, source_format: str = 'CSV', autodetect: bool = False, compression: str = 'NONE', ignore_unknown_values: bool = False, max_bad_records: int = 0, skip_leading_rows: int = 0, field_delimiter: str = ',', quote_character: Optional[str] = None, allow_quoted_newlines: bool = False, allow_jagged_rows: bool = False, encoding: str = "UTF-8", src_fmt_configs: Optional[Dict] = None, labels: Optional[Dict] = None, description: Optional[str] = None, encryption_configuration: Optional[Dict] = None, location: Optional[str] = None, project_id: Optional[str] = None, ) -> None: """ Creates a new external table in the dataset with the data from Google Cloud Storage. See here: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource for more details about these parameters. :param external_project_dataset_table: The dotted ``(<project>.|<project>:)<dataset>.<table>($<partition>)`` BigQuery table name to create external table. If ``<project>`` is not included, project will be the project defined in the connection json. :type external_project_dataset_table: str :param schema_fields: The schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource :type schema_fields: list :param source_uris: The source Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild per-object name can be used. :type source_uris: list :param source_format: File format to export. :type source_format: str :param autodetect: Try to detect schema and format options automatically. Any option specified explicitly will be honored. :type autodetect: bool :param compression: [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats. :type compression: str :param ignore_unknown_values: [Optional] Indicates if BigQuery should allow extra values that are not represented in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. :type ignore_unknown_values: bool :param max_bad_records: The maximum number of bad records that BigQuery can ignore when running the job. :type max_bad_records: int :param skip_leading_rows: Number of rows to skip when loading from a CSV. :type skip_leading_rows: int :param field_delimiter: The delimiter to use when loading from a CSV. :type field_delimiter: str :param quote_character: The value that is used to quote data sections in a CSV file. :type quote_character: str :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). :type allow_quoted_newlines: bool :param allow_jagged_rows: Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable when source_format is CSV. :type allow_jagged_rows: bool :param encoding: The character encoding of the data. See: .. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding :type encoding: str :param src_fmt_configs: configure optional fields specific to the source format :type src_fmt_configs: dict :param labels: A dictionary containing labels for the BiqQuery table. :type labels: dict :param description: A string containing the description for the BigQuery table. :type descriptin: str :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). **Example**: :: encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" } :type encryption_configuration: dict """ warnings.warn( "This method is deprecated. Please use `BigQueryHook.create_empty_table` method with" "pass passing the `table_resource` object. This gives more flexibility than this method.", DeprecationWarning, ) location = location or self.location src_fmt_configs = src_fmt_configs or {} source_format = source_format.upper() compression = compression.upper() external_config_api_repr = { 'autodetect': autodetect, 'sourceFormat': source_format, 'sourceUris': source_uris, 'compression': compression, 'ignoreUnknownValues': ignore_unknown_values, } # if following fields are not specified in src_fmt_configs, # honor the top-level params for backward-compatibility backward_compatibility_configs = { 'skipLeadingRows': skip_leading_rows, 'fieldDelimiter': field_delimiter, 'quote': quote_character, 'allowQuotedNewlines': allow_quoted_newlines, 'allowJaggedRows': allow_jagged_rows, 'encoding': encoding, } src_fmt_to_param_mapping = {'CSV': 'csvOptions', 'GOOGLE_SHEETS': 'googleSheetsOptions'} src_fmt_to_configs_mapping = { 'csvOptions': [ 'allowJaggedRows', 'allowQuotedNewlines', 'fieldDelimiter', 'skipLeadingRows', 'quote', 'encoding', ], 'googleSheetsOptions': ['skipLeadingRows'], } if source_format in src_fmt_to_param_mapping.keys(): valid_configs = src_fmt_to_configs_mapping[src_fmt_to_param_mapping[source_format]] src_fmt_configs = _validate_src_fmt_configs( source_format, src_fmt_configs, valid_configs, backward_compatibility_configs ) external_config_api_repr[src_fmt_to_param_mapping[source_format]] = src_fmt_configs # build external config external_config = ExternalConfig.from_api_repr(external_config_api_repr) if schema_fields: external_config.schema = [SchemaField.from_api_repr(f) for f in schema_fields] if max_bad_records: external_config.max_bad_records = max_bad_records # build table definition table = Table(table_ref=TableReference.from_string(external_project_dataset_table, project_id)) table.external_data_configuration = external_config if labels: table.labels = labels if description: table.description = description if encryption_configuration: table.encryption_configuration = EncryptionConfiguration.from_api_repr(encryption_configuration) self.log.info('Creating external table: %s', external_project_dataset_table) self.create_empty_table( table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True ) self.log.info('External table created successfully: %s', external_project_dataset_table)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def update_table( self, table_resource: Dict[str, Any], fields: Optional[List[str]] = None, dataset_id: Optional[str] = None, table_id: Optional[str] = None, project_id: Optional[str] = None, ) -> Dict[str, Any]: """ Change some fields of a table. Use ``fields`` to specify which fields to update. At least one field must be provided. If a field is listed in ``fields`` and is ``None`` in ``table``, the field value will be deleted. If ``table.etag`` is not ``None``, the update will only succeed if the table on the server has the same ETag. Thus reading a table with ``get_table``, changing its fields, and then passing it to ``update_table`` will ensure that the changes will only be saved if no modifications to the table occurred since the read. :param project_id: The project to create the table into. :type project_id: str :param dataset_id: The dataset to create the table into. :type dataset_id: str :param table_id: The Name of the table to be created. :type table_id: str :param table_resource: Table resource as described in documentation: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table The table has to contain ``tableReference`` or ``project_id``, ``dataset_id`` and ``table_id`` have to be provided. :type table_resource: Dict[str, Any] :param fields: The fields of ``table`` to change, spelled as the Table properties (e.g. "friendly_name"). :type fields: List[str] """ fields = fields or list(table_resource.keys()) table_resource = self._resolve_table_reference( table_resource=table_resource, project_id=project_id, dataset_id=dataset_id, table_id=table_id ) table = Table.from_api_repr(table_resource) self.log.info('Updating table: %s', table_resource["tableReference"]) table_object = self.get_client(project_id=project_id).update_table(table=table, fields=fields) self.log.info('Table %s.%s.%s updated successfully', project_id, dataset_id, table_id) return table_object.to_api_repr()
@GoogleBaseHook.fallback_to_default_project_id
[docs] def patch_table( self, dataset_id: str, table_id: str, project_id: Optional[str] = None, description: Optional[str] = None, expiration_time: Optional[int] = None, external_data_configuration: Optional[Dict] = None, friendly_name: Optional[str] = None, labels: Optional[Dict] = None, schema: Optional[List] = None, time_partitioning: Optional[Dict] = None, view: Optional[Dict] = None, require_partition_filter: Optional[bool] = None, encryption_configuration: Optional[Dict] = None, ) -> None: """ Patch information in an existing table. It only updates fields that are provided in the request object. Reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch :param dataset_id: The dataset containing the table to be patched. :type dataset_id: str :param table_id: The Name of the table to be patched. :type table_id: str :param project_id: The project containing the table to be patched. :type project_id: str :param description: [Optional] A user-friendly description of this table. :type description: str :param expiration_time: [Optional] The time when this table expires, in milliseconds since the epoch. :type expiration_time: int :param external_data_configuration: [Optional] A dictionary containing properties of a table stored outside of BigQuery. :type external_data_configuration: dict :param friendly_name: [Optional] A descriptive name for this table. :type friendly_name: str :param labels: [Optional] A dictionary containing labels associated with this table. :type labels: dict :param schema: [Optional] If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema The supported schema modifications and unsupported schema modification are listed here: https://cloud.google.com/bigquery/docs/managing-table-schemas **Example**: :: schema=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}] :type schema: list :param time_partitioning: [Optional] A dictionary containing time-based partitioning definition for the table. :type time_partitioning: dict :param view: [Optional] A dictionary containing definition for the view. If set, it will patch a view instead of a table: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition **Example**: :: view = { "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 500", "useLegacySql": False } :type view: dict :param require_partition_filter: [Optional] If true, queries over the this table require a partition filter. If false, queries over the table :type require_partition_filter: bool :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). **Example**: :: encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" } :type encryption_configuration: dict """ warnings.warn( "This method is deprecated, please use ``BigQueryHook.update_table`` method.", DeprecationWarning, ) table_resource: Dict[str, Any] = {} if description is not None: table_resource['description'] = description if expiration_time is not None: table_resource['expirationTime'] = expiration_time if external_data_configuration: table_resource['externalDataConfiguration'] = external_data_configuration if friendly_name is not None: table_resource['friendlyName'] = friendly_name if labels: table_resource['labels'] = labels if schema: table_resource['schema'] = {'fields': schema} if time_partitioning: table_resource['timePartitioning'] = time_partitioning if view: table_resource['view'] = view if require_partition_filter is not None: table_resource['requirePartitionFilter'] = require_partition_filter if encryption_configuration: table_resource["encryptionConfiguration"] = encryption_configuration self.update_table( table_resource=table_resource, fields=list(table_resource.keys()), project_id=project_id, dataset_id=dataset_id, table_id=table_id,
) @GoogleBaseHook.fallback_to_default_project_id
[docs] def insert_all( self, project_id: str, dataset_id: str, table_id: str, rows: List, ignore_unknown_values: bool = False, skip_invalid_rows: bool = False, fail_on_error: bool = False, ) -> None: """ Method to stream data into BigQuery one record at a time without needing to run a load job .. seealso:: For more information, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll :param project_id: The name of the project where we have the table :type project_id: str :param dataset_id: The name of the dataset where we have the table :type dataset_id: str :param table_id: The name of the table :type table_id: str :param rows: the rows to insert :type rows: list **Example or rows**: rows=[{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}] :param ignore_unknown_values: [Optional] Accept rows that contain values that do not match the schema. The unknown values are ignored. The default value is false, which treats unknown values as errors. :type ignore_unknown_values: bool :param skip_invalid_rows: [Optional] Insert all valid rows of a request, even if invalid rows exist. The default value is false, which causes the entire request to fail if any invalid rows exist. :type skip_invalid_rows: bool :param fail_on_error: [Optional] Force the task to fail if any errors occur. The default value is false, which indicates the task should not fail even if any insertion errors occur. :type fail_on_error: bool """ self.log.info('Inserting %s row(s) into table %s:%s.%s', len(rows), project_id, dataset_id, table_id) table_ref = TableReference(dataset_ref=DatasetReference(project_id, dataset_id), table_id=table_id) bq_client = self.get_client(project_id=project_id) table = bq_client.get_table(table_ref) errors = bq_client.insert_rows( table=table, rows=rows, ignore_unknown_values=ignore_unknown_values, skip_invalid_rows=skip_invalid_rows, ) if errors: error_msg = f"{len(errors)} insert error(s) occurred. Details: {errors}" self.log.error(error_msg) if fail_on_error: raise AirflowException(f'BigQuery job failed. Error was: {error_msg}') else: self.log.info('All row(s) inserted successfully: %s:%s.%s', project_id, dataset_id, table_id)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def update_dataset( self, fields: Sequence[str], dataset_resource: Dict[str, Any], dataset_id: Optional[str] = None, project_id: Optional[str] = None, retry: Retry = DEFAULT_RETRY, ) -> Dataset: """ Change some fields of a dataset. Use ``fields`` to specify which fields to update. At least one field must be provided. If a field is listed in ``fields`` and is ``None`` in ``dataset``, it will be deleted. If ``dataset.etag`` is not ``None``, the update will only succeed if the dataset on the server has the same ETag. Thus reading a dataset with ``get_dataset``, changing its fields, and then passing it to ``update_dataset`` will ensure that the changes will only be saved if no modifications to the dataset occurred since the read. :param dataset_resource: Dataset resource that will be provided in request body. https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource :type dataset_resource: dict :param dataset_id: The id of the dataset. :type dataset_id: str :param fields: The properties of ``dataset`` to change (e.g. "friendly_name"). :type fields: Sequence[str] :param project_id: The Google Cloud Project ID :type project_id: str :param retry: How to retry the RPC. :type retry: google.api_core.retry.Retry """ dataset_resource["datasetReference"] = dataset_resource.get("datasetReference", {}) for key, value in zip(["datasetId", "projectId"], [dataset_id, project_id]): spec_value = dataset_resource["datasetReference"].get(key) if value and not spec_value: dataset_resource["datasetReference"][key] = value self.log.info('Start updating dataset') dataset = self.get_client(project_id=project_id).update_dataset( dataset=Dataset.from_api_repr(dataset_resource), fields=fields, retry=retry, ) self.log.info("Dataset successfully updated: %s", dataset) return dataset
[docs] def patch_dataset( self, dataset_id: str, dataset_resource: Dict, project_id: Optional[str] = None ) -> Dict: """ Patches information in an existing dataset. It only replaces fields that are provided in the submitted dataset resource. More info: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch :param dataset_id: The BigQuery Dataset ID :type dataset_id: str :param dataset_resource: Dataset resource that will be provided in request body. https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource :type dataset_resource: dict :param project_id: The Google Cloud Project ID :type project_id: str :rtype: dataset https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource """ warnings.warn("This method is deprecated. Please use ``update_dataset``.", DeprecationWarning) project_id = project_id or self.project_id if not dataset_id or not isinstance(dataset_id, str): raise ValueError( "dataset_id argument must be provided and has " "a type 'str'. You provided: {}".format(dataset_id) ) service = self.get_service() dataset_project_id = project_id or self.project_id self.log.info('Start patching dataset: %s:%s', dataset_project_id, dataset_id) dataset = ( service.datasets() .patch( datasetId=dataset_id, projectId=dataset_project_id, body=dataset_resource, ) .execute(num_retries=self.num_retries) ) self.log.info("Dataset successfully patched: %s", dataset) return dataset
[docs] def get_dataset_tables_list( self, dataset_id: str, project_id: Optional[str] = None, table_prefix: Optional[str] = None, max_results: Optional[int] = None, ) -> List[Dict[str, Any]]: """ Method returns tables list of a BigQuery tables. If table prefix is specified, only tables beginning by it are returned. For more information, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list :param dataset_id: The BigQuery Dataset ID :type dataset_id: str :param project_id: The Google Cloud Project ID :type project_id: str :param table_prefix: Tables must begin by this prefix to be returned (case sensitive) :type table_prefix: str :param max_results: The maximum number of results to return in a single response page. Leverage the page tokens to iterate through the entire collection. :type max_results: int :return: List of tables associated with the dataset """ warnings.warn("This method is deprecated. Please use ``get_dataset_tables``.", DeprecationWarning) project_id = project_id or self.project_id tables = self.get_client().list_tables( dataset=DatasetReference(project=project_id, dataset_id=dataset_id), max_results=max_results, ) if table_prefix: result = [t.reference.to_api_repr() for t in tables if t.table_id.startswith(table_prefix)] else: result = [t.reference.to_api_repr() for t in tables] self.log.info("%s tables found", len(result)) return result
@GoogleBaseHook.fallback_to_default_project_id
[docs] def get_datasets_list( self, project_id: Optional[str] = None, include_all: bool = False, filter_: Optional[str] = None, max_results: Optional[int] = None, page_token: Optional[str] = None, retry: Retry = DEFAULT_RETRY, ) -> List[DatasetListItem]: """ Method returns full list of BigQuery datasets in the current project For more information, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list :param project_id: Google Cloud Project for which you try to get all datasets :type project_id: str :param include_all: True if results include hidden datasets. Defaults to False. :param filter_: An expression for filtering the results by label. For syntax, see https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter. :param filter_: str :param max_results: Maximum number of datasets to return. :param max_results: int :param page_token: Token representing a cursor into the datasets. If not passed, the API will return the first page of datasets. The token marks the beginning of the iterator to be returned and the value of the ``page_token`` can be accessed at ``next_page_token`` of the :class:`~google.api_core.page_iterator.HTTPIterator`. :param page_token: str :param retry: How to retry the RPC. :type retry: google.api_core.retry.Retry """ datasets = self.get_client(project_id=project_id).list_datasets( project=project_id, include_all=include_all, filter=filter_, max_results=max_results, page_token=page_token, retry=retry, ) datasets_list = list(datasets) self.log.info("Datasets List: %s", len(datasets_list)) return datasets_list
@GoogleBaseHook.fallback_to_default_project_id
[docs] def get_dataset(self, dataset_id: str, project_id: Optional[str] = None) -> Dataset: """ Fetch the dataset referenced by dataset_id. :param dataset_id: The BigQuery Dataset ID :type dataset_id: str :param project_id: The Google Cloud Project ID :type project_id: str :return: dataset_resource .. seealso:: For more information, see Dataset Resource content: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource """ dataset = self.get_client(project_id=project_id).get_dataset( dataset_ref=DatasetReference(project_id, dataset_id) ) self.log.info("Dataset Resource: %s", dataset) return dataset
@GoogleBaseHook.fallback_to_default_project_id
[docs] def run_grant_dataset_view_access( self, source_dataset: str, view_dataset: str, view_table: str, source_project: Optional[str] = None, view_project: Optional[str] = None, project_id: Optional[str] = None, ) -> Dict[str, Any]: """ Grant authorized view access of a dataset to a view table. If this view has already been granted access to the dataset, do nothing. This method is not atomic. Running it may clobber a simultaneous update. :param source_dataset: the source dataset :type source_dataset: str :param view_dataset: the dataset that the view is in :type view_dataset: str :param view_table: the table of the view :type view_table: str :param project_id: the project of the source dataset. If None, self.project_id will be used. :type project_id: str :param view_project: the project that the view is in. If None, self.project_id will be used. :type view_project: str :return: the datasets resource of the source dataset. """ if source_project: project_id = source_project warnings.warn( "Parameter ``source_project`` is deprecated. Use ``project_id``.", DeprecationWarning, ) view_project = view_project or project_id view_access = AccessEntry( role=None, entity_type="view", entity_id={'projectId': view_project, 'datasetId': view_dataset, 'tableId': view_table}, ) dataset = self.get_dataset(project_id=project_id, dataset_id=source_dataset) # Check to see if the view we want to add already exists. if view_access not in dataset.access_entries: self.log.info( 'Granting table %s:%s.%s authorized view access to %s:%s dataset.', view_project, view_dataset, view_table, project_id, source_dataset, ) dataset.access_entries += [view_access] dataset = self.update_dataset( fields=["access"], dataset_resource=dataset.to_api_repr(), project_id=project_id ) else: self.log.info( 'Table %s:%s.%s already has authorized view access to %s:%s dataset.', view_project, view_dataset, view_table, project_id, source_dataset, ) return dataset.to_api_repr()
@GoogleBaseHook.fallback_to_default_project_id
[docs] def run_table_upsert( self, dataset_id: str, table_resource: Dict[str, Any], project_id: Optional[str] = None ) -> Dict[str, Any]: """ If the table already exists, update the existing table if not create new. Since BigQuery does not natively allow table upserts, this is not an atomic operation. :param dataset_id: the dataset to upsert the table into. :type dataset_id: str :param table_resource: a table resource. see https://cloud.google.com/bigquery/docs/reference/v2/tables#resource :type table_resource: dict :param project_id: the project to upsert the table into. If None, project will be self.project_id. :return: """ table_id = table_resource['tableReference']['tableId'] table_resource = self._resolve_table_reference( table_resource=table_resource, project_id=project_id, dataset_id=dataset_id, table_id=table_id ) tables_list_resp = self.get_dataset_tables(dataset_id=dataset_id, project_id=project_id) if any(table['tableId'] == table_id for table in tables_list_resp): self.log.info('Table %s:%s.%s exists, updating.', project_id, dataset_id, table_id) table = self.update_table(table_resource=table_resource) else: self.log.info('Table %s:%s.%s does not exist. creating.', project_id, dataset_id, table_id) table = self.create_empty_table( table_resource=table_resource, project_id=project_id ).to_api_repr() return table
[docs] def run_table_delete(self, deletion_dataset_table: str, ignore_if_missing: bool = False) -> None: """ Delete an existing table from the dataset; If the table does not exist, return an error unless ignore_if_missing is set to True. :param deletion_dataset_table: A dotted ``(<project>.|<project>:)<dataset>.<table>`` that indicates which table will be deleted. :type deletion_dataset_table: str :param ignore_if_missing: if True, then return success even if the requested table does not exist. :type ignore_if_missing: bool :return: """ warnings.warn("This method is deprecated. Please use `delete_table`.", DeprecationWarning) return self.delete_table(table_id=deletion_dataset_table, not_found_ok=ignore_if_missing)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def delete_table( self, table_id: str, not_found_ok: bool = True, project_id: Optional[str] = None, ) -> None: """ Delete an existing table from the dataset. If the table does not exist, return an error unless not_found_ok is set to True. :param table_id: A dotted ``(<project>.|<project>:)<dataset>.<table>`` that indicates which table will be deleted. :type table_id: str :param not_found_ok: if True, then return success even if the requested table does not exist. :type not_found_ok: bool :param project_id: the project used to perform the request :type project_id: str """ self.get_client(project_id=project_id).delete_table( table=Table.from_string(table_id), not_found_ok=not_found_ok, ) self.log.info('Deleted table %s', table_id)
[docs] def get_tabledata( self, dataset_id: str, table_id: str, max_results: Optional[int] = None, selected_fields: Optional[str] = None, page_token: Optional[str] = None, start_index: Optional[int] = None, ) -> List[Dict]: """ Get the data of a given dataset.table and optionally with selected columns. see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list :param dataset_id: the dataset ID of the requested table. :param table_id: the table ID of the requested table. :param max_results: the maximum results to return. :param selected_fields: List of fields to return (comma-separated). If unspecified, all fields are returned. :param page_token: page token, returned from a previous call, identifying the result set. :param start_index: zero based index of the starting row to read. :return: list of rows """ warnings.warn("This method is deprecated. Please use `list_rows`.", DeprecationWarning) rows = self.list_rows(dataset_id, table_id, max_results, selected_fields, page_token, start_index) return [dict(r) for r in rows]
@GoogleBaseHook.fallback_to_default_project_id
[docs] def list_rows( self, dataset_id: str, table_id: str, max_results: Optional[int] = None, selected_fields: Optional[Union[List[str], str]] = None, page_token: Optional[str] = None, start_index: Optional[int] = None, project_id: Optional[str] = None, location: Optional[str] = None, ) -> List[Row]: """ List the rows of the table. See https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list :param dataset_id: the dataset ID of the requested table. :param table_id: the table ID of the requested table. :param max_results: the maximum results to return. :param selected_fields: List of fields to return (comma-separated). If unspecified, all fields are returned. :param page_token: page token, returned from a previous call, identifying the result set. :param start_index: zero based index of the starting row to read. :param project_id: Project ID for the project which the client acts on behalf of. :param location: Default location for job. :return: list of rows """ location = location or self.location if isinstance(selected_fields, str): selected_fields = selected_fields.split(",") if selected_fields: selected_fields = [SchemaField(n, "") for n in selected_fields] else: selected_fields = None table = self._resolve_table_reference( table_resource={}, project_id=project_id, dataset_id=dataset_id, table_id=table_id, ) result = self.get_client(project_id=project_id, location=location).list_rows( table=Table.from_api_repr(table), selected_fields=selected_fields, max_results=max_results, page_token=page_token, start_index=start_index, ) return list(result)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] = None) -> dict: """ Get the schema for a given dataset and table. see https://cloud.google.com/bigquery/docs/reference/v2/tables#resource :param dataset_id: the dataset ID of the requested table :param table_id: the table ID of the requested table :param project_id: the optional project ID of the requested table. If not provided, the connector's configured project will be used. :return: a table schema """ table_ref = TableReference(dataset_ref=DatasetReference(project_id, dataset_id), table_id=table_id) table = self.get_client(project_id=project_id).get_table(table_ref) return {"fields": [s.to_api_repr() for s in table.schema]}
@GoogleBaseHook.fallback_to_default_project_id
[docs] def update_table_schema( self, schema_fields_updates: List[Dict[str, Any]], include_policy_tags: bool, dataset_id: str, table_id: str, project_id: Optional[str] = None, ) -> None: """ Update fields within a schema for a given dataset and table. Note that some fields in schemas are immutable and trying to change them will cause an exception. If a new field is included it will be inserted which requires all required fields to be set. See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema :param include_policy_tags: If set to True policy tags will be included in the update request which requires special permissions even if unchanged see https://cloud.google.com/bigquery/docs/column-level-security#roles :type include_policy_tags: bool :param dataset_id: the dataset ID of the requested table to be updated :type dataset_id: str :param table_id: the table ID of the table to be updated :type table_id: str :param schema_fields_updates: a partial schema resource. see https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema **Example**: :: schema_fields_updates=[ {"name": "emp_name", "description": "Some New Description"}, {"name": "salary", "description": "Some New Description"}, {"name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"} ]}, ] :type schema_fields_updates: List[dict] :param project_id: The name of the project where we want to update the table. :type project_id: str """ def _build_new_schema( current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: # Turn schema_field_updates into a dict keyed on field names schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)} # Create a new dict for storing the new schema, initated based on the current_schema # as of Python 3.6, dicts retain order. new_schema = {field["name"]: field for field in deepcopy(current_schema)} # Each item in schema_fields_updates contains a potential patch # to a schema field, iterate over them for field_name, patched_value in schema_fields_updates.items(): # If this field already exists, update it if field_name in new_schema: # If this field is of type RECORD and has a fields key we need to patch it recursively if "fields" in patched_value: patched_value["fields"] = _build_new_schema( new_schema[field_name]["fields"], patched_value["fields"] ) # Update the new_schema with the patched value new_schema[field_name].update(patched_value) # This is a new field, just include the whole configuration for it else: new_schema[field_name] = patched_value return list(new_schema.values()) def _remove_policy_tags(schema: List[Dict[str, Any]]): for field in schema: if "policyTags" in field: del field["policyTags"] if "fields" in field: _remove_policy_tags(field["fields"]) current_table_schema = self.get_schema( dataset_id=dataset_id, table_id=table_id, project_id=project_id )["fields"] new_schema = _build_new_schema(current_table_schema, schema_fields_updates) if not include_policy_tags: _remove_policy_tags(new_schema) self.update_table( table_resource={"schema": {"fields": new_schema}}, fields=["schema"], project_id=project_id, dataset_id=dataset_id, table_id=table_id,
) @GoogleBaseHook.fallback_to_default_project_id
[docs] def poll_job_complete( self, job_id: str, project_id: Optional[str] = None, location: Optional[str] = None, retry: Retry = DEFAULT_RETRY, ) -> bool: """ Check if jobs completed. :param job_id: id of the job. :type job_id: str :param project_id: Google Cloud Project where the job is running :type project_id: str :param location: location the job is running :type location: str :param retry: How to retry the RPC. :type retry: google.api_core.retry.Retry :rtype: bool """ location = location or self.location job = self.get_client(project_id=project_id, location=location).get_job(job_id=job_id) return job.done(retry=retry)
[docs] def cancel_query(self) -> None: """Cancel all started queries that have not yet completed""" warnings.warn( "This method is deprecated. Please use `BigQueryHook.cancel_job`.", DeprecationWarning, ) if self.running_job_id: self.cancel_job(job_id=self.running_job_id) else: self.log.info('No running BigQuery jobs to cancel.')
@GoogleBaseHook.fallback_to_default_project_id
[docs] def cancel_job( self, job_id: str, project_id: Optional[str] = None, location: Optional[str] = None, ) -> None: """ Cancels a job an wait for cancellation to complete :param job_id: id of the job. :type job_id: str :param project_id: Google Cloud Project where the job is running :type project_id: str :param location: location the job is running :type location: str """ location = location or self.location if self.poll_job_complete(job_id=job_id): self.log.info('No running BigQuery jobs to cancel.') return self.log.info('Attempting to cancel job : %s, %s', project_id, job_id) self.get_client(location=location, project_id=project_id).cancel_job(job_id=job_id) # Wait for all the calls to cancel to finish max_polling_attempts = 12 polling_attempts = 0 job_complete = False while polling_attempts < max_polling_attempts and not job_complete: polling_attempts += 1 job_complete = self.poll_job_complete(job_id) if job_complete: self.log.info('Job successfully canceled: %s, %s', project_id, job_id) elif polling_attempts == max_polling_attempts: self.log.info( "Stopping polling due to timeout. Job with id %s " "has not completed cancel and may or may not finish.", job_id, ) else: self.log.info('Waiting for canceled job with id %s to finish.', job_id) time.sleep(5)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def get_job( self, job_id: Optional[str] = None, project_id: Optional[str] = None, location: Optional[str] = None, ) -> Union[CopyJob, QueryJob, LoadJob, ExtractJob]: """ Retrieves a BigQuery job. For more information see: https://cloud.google.com/bigquery/docs/reference/v2/jobs :param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024 characters. If not provided then uuid will be generated. :type job_id: str :param project_id: Google Cloud Project where the job is running :type project_id: str :param location: location the job is running :type location: str """ client = self.get_client(project_id=project_id, location=location) job = client.get_job(job_id=job_id, project=project_id, location=location) return job
@staticmethod def _custom_job_id(configuration: Dict[str, Any]) -> str: hash_base = json.dumps(configuration, sort_keys=True) uniqueness_suffix = hashlib.md5(hash_base.encode()).hexdigest() microseconds_from_epoch = int( (datetime.now() - datetime.fromtimestamp(0)) / timedelta(microseconds=1) ) return f"airflow_{microseconds_from_epoch}_{uniqueness_suffix}" @GoogleBaseHook.fallback_to_default_project_id
[docs] def insert_job( self, configuration: Dict, job_id: Optional[str] = None, project_id: Optional[str] = None, location: Optional[str] = None, ) -> BigQueryJob: """ Executes a BigQuery job. Waits for the job to complete and returns job id. See here: https://cloud.google.com/bigquery/docs/reference/v2/jobs :param configuration: The configuration parameter maps directly to BigQuery's configuration field in the job object. See https://cloud.google.com/bigquery/docs/reference/v2/jobs for details. :type configuration: Dict[str, Any] :param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024 characters. If not provided then uuid will be generated. :type job_id: str :param project_id: Google Cloud Project where the job is running :type project_id: str :param location: location the job is running :type location: str """ location = location or self.location job_id = job_id or self._custom_job_id(configuration) client = self.get_client(project_id=project_id, location=location) job_data = { "configuration": configuration, "jobReference": {"jobId": job_id, "projectId": project_id, "location": location}, } supported_jobs = { LoadJob._JOB_TYPE: LoadJob, CopyJob._JOB_TYPE: CopyJob, ExtractJob._JOB_TYPE: ExtractJob, QueryJob._JOB_TYPE: QueryJob, } job = None for job_type, job_object in supported_jobs.items(): if job_type in configuration: job = job_object break if not job: raise AirflowException(f"Unknown job type. Supported types: {supported_jobs.keys()}") job = job.from_api_repr(job_data, client) self.log.info("Inserting job %s", job.job_id) # Start the job and wait for it to complete and get the result. job.result() return job
[docs] def run_with_configuration(self, configuration: dict) -> str: """ Executes a BigQuery SQL query. See here: https://cloud.google.com/bigquery/docs/reference/v2/jobs For more details about the configuration parameter. :param configuration: The configuration parameter maps directly to BigQuery's configuration field in the job object. See https://cloud.google.com/bigquery/docs/reference/v2/jobs for details. """ warnings.warn("This method is deprecated. Please use `BigQueryHook.insert_job`", DeprecationWarning) job = self.insert_job(configuration=configuration, project_id=self.project_id) self.running_job_id = job.job_id return job.job_id
[docs] def run_load( self, destination_project_dataset_table: str, source_uris: List, schema_fields: Optional[List] = None, source_format: str = 'CSV', create_disposition: str = 'CREATE_IF_NEEDED', skip_leading_rows: int = 0, write_disposition: str = 'WRITE_EMPTY', field_delimiter: str = ',', max_bad_records: int = 0, quote_character: Optional[str] = None, ignore_unknown_values: bool = False, allow_quoted_newlines: bool = False, allow_jagged_rows: bool = False, encoding: str = "UTF-8", schema_update_options: Optional[Iterable] = None, src_fmt_configs: Optional[Dict] = None, time_partitioning: Optional[Dict] = None, cluster_fields: Optional[List] = None, autodetect: bool = False, encryption_configuration: Optional[Dict] = None, labels: Optional[Dict] = None, description: Optional[str] = None, ) -> str: """ Executes a BigQuery load command to load data from Google Cloud Storage to BigQuery. See here: https://cloud.google.com/bigquery/docs/reference/v2/jobs For more details about these parameters. :param destination_project_dataset_table: The dotted ``(<project>.|<project>:)<dataset>.<table>($<partition>)`` BigQuery table to load data into. If ``<project>`` is not included, project will be the project defined in the connection json. If a partition is specified the operator will automatically append the data, create a new partition or create a new DAY partitioned table. :type destination_project_dataset_table: str :param schema_fields: The schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load Required if autodetect=False; optional if autodetect=True. :type schema_fields: list :param autodetect: Attempt to autodetect the schema for CSV and JSON source files. :type autodetect: bool :param source_uris: The source Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild per-object name can be used. :type source_uris: list :param source_format: File format to export. :type source_format: str :param create_disposition: The create disposition if the table doesn't exist. :type create_disposition: str :param skip_leading_rows: Number of rows to skip when loading from a CSV. :type skip_leading_rows: int :param write_disposition: The write disposition if the table already exists. :type write_disposition: str :param field_delimiter: The delimiter to use when loading from a CSV. :type field_delimiter: str :param max_bad_records: The maximum number of bad records that BigQuery can ignore when running the job. :type max_bad_records: int :param quote_character: The value that is used to quote data sections in a CSV file. :type quote_character: str :param ignore_unknown_values: [Optional] Indicates if BigQuery should allow extra values that are not represented in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. :type ignore_unknown_values: bool :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). :type allow_quoted_newlines: bool :param allow_jagged_rows: Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable when source_format is CSV. :type allow_jagged_rows: bool :param encoding: The character encoding of the data. .. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding :type encoding: str :param schema_update_options: Allows the schema of the destination table to be updated as a side effect of the load job. :type schema_update_options: Union[list, tuple, set] :param src_fmt_configs: configure optional fields specific to the source format :type src_fmt_configs: dict :param time_partitioning: configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. :type time_partitioning: dict :param cluster_fields: Request that the result of this load be stored sorted by one or more columns. BigQuery supports clustering for both partitioned and non-partitioned tables. The order of columns given determines the sort order. :type cluster_fields: list[str] :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). **Example**: :: encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" } :type encryption_configuration: dict :param labels: A dictionary containing labels for the BiqQuery table. :type labels: dict :param description: A string containing the description for the BigQuery table. :type descriptin: str """ warnings.warn( "This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning ) if not self.project_id: raise ValueError("The project_id should be set") # To provide backward compatibility schema_update_options = list(schema_update_options or []) # bigquery only allows certain source formats # we check to make sure the passed source format is valid # if it's not, we raise a ValueError # Refer to this link for more details: # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat # noqa if schema_fields is None and not autodetect: raise ValueError('You must either pass a schema or autodetect=True.') if src_fmt_configs is None: src_fmt_configs = {} source_format = source_format.upper() allowed_formats = [ "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS", "DATASTORE_BACKUP", "PARQUET", ] if source_format not in allowed_formats: raise ValueError( "{} is not a valid source format. " "Please use one of the following types: {}".format(source_format, allowed_formats) ) # bigquery also allows you to define how you want a table's schema to change # as a side effect of a load # for more details: # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions allowed_schema_update_options = ['ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION"] if not set(allowed_schema_update_options).issuperset(set(schema_update_options)): raise ValueError( "{} contains invalid schema update options." "Please only use one or more of the following options: {}".format( schema_update_options, allowed_schema_update_options ) ) destination_project, destination_dataset, destination_table = _split_tablename( table_input=destination_project_dataset_table, default_project_id=self.project_id, var_name='destination_project_dataset_table', ) configuration = { 'load': { 'autodetect': autodetect, 'createDisposition': create_disposition, 'destinationTable': { 'projectId': destination_project, 'datasetId': destination_dataset, 'tableId': destination_table, }, 'sourceFormat': source_format, 'sourceUris': source_uris, 'writeDisposition': write_disposition, 'ignoreUnknownValues': ignore_unknown_values, } } time_partitioning = _cleanse_time_partitioning(destination_project_dataset_table, time_partitioning) if time_partitioning: configuration['load'].update({'timePartitioning': time_partitioning}) if cluster_fields: configuration['load'].update({'clustering': {'fields': cluster_fields}}) if schema_fields: configuration['load']['schema'] = {'fields': schema_fields} if schema_update_options: if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: raise ValueError( "schema_update_options is only " "allowed if write_disposition is " "'WRITE_APPEND' or 'WRITE_TRUNCATE'." ) else: self.log.info("Adding experimental 'schemaUpdateOptions': %s", schema_update_options) configuration['load']['schemaUpdateOptions'] = schema_update_options if max_bad_records: configuration['load']['maxBadRecords'] = max_bad_records if encryption_configuration: configuration["load"]["destinationEncryptionConfiguration"] = encryption_configuration if labels or description: configuration['load'].update({'destinationTableProperties': {}}) if labels: configuration['load']['destinationTableProperties']['labels'] = labels if description: configuration['load']['destinationTableProperties']['description'] = description src_fmt_to_configs_mapping = { 'CSV': [ 'allowJaggedRows', 'allowQuotedNewlines', 'autodetect', 'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues', 'nullMarker', 'quote', 'encoding', ], 'DATASTORE_BACKUP': ['projectionFields'], 'NEWLINE_DELIMITED_JSON': ['autodetect', 'ignoreUnknownValues'], 'PARQUET': ['autodetect', 'ignoreUnknownValues'], 'AVRO': ['useAvroLogicalTypes'], } valid_configs = src_fmt_to_configs_mapping[source_format] # if following fields are not specified in src_fmt_configs, # honor the top-level params for backward-compatibility backward_compatibility_configs = { 'skipLeadingRows': skip_leading_rows, 'fieldDelimiter': field_delimiter, 'ignoreUnknownValues': ignore_unknown_values, 'quote': quote_character, 'allowQuotedNewlines': allow_quoted_newlines, 'encoding': encoding, } src_fmt_configs = _validate_src_fmt_configs( source_format, src_fmt_configs, valid_configs, backward_compatibility_configs ) configuration['load'].update(src_fmt_configs) if allow_jagged_rows: configuration['load']['allowJaggedRows'] = allow_jagged_rows job = self.insert_job(configuration=configuration, project_id=self.project_id) self.running_job_id = job.job_id return job.job_id
[docs] def run_copy( self, source_project_dataset_tables: Union[List, str], destination_project_dataset_table: str, write_disposition: str = 'WRITE_EMPTY', create_disposition: str = 'CREATE_IF_NEEDED', labels: Optional[Dict] = None, encryption_configuration: Optional[Dict] = None, ) -> str: """ Executes a BigQuery copy command to copy data from one BigQuery table to another. See here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy For more details about these parameters. :param source_project_dataset_tables: One or more dotted ``(project:|project.)<dataset>.<table>`` BigQuery tables to use as the source data. Use a list if there are multiple source tables. If ``<project>`` is not included, project will be the project defined in the connection json. :type source_project_dataset_tables: list|string :param destination_project_dataset_table: The destination BigQuery table. Format is: ``(project:|project.)<dataset>.<table>`` :type destination_project_dataset_table: str :param write_disposition: The write disposition if the table already exists. :type write_disposition: str :param create_disposition: The create disposition if the table doesn't exist. :type create_disposition: str :param labels: a dictionary containing labels for the job/query, passed to BigQuery :type labels: dict :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). **Example**: :: encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" } :type encryption_configuration: dict """ warnings.warn( "This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning ) if not self.project_id: raise ValueError("The project_id should be set") source_project_dataset_tables = ( [source_project_dataset_tables] if not isinstance(source_project_dataset_tables, list) else source_project_dataset_tables ) source_project_dataset_tables_fixup = [] for source_project_dataset_table in source_project_dataset_tables: source_project, source_dataset, source_table = _split_tablename( table_input=source_project_dataset_table, default_project_id=self.project_id, var_name='source_project_dataset_table', ) source_project_dataset_tables_fixup.append( {'projectId': source_project, 'datasetId': source_dataset, 'tableId': source_table} ) destination_project, destination_dataset, destination_table = _split_tablename( table_input=destination_project_dataset_table, default_project_id=self.project_id ) configuration = { 'copy': { 'createDisposition': create_disposition, 'writeDisposition': write_disposition, 'sourceTables': source_project_dataset_tables_fixup, 'destinationTable': { 'projectId': destination_project, 'datasetId': destination_dataset, 'tableId': destination_table, }, } } if labels: configuration['labels'] = labels if encryption_configuration: configuration["copy"]["destinationEncryptionConfiguration"] = encryption_configuration job = self.insert_job(configuration=configuration, project_id=self.project_id) self.running_job_id = job.job_id return job.job_id
[docs] def run_extract( self, source_project_dataset_table: str, destination_cloud_storage_uris: str, compression: str = 'NONE', export_format: str = 'CSV', field_delimiter: str = ',', print_header: bool = True, labels: Optional[Dict] = None, ) -> str: """ Executes a BigQuery extract command to copy data from BigQuery to Google Cloud Storage. See here: https://cloud.google.com/bigquery/docs/reference/v2/jobs For more details about these parameters. :param source_project_dataset_table: The dotted ``<dataset>.<table>`` BigQuery table to use as the source data. :type source_project_dataset_table: str :param destination_cloud_storage_uris: The destination Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). Follows convention defined here: https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple :type destination_cloud_storage_uris: list :param compression: Type of compression to use. :type compression: str :param export_format: File format to export. :type export_format: str :param field_delimiter: The delimiter to use when extracting to a CSV. :type field_delimiter: str :param print_header: Whether to print a header for a CSV file extract. :type print_header: bool :param labels: a dictionary containing labels for the job/query, passed to BigQuery :type labels: dict """ warnings.warn( "This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning ) if not self.project_id: raise ValueError("The project_id should be set") source_project, source_dataset, source_table = _split_tablename( table_input=source_project_dataset_table, default_project_id=self.project_id, var_name='source_project_dataset_table', ) configuration = { 'extract': { 'sourceTable': { 'projectId': source_project, 'datasetId': source_dataset, 'tableId': source_table, }, 'compression': compression, 'destinationUris': destination_cloud_storage_uris, 'destinationFormat': export_format, } } # type: Dict[str, Any] if labels: configuration['labels'] = labels if export_format == 'CSV': # Only set fieldDelimiter and printHeader fields if using CSV. # Google does not like it if you set these fields for other export # formats. configuration['extract']['fieldDelimiter'] = field_delimiter configuration['extract']['printHeader'] = print_header job = self.insert_job(configuration=configuration, project_id=self.project_id) self.running_job_id = job.job_id return job.job_id
[docs] def run_query( self, sql: str, destination_dataset_table: Optional[str] = None, write_disposition: str = 'WRITE_EMPTY', allow_large_results: bool = False, flatten_results: Optional[bool] = None, udf_config: Optional[List] = None, use_legacy_sql: Optional[bool] = None, maximum_billing_tier: Optional[int] = None, maximum_bytes_billed: Optional[float] = None, create_disposition: str = 'CREATE_IF_NEEDED', query_params: Optional[List] = None, labels: Optional[Dict] = None, schema_update_options: Optional[Iterable] = None, priority: str = 'INTERACTIVE', time_partitioning: Optional[Dict] = None, api_resource_configs: Optional[Dict] = None, cluster_fields: Optional[List[str]] = None, location: Optional[str] = None, encryption_configuration: Optional[Dict] = None, ) -> str: """ Executes a BigQuery SQL query. Optionally persists results in a BigQuery table. See here: https://cloud.google.com/bigquery/docs/reference/v2/jobs For more details about these parameters. :param sql: The BigQuery SQL to execute. :type sql: str :param destination_dataset_table: The dotted ``<dataset>.<table>`` BigQuery table to save the query results. :type destination_dataset_table: str :param write_disposition: What to do if the table already exists in BigQuery. :type write_disposition: str :param allow_large_results: Whether to allow large results. :type allow_large_results: bool :param flatten_results: If true and query uses legacy SQL dialect, flattens all nested and repeated fields in the query results. ``allowLargeResults`` must be true if this is set to false. For standard SQL queries, this flag is ignored and results are never flattened. :type flatten_results: bool :param udf_config: The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details. :type udf_config: list :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false). If `None`, defaults to `self.use_legacy_sql`. :type use_legacy_sql: bool :param api_resource_configs: a dictionary that contain params 'configuration' applied for Google BigQuery Jobs API: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs for example, {'query': {'useQueryCache': False}}. You could use it if you need to provide some params that are not supported by the BigQueryHook like args. :type api_resource_configs: dict :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price. :type maximum_billing_tier: int :param maximum_bytes_billed: Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default. :type maximum_bytes_billed: float :param create_disposition: Specifies whether the job is allowed to create new tables. :type create_disposition: str :param query_params: a list of dictionary containing query parameter types and values, passed to BigQuery :type query_params: list :param labels: a dictionary containing labels for the job/query, passed to BigQuery :type labels: dict :param schema_update_options: Allows the schema of the destination table to be updated as a side effect of the query job. :type schema_update_options: Union[list, tuple, set] :param priority: Specifies a priority for the query. Possible values include INTERACTIVE and BATCH. The default value is INTERACTIVE. :type priority: str :param time_partitioning: configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. :type time_partitioning: dict :param cluster_fields: Request that the result of this query be stored sorted by one or more columns. BigQuery supports clustering for both partitioned and non-partitioned tables. The order of columns given determines the sort order. :type cluster_fields: list[str] :param location: The geographic location of the job. Required except for US and EU. See details at https://cloud.google.com/bigquery/docs/locations#specifying_your_location :type location: str :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). **Example**: :: encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" } :type encryption_configuration: dict """ warnings.warn( "This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning ) if not self.project_id: raise ValueError("The project_id should be set") labels = labels or self.labels schema_update_options = list(schema_update_options or []) if time_partitioning is None: time_partitioning = {} if location: self.location = location if not api_resource_configs: api_resource_configs = self.api_resource_configs else: _validate_value('api_resource_configs', api_resource_configs, dict) configuration = deepcopy(api_resource_configs) if 'query' not in configuration: configuration['query'] = {} else: _validate_value("api_resource_configs['query']", configuration['query'], dict) if sql is None and not configuration['query'].get('query', None): raise TypeError('`BigQueryBaseCursor.run_query` missing 1 required positional argument: `sql`') # BigQuery also allows you to define how you want a table's schema to change # as a side effect of a query job # for more details: # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.schemaUpdateOptions # noqa allowed_schema_update_options = ['ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION"] if not set(allowed_schema_update_options).issuperset(set(schema_update_options)): raise ValueError( "{} contains invalid schema update options. " "Please only use one or more of the following " "options: {}".format(schema_update_options, allowed_schema_update_options) ) if schema_update_options: if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: raise ValueError( "schema_update_options is only " "allowed if write_disposition is " "'WRITE_APPEND' or 'WRITE_TRUNCATE'." ) if destination_dataset_table: destination_project, destination_dataset, destination_table = _split_tablename( table_input=destination_dataset_table, default_project_id=self.project_id ) destination_dataset_table = { # type: ignore 'projectId': destination_project, 'datasetId': destination_dataset, 'tableId': destination_table, } if cluster_fields: cluster_fields = {'fields': cluster_fields} # type: ignore query_param_list = [ (sql, 'query', None, (str,)), (priority, 'priority', 'INTERACTIVE', (str,)), (use_legacy_sql, 'useLegacySql', self.use_legacy_sql, bool), (query_params, 'queryParameters', None, list), (udf_config, 'userDefinedFunctionResources', None, list), (maximum_billing_tier, 'maximumBillingTier', None, int), (maximum_bytes_billed, 'maximumBytesBilled', None, float), (time_partitioning, 'timePartitioning', {}, dict), (schema_update_options, 'schemaUpdateOptions', None, list), (destination_dataset_table, 'destinationTable', None, dict), (cluster_fields, 'clustering', None, dict), ] # type: List[Tuple] for param, param_name, param_default, param_type in query_param_list: if param_name not in configuration['query'] and param in [None, {}, ()]: if param_name == 'timePartitioning': param_default = _cleanse_time_partitioning(destination_dataset_table, time_partitioning) param = param_default if param in [None, {}, ()]: continue _api_resource_configs_duplication_check(param_name, param, configuration['query']) configuration['query'][param_name] = param # check valid type of provided param, # it last step because we can get param from 2 sources, # and first of all need to find it _validate_value(param_name, configuration['query'][param_name], param_type) if param_name == 'schemaUpdateOptions' and param: self.log.info("Adding experimental 'schemaUpdateOptions': %s", schema_update_options) if param_name != 'destinationTable': continue for key in ['projectId', 'datasetId', 'tableId']: if key not in configuration['query']['destinationTable']: raise ValueError( "Not correct 'destinationTable' in " "api_resource_configs. 'destinationTable' " "must be a dict with {'projectId':'', " "'datasetId':'', 'tableId':''}" ) configuration['query'].update( { 'allowLargeResults': allow_large_results, 'flattenResults': flatten_results, 'writeDisposition': write_disposition, 'createDisposition': create_disposition, } ) if ( 'useLegacySql' in configuration['query'] and configuration['query']['useLegacySql'] and 'queryParameters' in configuration['query'] ): raise ValueError("Query parameters are not allowed when using legacy SQL") if labels: _api_resource_configs_duplication_check('labels', labels, configuration) configuration['labels'] = labels if encryption_configuration: configuration["query"]["destinationEncryptionConfiguration"] = encryption_configuration job = self.insert_job(configuration=configuration, project_id=self.project_id) self.running_job_id = job.job_id return job.job_id
[docs]class BigQueryPandasConnector(GbqConnector): """ This connector behaves identically to GbqConnector (from Pandas), except that it allows the service to be injected, and disables a call to self.get_credentials(). This allows Airflow to use BigQuery with Pandas without forcing a three legged OAuth connection. Instead, we can inject service account credentials into the binding. """ def __init__( self, project_id: str, service: str, reauth: bool = False, verbose: bool = False, dialect="legacy" ) -> None: super().__init__(project_id) gbq_check_google_client_version() gbq_test_google_api_imports() self.project_id = project_id self.reauth = reauth self.service = service self.verbose = verbose self.dialect = dialect
[docs]class BigQueryConnection: """ BigQuery does not have a notion of a persistent connection. Thus, these objects are small stateless factories for cursors, which do all the real work. """ def __init__(self, *args, **kwargs) -> None: self._args = args self._kwargs = kwargs
[docs] def close(self) -> None:
"""The BigQueryConnection does not have anything to close"""
[docs] def commit(self) -> None:
"""The BigQueryConnection does not support transactions"""
[docs] def cursor(self) -> "BigQueryCursor": """Return a new :py:class:`Cursor` object using the connection""" return BigQueryCursor(*self._args, **self._kwargs)
[docs] def rollback(self) -> NoReturn: """The BigQueryConnection does not have transactions""" raise NotImplementedError("BigQueryConnection does not have transactions")
[docs]class BigQueryBaseCursor(LoggingMixin): """ The BigQuery base cursor contains helper methods to execute queries against BigQuery. The methods can be used directly by operators, in cases where a PEP 249 cursor isn't needed. """ def __init__( self, service: Any, project_id: str, hook: BigQueryHook, use_legacy_sql: bool = True, api_resource_configs: Optional[Dict] = None, location: Optional[str] = None, num_retries: int = 5, labels: Optional[Dict] = None, ) -> None: super().__init__() self.service = service self.project_id = project_id self.use_legacy_sql = use_legacy_sql if api_resource_configs: _validate_value("api_resource_configs", api_resource_configs, dict) self.api_resource_configs = api_resource_configs if api_resource_configs else {} # type Dict self.running_job_id = None # type: Optional[str] self.location = location self.num_retries = num_retries self.labels = labels self.hook = hook
[docs] def create_empty_table(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_table` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_table`", DeprecationWarning, stacklevel=3, ) return self.hook.create_empty_table(*args, **kwargs)
[docs] def create_empty_dataset(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_dataset` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_dataset`", DeprecationWarning, stacklevel=3, ) return self.hook.create_empty_dataset(*args, **kwargs)
[docs] def get_dataset_tables(self, *args, **kwargs) -> List[Dict[str, Any]]: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_dataset_tables` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_dataset_tables`", DeprecationWarning, stacklevel=3, ) return self.hook.get_dataset_tables(*args, **kwargs)
[docs] def delete_dataset(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.delete_dataset` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.delete_dataset`", DeprecationWarning, stacklevel=3, ) return self.hook.delete_dataset(*args, **kwargs)
[docs] def create_external_table(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_external_table` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_external_table`", DeprecationWarning, stacklevel=3, ) return self.hook.create_external_table(*args, **kwargs)
[docs] def patch_table(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.patch_table` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.patch_table`", DeprecationWarning, stacklevel=3, ) return self.hook.patch_table(*args, **kwargs)
[docs] def insert_all(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_all` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_all`", DeprecationWarning, stacklevel=3, ) return self.hook.insert_all(*args, **kwargs)
[docs] def update_dataset(self, *args, **kwargs) -> Dict: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.update_dataset` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.update_dataset`", DeprecationWarning, stacklevel=3, ) return Dataset.to_api_repr(self.hook.update_dataset(*args, **kwargs))
[docs] def patch_dataset(self, *args, **kwargs) -> Dict: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.patch_dataset` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.patch_dataset`", DeprecationWarning, stacklevel=3, ) return self.hook.patch_dataset(*args, **kwargs)
[docs] def get_dataset_tables_list(self, *args, **kwargs) -> List[Dict[str, Any]]: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_dataset_tables_list` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_dataset_tables_list`", DeprecationWarning, stacklevel=3, ) return self.hook.get_dataset_tables_list(*args, **kwargs)
[docs] def get_datasets_list(self, *args, **kwargs) -> list: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_datasets_list` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_datasets_list`", DeprecationWarning, stacklevel=3, ) return self.hook.get_datasets_list(*args, **kwargs)
[docs] def get_dataset(self, *args, **kwargs) -> dict: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_dataset` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_dataset`", DeprecationWarning, stacklevel=3, ) return self.hook.get_dataset(*args, **kwargs)
[docs] def run_grant_dataset_view_access(self, *args, **kwargs) -> dict: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_grant_dataset_view_access` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks" ".bigquery.BigQueryHook.run_grant_dataset_view_access`", DeprecationWarning, stacklevel=3, ) return self.hook.run_grant_dataset_view_access(*args, **kwargs)
[docs] def run_table_upsert(self, *args, **kwargs) -> dict: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_table_upsert` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_table_upsert`", DeprecationWarning, stacklevel=3, ) return self.hook.run_table_upsert(*args, **kwargs)
[docs] def run_table_delete(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_table_delete` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_table_delete`", DeprecationWarning, stacklevel=3, ) return self.hook.run_table_delete(*args, **kwargs)
[docs] def get_tabledata(self, *args, **kwargs) -> List[dict]: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_tabledata` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_tabledata`", DeprecationWarning, stacklevel=3, ) return self.hook.get_tabledata(*args, **kwargs)
[docs] def get_schema(self, *args, **kwargs) -> dict: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema`", DeprecationWarning, stacklevel=3, ) return self.hook.get_schema(*args, **kwargs)
[docs] def poll_job_complete(self, *args, **kwargs) -> bool: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.poll_job_complete` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.poll_job_complete`", DeprecationWarning, stacklevel=3, ) return self.hook.poll_job_complete(*args, **kwargs)
[docs] def cancel_query(self, *args, **kwargs) -> None: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.cancel_query` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.cancel_query`", DeprecationWarning, stacklevel=3, ) return self.hook.cancel_query(*args, **kwargs) # type: ignore
[docs] def run_with_configuration(self, *args, **kwargs) -> str: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_with_configuration` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_with_configuration`", DeprecationWarning, stacklevel=3, ) return self.hook.run_with_configuration(*args, **kwargs)
[docs] def run_load(self, *args, **kwargs) -> str: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_load` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_load`", DeprecationWarning, stacklevel=3, ) return self.hook.run_load(*args, **kwargs)
[docs] def run_copy(self, *args, **kwargs) -> str: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_copy` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_copy`", DeprecationWarning, stacklevel=3, ) return self.hook.run_copy(*args, **kwargs)
[docs] def run_extract(self, *args, **kwargs) -> str: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_extract` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_extract`", DeprecationWarning, stacklevel=3, ) return self.hook.run_extract(*args, **kwargs)
[docs] def run_query(self, *args, **kwargs) -> str: """ This method is deprecated. Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_query` """ warnings.warn( "This method is deprecated. " "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.run_query`", DeprecationWarning, stacklevel=3, ) return self.hook.run_query(*args, **kwargs)
[docs]class BigQueryCursor(BigQueryBaseCursor): """ A very basic BigQuery PEP 249 cursor implementation. The PyHive PEP 249 implementation was used as a reference: https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py """ def __init__( self, service: Any, project_id: str, hook: BigQueryHook, use_legacy_sql: bool = True, location: Optional[str] = None, num_retries: int = 5, ) -> None: super().__init__( service=service, project_id=project_id, hook=hook, use_legacy_sql=use_legacy_sql, location=location, num_retries=num_retries, ) self.buffersize = None # type: Optional[int] self.page_token = None # type: Optional[str] self.job_id = None # type: Optional[str] self.buffer = [] # type: list self.all_pages_loaded = False # type: bool @property
[docs] def description(self) -> None: """The schema description method is not currently implemented""" raise NotImplementedError
[docs] def close(self) -> None:
"""By default, do nothing""" @property
[docs] def rowcount(self) -> int: """By default, return -1 to indicate that this is not supported""" return -1
[docs] def execute(self, operation: str, parameters: Optional[dict] = None) -> None: """ Executes a BigQuery query, and returns the job ID. :param operation: The query to execute. :type operation: str :param parameters: Parameters to substitute into the query. :type parameters: dict """ sql = _bind_parameters(operation, parameters) if parameters else operation self.flush_results() self.job_id = self.hook.run_query(sql)
[docs] def executemany(self, operation: str, seq_of_parameters: list) -> None: """ Execute a BigQuery query multiple times with different parameters. :param operation: The query to execute. :type operation: str :param seq_of_parameters: List of dictionary parameters to substitute into the query. :type seq_of_parameters: list """ for parameters in seq_of_parameters: self.execute(operation, parameters)
[docs] def flush_results(self) -> None: """Flush results related cursor attributes""" self.page_token = None self.job_id = None self.all_pages_loaded = False self.buffer = []
[docs] def fetchone(self) -> Union[List, None]: """Fetch the next row of a query result set""" return self.next()
[docs] def next(self) -> Union[List, None]: """ Helper method for fetchone, which returns the next row from a buffer. If the buffer is empty, attempts to paginate through the result set for the next page, and load it into the buffer. """ if not self.job_id: return None if not self.buffer: if self.all_pages_loaded: return None query_results = ( self.service.jobs() .getQueryResults( projectId=self.project_id, jobId=self.job_id, location=self.location, pageToken=self.page_token, ) .execute(num_retries=self.num_retries) ) if 'rows' in query_results and query_results['rows']: self.page_token = query_results.get('pageToken') fields = query_results['schema']['fields'] col_types = [field['type'] for field in fields] rows = query_results['rows'] for dict_row in rows: typed_row = [_bq_cast(vs['v'], col_types[idx]) for idx, vs in enumerate(dict_row['f'])] self.buffer.append(typed_row) if not self.page_token: self.all_pages_loaded = True else: # Reset all state since we've exhausted the results. self.flush_results() return None return self.buffer.pop(0)
[docs] def fetchmany(self, size: Optional[int] = None) -> list: """ Fetch the next set of rows of a query result, returning a sequence of sequences (e.g. a list of tuples). An empty sequence is returned when no more rows are available. The number of rows to fetch per call is specified by the parameter. If it is not given, the cursor's arraysize determines the number of rows to be fetched. The method should try to fetch as many rows as indicated by the size parameter. If this is not possible due to the specified number of rows not being available, fewer rows may be returned. An :py:class:`~pyhive.exc.Error` (or subclass) exception is raised if the previous call to :py:meth:`execute` did not produce any result set or no call was issued yet. """ if size is None: size = self.arraysize result = [] for _ in range(size): one = self.fetchone() if one is None: break result.append(one) return result
[docs] def fetchall(self) -> List[list]: """ Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). """ result = [] while True: one = self.fetchone() if one is None: break result.append(one) return result
[docs] def get_arraysize(self) -> int: """Specifies the number of rows to fetch at a time with .fetchmany()""" return self.buffersize or 1
[docs] def set_arraysize(self, arraysize: int) -> None: """Specifies the number of rows to fetch at a time with .fetchmany()""" self.buffersize = arraysize
[docs] arraysize = property(get_arraysize, set_arraysize)
[docs] def setinputsizes(self, sizes: Any) -> None:
"""Does nothing by default"""
[docs] def setoutputsize(self, size: Any, column: Any = None) -> None:
"""Does nothing by default""" def _bind_parameters(operation: str, parameters: dict) -> str: """Helper method that binds parameters to a SQL query""" # inspired by MySQL Python Connector (conversion.py) string_parameters = {} # type Dict[str, str] for (name, value) in parameters.items(): if value is None: string_parameters[name] = 'NULL' elif isinstance(value, str): string_parameters[name] = "'" + _escape(value) + "'" else: string_parameters[name] = str(value) return operation % string_parameters def _escape(s: str) -> str: """Helper method that escapes parameters to a SQL query""" e = s e = e.replace('\\', '\\\\') e = e.replace('\n', '\\n') e = e.replace('\r', '\\r') e = e.replace("'", "\\'") e = e.replace('"', '\\"') return e def _bq_cast(string_field: str, bq_type: str) -> Union[None, int, float, bool, str]: """ Helper method that casts a BigQuery row to the appropriate data types. This is useful because BigQuery returns all fields as strings. """ if string_field is None: return None elif bq_type == 'INTEGER': return int(string_field) elif bq_type in ('FLOAT', 'TIMESTAMP'): return float(string_field) elif bq_type == 'BOOLEAN': if string_field not in ['true', 'false']: raise ValueError(f"{string_field} must have value 'true' or 'false'") return string_field == 'true' else: return string_field def _split_tablename( table_input: str, default_project_id: str, var_name: Optional[str] = None ) -> Tuple[str, str, str]: if '.' not in table_input: raise ValueError(f'Expected table name in the format of <dataset>.<table>. Got: {table_input}') if not default_project_id: raise ValueError("INTERNAL: No default project is specified") def var_print(var_name): if var_name is None: return "" else: return f"Format exception for {var_name}: " if table_input.count('.') + table_input.count(':') > 3: raise Exception( '{var}Use either : or . to specify project ' 'got {input}'.format(var=var_print(var_name), input=table_input) ) cmpt = table_input.rsplit(':', 1) project_id = None rest = table_input if len(cmpt) == 1: project_id = None rest = cmpt[0] elif len(cmpt) == 2 and cmpt[0].count(':') <= 1: if cmpt[-1].count('.') != 2: project_id = cmpt[0] rest = cmpt[1] else: raise Exception( '{var}Expect format of (<project:)<dataset>.<table>, ' 'got {input}'.format(var=var_print(var_name), input=table_input) ) cmpt = rest.split('.') if len(cmpt) == 3: if project_id: raise ValueError(f"{var_print(var_name)}Use either : or . to specify project") project_id = cmpt[0] dataset_id = cmpt[1] table_id = cmpt[2] elif len(cmpt) == 2: dataset_id = cmpt[0] table_id = cmpt[1] else: raise Exception( '{var}Expect format of (<project.|<project:)<dataset>.<table>, ' 'got {input}'.format(var=var_print(var_name), input=table_input) ) if project_id is None: if var_name is not None: log.info( 'Project not included in %s: %s; using project "%s"', var_name, table_input, default_project_id, ) project_id = default_project_id return project_id, dataset_id, table_id def _cleanse_time_partitioning( destination_dataset_table: Optional[str], time_partitioning_in: Optional[Dict] ) -> Dict: # if it is a partitioned table ($ is in the table name) add partition load option if time_partitioning_in is None: time_partitioning_in = {} time_partitioning_out = {} if destination_dataset_table and '$' in destination_dataset_table: time_partitioning_out['type'] = 'DAY' time_partitioning_out.update(time_partitioning_in) return time_partitioning_out def _validate_value(key: Any, value: Any, expected_type: Type) -> None: """Function to check expected type and raise error if type is not correct""" if not isinstance(value, expected_type): raise TypeError(f"{key} argument must have a type {expected_type} not {type(value)}") def _api_resource_configs_duplication_check( key: Any, value: Any, config_dict: dict, config_dict_name='api_resource_configs' ) -> None: if key in config_dict and value != config_dict[key]: raise ValueError( "Values of {param_name} param are duplicated. " "{dict_name} contained {param_name} param " "in `query` config and {param_name} was also provided " "with arg to run_query() method. Please remove duplicates.".format( param_name=key, dict_name=config_dict_name ) ) def _validate_src_fmt_configs( source_format: str, src_fmt_configs: dict, valid_configs: List[str], backward_compatibility_configs: Optional[Dict] = None, ) -> Dict: """ Validates the given src_fmt_configs against a valid configuration for the source format. Adds the backward compatibility config to the src_fmt_configs. :param source_format: File format to export. :type source_format: str :param src_fmt_configs: Configure optional fields specific to the source format. :type src_fmt_configs: dict :param valid_configs: Valid configuration specific to the source format :type valid_configs: List[str] :param backward_compatibility_configs: The top-level params for backward-compatibility :type backward_compatibility_configs: dict """ if backward_compatibility_configs is None: backward_compatibility_configs = {} for k, v in backward_compatibility_configs.items(): if k not in src_fmt_configs and k in valid_configs: src_fmt_configs[k] = v for k, v in src_fmt_configs.items(): if k not in valid_configs: raise ValueError(f"{k} is not a valid src_fmt_configs for type {source_format}.") return src_fmt_configs

Was this entry helpful?