# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
This module contains a BigQuery Hook, as well as a very basic PEP 249
implementation for BigQuery.
"""
import time
import six
from builtins import range
from copy import deepcopy
from six import iteritems
from past.builtins import basestring
from airflow import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
from airflow.utils.log.logging_mixin import LoggingMixin
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pandas_gbq.gbq import \
_check_google_client_version as gbq_check_google_client_version
from pandas_gbq import read_gbq
from pandas_gbq.gbq import \
_test_google_api_imports as gbq_test_google_api_imports
from pandas_gbq.gbq import GbqConnector
[docs]class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
"""
Interact with BigQuery. This hook uses the Google Cloud Platform
connection.
"""
conn_name_attr = 'bigquery_conn_id'
def __init__(self,
bigquery_conn_id='bigquery_default',
delegate_to=None,
use_legacy_sql=True,
location=None):
super(BigQueryHook, self).__init__(
gcp_conn_id=bigquery_conn_id, delegate_to=delegate_to)
self.use_legacy_sql = use_legacy_sql
self.location = location
[docs] def get_conn(self):
"""
Returns a BigQuery PEP 249 connection object.
"""
service = self.get_service()
project = self._get_field('project')
return BigQueryConnection(
service=service,
project_id=project,
use_legacy_sql=self.use_legacy_sql,
location=self.location,
)
[docs] def get_service(self):
"""
Returns a BigQuery service object.
"""
http_authorized = self._authorize()
return build(
'bigquery', 'v2', http=http_authorized, cache_discovery=False)
[docs] def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
"""
Insertion is currently unsupported. Theoretically, you could use
BigQuery's streaming API to insert rows into a table, but this hasn't
been implemented.
"""
raise NotImplementedError()
[docs] def get_pandas_df(self, sql, parameters=None, dialect=None):
"""
Returns a Pandas DataFrame for the results produced by a BigQuery
query. The DbApiHook method must be overridden because Pandas
doesn't support PEP 249 connections, except for SQLite. See:
https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447
https://github.com/pydata/pandas/issues/6900
:param sql: The BigQuery SQL to execute.
:type sql: string
:param parameters: The parameters to render the SQL query with (not
used, leave to override superclass method)
:type parameters: mapping or iterable
:param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL
defaults to use `self.use_legacy_sql` if not specified
:type dialect: string in {'legacy', 'standard'}
"""
if dialect is None:
dialect = 'legacy' if self.use_legacy_sql else 'standard'
return read_gbq(sql,
project_id=self._get_field('project'),
dialect=dialect,
verbose=False)
[docs] def table_exists(self, project_id, dataset_id, table_id):
"""
Checks for the existence of a table in Google BigQuery.
:param project_id: The Google cloud project in which to look for the
table. The connection supplied to the hook must provide access to
the specified project.
:type project_id: string
:param dataset_id: The name of the dataset in which to look for the
table.
:type dataset_id: string
:param table_id: The name of the table to check the existence of.
:type table_id: string
"""
service = self.get_service()
try:
service.tables().get(
projectId=project_id, datasetId=dataset_id,
tableId=table_id).execute()
return True
except HttpError as e:
if e.resp['status'] == '404':
return False
raise
class BigQueryPandasConnector(GbqConnector):
"""
This connector behaves identically to GbqConnector (from Pandas), except
that it allows the service to be injected, and disables a call to
self.get_credentials(). This allows Airflow to use BigQuery with Pandas
without forcing a three legged OAuth connection. Instead, we can inject
service account credentials into the binding.
"""
def __init__(self,
project_id,
service,
reauth=False,
verbose=False,
dialect='legacy'):
super(BigQueryPandasConnector, self).__init__(project_id)
gbq_check_google_client_version()
gbq_test_google_api_imports()
self.project_id = project_id
self.reauth = reauth
self.service = service
self.verbose = verbose
self.dialect = dialect
class BigQueryConnection(object):
"""
BigQuery does not have a notion of a persistent connection. Thus, these
objects are small stateless factories for cursors, which do all the real
work.
"""
def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs
def close(self):
""" BigQueryConnection does not have anything to close. """
pass
def commit(self):
""" BigQueryConnection does not support transactions. """
pass
def cursor(self):
""" Return a new :py:class:`Cursor` object using the connection. """
return BigQueryCursor(*self._args, **self._kwargs)
def rollback(self):
raise NotImplementedError(
"BigQueryConnection does not have transactions")
class BigQueryBaseCursor(LoggingMixin):
"""
The BigQuery base cursor contains helper methods to execute queries against
BigQuery. The methods can be used directly by operators, in cases where a
PEP 249 cursor isn't needed.
"""
def __init__(self,
service,
project_id,
use_legacy_sql=True,
api_resource_configs=None,
location=None):
self.service = service
self.project_id = project_id
self.use_legacy_sql = use_legacy_sql
if api_resource_configs:
_validate_value("api_resource_configs", api_resource_configs, dict)
self.api_resource_configs = api_resource_configs \
if api_resource_configs else {}
self.running_job_id = None
self.location = location
def create_empty_table(self,
project_id,
dataset_id,
table_id,
schema_fields=None,
time_partitioning=None,
labels=None,
view=None):
"""
Creates a new, empty table in the dataset.
To create a view, which is defined by a SQL query, parse a dictionary to 'view' kwarg
:param project_id: The project to create the table into.
:type project_id: str
:param dataset_id: The dataset to create the table into.
:type dataset_id: str
:param table_id: The Name of the table to be created.
:type table_id: str
:param schema_fields: If set, the schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
:type schema_fields: list
:param labels: a dictionary containing labels for the table, passed to BigQuery
:type labels: dict
**Example**: ::
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
.. seealso::
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
:type time_partitioning: dict
:param view: [Optional] A dictionary containing definition for the view.
If set, it will create a view instead of a table:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#view
:type view: dict
**Example**: ::
view = {
"query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000",
"useLegacySql": False
}
:return:
"""
project_id = project_id if project_id is not None else self.project_id
table_resource = {
'tableReference': {
'tableId': table_id
}
}
if schema_fields:
table_resource['schema'] = {'fields': schema_fields}
if time_partitioning:
table_resource['timePartitioning'] = time_partitioning
if labels:
table_resource['labels'] = labels
if view:
table_resource['view'] = view
self.log.info('Creating Table %s:%s.%s',
project_id, dataset_id, table_id)
try:
self.service.tables().insert(
projectId=project_id,
datasetId=dataset_id,
body=table_resource).execute()
self.log.info('Table created successfully: %s:%s.%s',
project_id, dataset_id, table_id)
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content)
)
def create_external_table(self,
external_project_dataset_table,
schema_fields,
source_uris,
source_format='CSV',
autodetect=False,
compression='NONE',
ignore_unknown_values=False,
max_bad_records=0,
skip_leading_rows=0,
field_delimiter=',',
quote_character=None,
allow_quoted_newlines=False,
allow_jagged_rows=False,
src_fmt_configs=None,
labels=None
):
"""
Creates a new external table in the dataset with the data in Google
Cloud Storage. See here:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
for more details about these parameters.
:param external_project_dataset_table:
The dotted (<project>.|<project>:)<dataset>.<table>($<partition>) BigQuery
table name to create external table.
If <project> is not included, project will be the
project defined in the connection json.
:type external_project_dataset_table: string
:param schema_fields: The schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
:type schema_fields: list
:param source_uris: The source Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild
per-object name can be used.
:type source_uris: list
:param source_format: File format to export.
:type source_format: string
:param autodetect: Try to detect schema and format options automatically.
Any option specified explicitly will be honored.
:type autodetect: bool
:param compression: [Optional] The compression type of the data source.
Possible values include GZIP and NONE.
The default value is NONE.
This setting is ignored for Google Cloud Bigtable,
Google Cloud Datastore backups and Avro formats.
:type compression: string
:param ignore_unknown_values: [Optional] Indicates if BigQuery should allow
extra values that are not represented in the table schema.
If true, the extra values are ignored. If false, records with extra columns
are treated as bad records, and if there are too many bad records, an
invalid error is returned in the job result.
:type ignore_unknown_values: bool
:param max_bad_records: The maximum number of bad records that BigQuery can
ignore when running the job.
:type max_bad_records: int
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
:type skip_leading_rows: int
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
:param quote_character: The value that is used to quote data sections in a CSV
file.
:type quote_character: string
:param allow_quoted_newlines: Whether to allow quoted newlines (true) or not
(false).
:type allow_quoted_newlines: boolean
:param allow_jagged_rows: Accept rows that are missing trailing optional columns.
The missing values are treated as nulls. If false, records with missing
trailing columns are treated as bad records, and if there are too many bad
records, an invalid error is returned in the job result. Only applicable when
soure_format is CSV.
:type allow_jagged_rows: bool
:param src_fmt_configs: configure optional fields specific to the source format
:type src_fmt_configs: dict
:param labels: a dictionary containing labels for the table, passed to BigQuery
:type labels: dict
"""
if src_fmt_configs is None:
src_fmt_configs = {}
project_id, dataset_id, external_table_id = \
_split_tablename(table_input=external_project_dataset_table,
default_project_id=self.project_id,
var_name='external_project_dataset_table')
# bigquery only allows certain source formats
# we check to make sure the passed source format is valid
# if it's not, we raise a ValueError
# Refer to this link for more details:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat
source_format = source_format.upper()
allowed_formats = [
"CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS",
"DATASTORE_BACKUP", "PARQUET"
]
if source_format not in allowed_formats:
raise ValueError("{0} is not a valid source format. "
"Please use one of the following types: {1}"
.format(source_format, allowed_formats))
compression = compression.upper()
allowed_compressions = ['NONE', 'GZIP']
if compression not in allowed_compressions:
raise ValueError("{0} is not a valid compression format. "
"Please use one of the following types: {1}"
.format(compression, allowed_compressions))
table_resource = {
'externalDataConfiguration': {
'autodetect': autodetect,
'sourceFormat': source_format,
'sourceUris': source_uris,
'compression': compression,
'ignoreUnknownValues': ignore_unknown_values
},
'tableReference': {
'projectId': project_id,
'datasetId': dataset_id,
'tableId': external_table_id,
}
}
if schema_fields:
table_resource['externalDataConfiguration'].update({
'schema': {
'fields': schema_fields
}
})
self.log.info('Creating external table: %s', external_project_dataset_table)
if max_bad_records:
table_resource['externalDataConfiguration']['maxBadRecords'] = max_bad_records
# if following fields are not specified in src_fmt_configs,
# honor the top-level params for backward-compatibility
if 'skipLeadingRows' not in src_fmt_configs:
src_fmt_configs['skipLeadingRows'] = skip_leading_rows
if 'fieldDelimiter' not in src_fmt_configs:
src_fmt_configs['fieldDelimiter'] = field_delimiter
if 'quote_character' not in src_fmt_configs:
src_fmt_configs['quote'] = quote_character
if 'allowQuotedNewlines' not in src_fmt_configs:
src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines
if 'allowJaggedRows' not in src_fmt_configs:
src_fmt_configs['allowJaggedRows'] = allow_jagged_rows
src_fmt_to_param_mapping = {
'CSV': 'csvOptions',
'GOOGLE_SHEETS': 'googleSheetsOptions'
}
src_fmt_to_configs_mapping = {
'csvOptions': [
'allowJaggedRows', 'allowQuotedNewlines',
'fieldDelimiter', 'skipLeadingRows',
'quote'
],
'googleSheetsOptions': ['skipLeadingRows']
}
if source_format in src_fmt_to_param_mapping.keys():
valid_configs = src_fmt_to_configs_mapping[
src_fmt_to_param_mapping[source_format]
]
src_fmt_configs = {
k: v
for k, v in src_fmt_configs.items() if k in valid_configs
}
table_resource['externalDataConfiguration'][src_fmt_to_param_mapping[
source_format]] = src_fmt_configs
if labels:
table_resource['labels'] = labels
try:
self.service.tables().insert(
projectId=project_id,
datasetId=dataset_id,
body=table_resource
).execute()
self.log.info('External table created successfully: %s',
external_project_dataset_table)
except HttpError as err:
raise Exception(
'BigQuery job failed. Error was: {}'.format(err.content)
)
def run_query(self,
bql=None,
sql=None,
destination_dataset_table=None,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
flatten_results=None,
udf_config=None,
use_legacy_sql=None,
maximum_billing_tier=None,
maximum_bytes_billed=None,
create_disposition='CREATE_IF_NEEDED',
query_params=None,
labels=None,
schema_update_options=(),
priority='INTERACTIVE',
time_partitioning=None,
api_resource_configs=None,
cluster_fields=None,
location=None):
"""
Executes a BigQuery SQL query. Optionally persists results in a BigQuery
table. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
For more details about these parameters.
:param bql: (Deprecated. Use `sql` parameter instead) The BigQuery SQL
to execute.
:type bql: string
:param sql: The BigQuery SQL to execute.
:type sql: string
:param destination_dataset_table: The dotted <dataset>.<table>
BigQuery table to save the query results.
:type destination_dataset_table: string
:param write_disposition: What to do if the table already exists in
BigQuery.
:type write_disposition: string
:param allow_large_results: Whether to allow large results.
:type allow_large_results: boolean
:param flatten_results: If true and query uses legacy SQL dialect, flattens
all nested and repeated fields in the query results. ``allowLargeResults``
must be true if this is set to false. For standard SQL queries, this
flag is ignored and results are never flattened.
:type flatten_results: boolean
:param udf_config: The User Defined Function configuration for the query.
See https://cloud.google.com/bigquery/user-defined-functions for details.
:type udf_config: list
:param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
If `None`, defaults to `self.use_legacy_sql`.
:type use_legacy_sql: boolean
:param api_resource_configs: a dictionary that contain params
'configuration' applied for Google BigQuery Jobs API:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs
for example, {'query': {'useQueryCache': False}}. You could use it
if you need to provide some params that are not supported by the
BigQueryHook like args.
:type api_resource_configs: dict
:param maximum_billing_tier: Positive integer that serves as a
multiplier of the basic price.
:type maximum_billing_tier: integer
:param maximum_bytes_billed: Limits the bytes billed for this job.
Queries that will have bytes billed beyond this limit will fail
(without incurring a charge). If unspecified, this will be
set to your project default.
:type maximum_bytes_billed: float
:param create_disposition: Specifies whether the job is allowed to
create new tables.
:type create_disposition: string
:param query_params a dictionary containing query parameter types and
values, passed to BigQuery
:type query_params: dict
:param labels a dictionary containing labels for the job/query,
passed to BigQuery
:type labels: dict
:param schema_update_options: Allows the schema of the destination
table to be updated as a side effect of the query job.
:type schema_update_options: tuple
:param priority: Specifies a priority for the query.
Possible values include INTERACTIVE and BATCH.
The default value is INTERACTIVE.
:type priority: string
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
:type time_partitioning: dict
:param cluster_fields: Request that the result of this query be stored sorted
by one or more columns. This is only available in combination with
time_partitioning. The order of columns given determines the sort order.
:type cluster_fields: list of str
:param location: The geographic location of the job. Required except for
US and EU. See details at
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
:type location: str
"""
if time_partitioning is None:
time_partitioning = {}
if location:
self.location = location
if not api_resource_configs:
api_resource_configs = self.api_resource_configs
else:
_validate_value('api_resource_configs',
api_resource_configs, dict)
configuration = deepcopy(api_resource_configs)
if 'query' not in configuration:
configuration['query'] = {}
else:
_validate_value("api_resource_configs['query']",
configuration['query'], dict)
sql = bql if sql is None else sql
# TODO remove `bql` in Airflow 2.0 - Jira: [AIRFLOW-2513]
if bql:
import warnings
warnings.warn('Deprecated parameter `bql` used in '
'`BigQueryBaseCursor.run_query` '
'Use `sql` parameter instead to pass the sql to be '
'executed. `bql` parameter is deprecated and '
'will be removed in a future version of '
'Airflow.',
category=DeprecationWarning)
if sql is None and not configuration['query'].get('query', None):
raise TypeError('`BigQueryBaseCursor.run_query` '
'missing 1 required positional argument: `sql`')
# BigQuery also allows you to define how you want a table's schema to change
# as a side effect of a query job
# for more details:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.schemaUpdateOptions
allowed_schema_update_options = [
'ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION"
]
if not set(allowed_schema_update_options
).issuperset(set(schema_update_options)):
raise ValueError("{0} contains invalid schema update options. "
"Please only use one or more of the following "
"options: {1}"
.format(schema_update_options,
allowed_schema_update_options))
if schema_update_options:
if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
raise ValueError("schema_update_options is only "
"allowed if write_disposition is "
"'WRITE_APPEND' or 'WRITE_TRUNCATE'.")
if destination_dataset_table:
destination_project, destination_dataset, destination_table = \
_split_tablename(table_input=destination_dataset_table,
default_project_id=self.project_id)
destination_dataset_table = {
'projectId': destination_project,
'datasetId': destination_dataset,
'tableId': destination_table,
}
if cluster_fields:
cluster_fields = {'fields': cluster_fields}
query_param_list = [
(sql, 'query', None, six.string_types),
(priority, 'priority', 'INTERACTIVE', six.string_types),
(use_legacy_sql, 'useLegacySql', self.use_legacy_sql, bool),
(query_params, 'queryParameters', None, dict),
(udf_config, 'userDefinedFunctionResources', None, list),
(maximum_billing_tier, 'maximumBillingTier', None, int),
(maximum_bytes_billed, 'maximumBytesBilled', None, float),
(time_partitioning, 'timePartitioning', {}, dict),
(schema_update_options, 'schemaUpdateOptions', None, tuple),
(destination_dataset_table, 'destinationTable', None, dict),
(cluster_fields, 'clustering', None, dict),
]
for param_tuple in query_param_list:
param, param_name, param_default, param_type = param_tuple
if param_name not in configuration['query'] and param in [None, {}, ()]:
if param_name == 'timePartitioning':
param_default = _cleanse_time_partitioning(
destination_dataset_table, time_partitioning)
param = param_default
if param not in [None, {}, ()]:
_api_resource_configs_duplication_check(
param_name, param, configuration['query'])
configuration['query'][param_name] = param
# check valid type of provided param,
# it last step because we can get param from 2 sources,
# and first of all need to find it
_validate_value(param_name, configuration['query'][param_name],
param_type)
if param_name == 'schemaUpdateOptions' and param:
self.log.info("Adding experimental 'schemaUpdateOptions': "
"{0}".format(schema_update_options))
if param_name == 'destinationTable':
for key in ['projectId', 'datasetId', 'tableId']:
if key not in configuration['query']['destinationTable']:
raise ValueError(
"Not correct 'destinationTable' in "
"api_resource_configs. 'destinationTable' "
"must be a dict with {'projectId':'', "
"'datasetId':'', 'tableId':''}")
configuration['query'].update({
'allowLargeResults': allow_large_results,
'flattenResults': flatten_results,
'writeDisposition': write_disposition,
'createDisposition': create_disposition,
})
if 'useLegacySql' in configuration['query'] and \
'queryParameters' in configuration['query']:
raise ValueError("Query parameters are not allowed "
"when using legacy SQL")
if labels:
_api_resource_configs_duplication_check(
'labels', labels, configuration)
configuration['labels'] = labels
return self.run_with_configuration(configuration)
def run_extract( # noqa
self,
source_project_dataset_table,
destination_cloud_storage_uris,
compression='NONE',
export_format='CSV',
field_delimiter=',',
print_header=True,
labels=None):
"""
Executes a BigQuery extract command to copy data from BigQuery to
Google Cloud Storage. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
For more details about these parameters.
:param source_project_dataset_table: The dotted <dataset>.<table>
BigQuery table to use as the source data.
:type source_project_dataset_table: string
:param destination_cloud_storage_uris: The destination Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). Follows
convention defined here:
https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
:type destination_cloud_storage_uris: list
:param compression: Type of compression to use.
:type compression: string
:param export_format: File format to export.
:type export_format: string
:param field_delimiter: The delimiter to use when extracting to a CSV.
:type field_delimiter: string
:param print_header: Whether to print a header for a CSV file extract.
:type print_header: boolean
:param labels: a dictionary containing labels for the job/query,
passed to BigQuery
:type labels: dict
"""
source_project, source_dataset, source_table = \
_split_tablename(table_input=source_project_dataset_table,
default_project_id=self.project_id,
var_name='source_project_dataset_table')
configuration = {
'extract': {
'sourceTable': {
'projectId': source_project,
'datasetId': source_dataset,
'tableId': source_table,
},
'compression': compression,
'destinationUris': destination_cloud_storage_uris,
'destinationFormat': export_format,
}
}
if labels:
configuration['labels'] = labels
if export_format == 'CSV':
# Only set fieldDelimiter and printHeader fields if using CSV.
# Google does not like it if you set these fields for other export
# formats.
configuration['extract']['fieldDelimiter'] = field_delimiter
configuration['extract']['printHeader'] = print_header
return self.run_with_configuration(configuration)
def run_copy(self,
source_project_dataset_tables,
destination_project_dataset_table,
write_disposition='WRITE_EMPTY',
create_disposition='CREATE_IF_NEEDED',
labels=None):
"""
Executes a BigQuery copy command to copy data from one BigQuery table
to another. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy
For more details about these parameters.
:param source_project_dataset_tables: One or more dotted
(project:|project.)<dataset>.<table>
BigQuery tables to use as the source data. Use a list if there are
multiple source tables.
If <project> is not included, project will be the project defined
in the connection json.
:type source_project_dataset_tables: list|string
:param destination_project_dataset_table: The destination BigQuery
table. Format is: (project:|project.)<dataset>.<table>
:type destination_project_dataset_table: string
:param write_disposition: The write disposition if the table already exists.
:type write_disposition: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param labels a dictionary containing labels for the job/query,
passed to BigQuery
:type labels: dict
"""
source_project_dataset_tables = ([
source_project_dataset_tables
] if not isinstance(source_project_dataset_tables, list) else
source_project_dataset_tables)
source_project_dataset_tables_fixup = []
for source_project_dataset_table in source_project_dataset_tables:
source_project, source_dataset, source_table = \
_split_tablename(table_input=source_project_dataset_table,
default_project_id=self.project_id,
var_name='source_project_dataset_table')
source_project_dataset_tables_fixup.append({
'projectId':
source_project,
'datasetId':
source_dataset,
'tableId':
source_table
})
destination_project, destination_dataset, destination_table = \
_split_tablename(table_input=destination_project_dataset_table,
default_project_id=self.project_id)
configuration = {
'copy': {
'createDisposition': create_disposition,
'writeDisposition': write_disposition,
'sourceTables': source_project_dataset_tables_fixup,
'destinationTable': {
'projectId': destination_project,
'datasetId': destination_dataset,
'tableId': destination_table
}
}
}
if labels:
configuration['labels'] = labels
return self.run_with_configuration(configuration)
def run_load(self,
destination_project_dataset_table,
source_uris,
schema_fields=None,
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
field_delimiter=',',
max_bad_records=0,
quote_character=None,
ignore_unknown_values=False,
allow_quoted_newlines=False,
allow_jagged_rows=False,
schema_update_options=(),
src_fmt_configs=None,
time_partitioning=None,
cluster_fields=None,
autodetect=False):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
to BigQuery. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
For more details about these parameters.
:param destination_project_dataset_table:
The dotted (<project>.|<project>:)<dataset>.<table>($<partition>) BigQuery
table to load data into. If <project> is not included, project will be the
project defined in the connection json. If a partition is specified the
operator will automatically append the data, create a new partition or create
a new DAY partitioned table.
:type destination_project_dataset_table: string
:param schema_fields: The schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
Required if autodetect=False; optional if autodetect=True.
:type schema_fields: list
:param autodetect: Attempt to autodetect the schema for CSV and JSON
source files.
:type autodetect: bool
:param source_uris: The source Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild
per-object name can be used.
:type source_uris: list
:param source_format: File format to export.
:type source_format: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
:type skip_leading_rows: int
:param write_disposition: The write disposition if the table already exists.
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
:param max_bad_records: The maximum number of bad records that BigQuery can
ignore when running the job.
:type max_bad_records: int
:param quote_character: The value that is used to quote data sections in a CSV
file.
:type quote_character: string
:param ignore_unknown_values: [Optional] Indicates if BigQuery should allow
extra values that are not represented in the table schema.
If true, the extra values are ignored. If false, records with extra columns
are treated as bad records, and if there are too many bad records, an
invalid error is returned in the job result.
:type ignore_unknown_values: bool
:param allow_quoted_newlines: Whether to allow quoted newlines (true) or not
(false).
:type allow_quoted_newlines: boolean
:param allow_jagged_rows: Accept rows that are missing trailing optional columns.
The missing values are treated as nulls. If false, records with missing
trailing columns are treated as bad records, and if there are too many bad
records, an invalid error is returned in the job result. Only applicable when
soure_format is CSV.
:type allow_jagged_rows: bool
:param schema_update_options: Allows the schema of the desitination
table to be updated as a side effect of the load job.
:type schema_update_options: tuple
:param src_fmt_configs: configure optional fields specific to the source format
:type src_fmt_configs: dict
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
:type time_partitioning: dict
:param cluster_fields: Request that the result of this load be stored sorted
by one or more columns. This is only available in combination with
time_partitioning. The order of columns given determines the sort order.
:type cluster_fields: list of str
"""
# bigquery only allows certain source formats
# we check to make sure the passed source format is valid
# if it's not, we raise a ValueError
# Refer to this link for more details:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
if schema_fields is None and not autodetect:
raise ValueError(
'You must either pass a schema or autodetect=True.')
if src_fmt_configs is None:
src_fmt_configs = {}
source_format = source_format.upper()
allowed_formats = [
"CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS",
"DATASTORE_BACKUP", "PARQUET"
]
if source_format not in allowed_formats:
raise ValueError("{0} is not a valid source format. "
"Please use one of the following types: {1}"
.format(source_format, allowed_formats))
# bigquery also allows you to define how you want a table's schema to change
# as a side effect of a load
# for more details:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
allowed_schema_update_options = [
'ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION"
]
if not set(allowed_schema_update_options).issuperset(
set(schema_update_options)):
raise ValueError(
"{0} contains invalid schema update options."
"Please only use one or more of the following options: {1}"
.format(schema_update_options, allowed_schema_update_options))
destination_project, destination_dataset, destination_table = \
_split_tablename(table_input=destination_project_dataset_table,
default_project_id=self.project_id,
var_name='destination_project_dataset_table')
configuration = {
'load': {
'autodetect': autodetect,
'createDisposition': create_disposition,
'destinationTable': {
'projectId': destination_project,
'datasetId': destination_dataset,
'tableId': destination_table,
},
'sourceFormat': source_format,
'sourceUris': source_uris,
'writeDisposition': write_disposition,
'ignoreUnknownValues': ignore_unknown_values
}
}
time_partitioning = _cleanse_time_partitioning(
destination_project_dataset_table,
time_partitioning
)
if time_partitioning:
configuration['load'].update({
'timePartitioning': time_partitioning
})
if cluster_fields:
configuration['load'].update({'clustering': {'fields': cluster_fields}})
if schema_fields:
configuration['load']['schema'] = {'fields': schema_fields}
if schema_update_options:
if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
raise ValueError("schema_update_options is only "
"allowed if write_disposition is "
"'WRITE_APPEND' or 'WRITE_TRUNCATE'.")
else:
self.log.info(
"Adding experimental "
"'schemaUpdateOptions': {0}".format(schema_update_options))
configuration['load'][
'schemaUpdateOptions'] = schema_update_options
if max_bad_records:
configuration['load']['maxBadRecords'] = max_bad_records
# if following fields are not specified in src_fmt_configs,
# honor the top-level params for backward-compatibility
if 'skipLeadingRows' not in src_fmt_configs:
src_fmt_configs['skipLeadingRows'] = skip_leading_rows
if 'fieldDelimiter' not in src_fmt_configs:
src_fmt_configs['fieldDelimiter'] = field_delimiter
if 'ignoreUnknownValues' not in src_fmt_configs:
src_fmt_configs['ignoreUnknownValues'] = ignore_unknown_values
if quote_character is not None:
src_fmt_configs['quote'] = quote_character
if allow_quoted_newlines:
src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines
src_fmt_to_configs_mapping = {
'CSV': [
'allowJaggedRows', 'allowQuotedNewlines', 'autodetect',
'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues',
'nullMarker', 'quote'
],
'DATASTORE_BACKUP': ['projectionFields'],
'NEWLINE_DELIMITED_JSON': ['autodetect', 'ignoreUnknownValues'],
'PARQUET': ['autodetect', 'ignoreUnknownValues'],
'AVRO': [],
}
valid_configs = src_fmt_to_configs_mapping[source_format]
src_fmt_configs = {
k: v
for k, v in src_fmt_configs.items() if k in valid_configs
}
configuration['load'].update(src_fmt_configs)
if allow_jagged_rows:
configuration['load']['allowJaggedRows'] = allow_jagged_rows
return self.run_with_configuration(configuration)
def run_with_configuration(self, configuration):
"""
Executes a BigQuery SQL query. See here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs
For more details about the configuration parameter.
:param configuration: The configuration parameter maps directly to
BigQuery's configuration field in the job object. See
https://cloud.google.com/bigquery/docs/reference/v2/jobs for
details.
"""
jobs = self.service.jobs()
job_data = {'configuration': configuration}
# Send query and wait for reply.
query_reply = jobs \
.insert(projectId=self.project_id, body=job_data) \
.execute()
self.running_job_id = query_reply['jobReference']['jobId']
# Wait for query to finish.
keep_polling_job = True
while keep_polling_job:
try:
if self.location:
job = jobs.get(
projectId=self.project_id,
jobId=self.running_job_id,
location=self.location).execute()
else:
job = jobs.get(
projectId=self.project_id,
jobId=self.running_job_id).execute()
if job['status']['state'] == 'DONE':
keep_polling_job = False
# Check if job had errors.
if 'errorResult' in job['status']:
raise Exception(
'BigQuery job failed. Final error was: {}. The job was: {}'.
format(job['status']['errorResult'], job))
else:
self.log.info('Waiting for job to complete : %s, %s',
self.project_id, self.running_job_id)
time.sleep(5)
except HttpError as err:
if err.resp.status in [500, 503]:
self.log.info(
'%s: Retryable error, waiting for job to complete: %s',
err.resp.status, self.running_job_id)
time.sleep(5)
else:
raise Exception(
'BigQuery job status check failed. Final error was: %s',
err.resp.status)
return self.running_job_id
def poll_job_complete(self, job_id):
jobs = self.service.jobs()
try:
if self.location:
job = jobs.get(projectId=self.project_id,
jobId=job_id,
location=self.location).execute()
else:
job = jobs.get(projectId=self.project_id,
jobId=job_id).execute()
if job['status']['state'] == 'DONE':
return True
except HttpError as err:
if err.resp.status in [500, 503]:
self.log.info(
'%s: Retryable error while polling job with id %s',
err.resp.status, job_id)
else:
raise Exception(
'BigQuery job status check failed. Final error was: %s',
err.resp.status)
return False
def cancel_query(self):
"""
Cancel all started queries that have not yet completed
"""
jobs = self.service.jobs()
if (self.running_job_id and
not self.poll_job_complete(self.running_job_id)):
self.log.info('Attempting to cancel job : %s, %s', self.project_id,
self.running_job_id)
if self.location:
jobs.cancel(
projectId=self.project_id,
jobId=self.running_job_id,
location=self.location).execute()
else:
jobs.cancel(
projectId=self.project_id,
jobId=self.running_job_id).execute()
else:
self.log.info('No running BigQuery jobs to cancel.')
return
# Wait for all the calls to cancel to finish
max_polling_attempts = 12
polling_attempts = 0
job_complete = False
while polling_attempts < max_polling_attempts and not job_complete:
polling_attempts = polling_attempts + 1
job_complete = self.poll_job_complete(self.running_job_id)
if job_complete:
self.log.info('Job successfully canceled: %s, %s',
self.project_id, self.running_job_id)
elif polling_attempts == max_polling_attempts:
self.log.info(
"Stopping polling due to timeout. Job with id %s "
"has not completed cancel and may or may not finish.",
self.running_job_id)
else:
self.log.info('Waiting for canceled job with id %s to finish.',
self.running_job_id)
time.sleep(5)
def get_schema(self, dataset_id, table_id):
"""
Get the schema for a given datset.table.
see https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
:param dataset_id: the dataset ID of the requested table
:param table_id: the table ID of the requested table
:return: a table schema
"""
tables_resource = self.service.tables() \
.get(projectId=self.project_id, datasetId=dataset_id, tableId=table_id) \
.execute()
return tables_resource['schema']
def get_tabledata(self, dataset_id, table_id,
max_results=None, selected_fields=None, page_token=None,
start_index=None):
"""
Get the data of a given dataset.table and optionally with selected columns.
see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list
:param dataset_id: the dataset ID of the requested table.
:param table_id: the table ID of the requested table.
:param max_results: the maximum results to return.
:param selected_fields: List of fields to return (comma-separated). If
unspecified, all fields are returned.
:param page_token: page token, returned from a previous call,
identifying the result set.
:param start_index: zero based index of the starting row to read.
:return: map containing the requested rows.
"""
optional_params = {}
if max_results:
optional_params['maxResults'] = max_results
if selected_fields:
optional_params['selectedFields'] = selected_fields
if page_token:
optional_params['pageToken'] = page_token
if start_index:
optional_params['startIndex'] = start_index
return (self.service.tabledata().list(
projectId=self.project_id,
datasetId=dataset_id,
tableId=table_id,
**optional_params).execute())
def run_table_delete(self, deletion_dataset_table,
ignore_if_missing=False):
"""
Delete an existing table from the dataset;
If the table does not exist, return an error unless ignore_if_missing
is set to True.
:param deletion_dataset_table: A dotted
(<project>.|<project>:)<dataset>.<table> that indicates which table
will be deleted.
:type deletion_dataset_table: str
:param ignore_if_missing: if True, then return success even if the
requested table does not exist.
:type ignore_if_missing: boolean
:return:
"""
deletion_project, deletion_dataset, deletion_table = \
_split_tablename(table_input=deletion_dataset_table,
default_project_id=self.project_id)
try:
self.service.tables() \
.delete(projectId=deletion_project,
datasetId=deletion_dataset,
tableId=deletion_table) \
.execute()
self.log.info('Deleted table %s:%s.%s.', deletion_project,
deletion_dataset, deletion_table)
except HttpError:
if not ignore_if_missing:
raise Exception('Table deletion failed. Table does not exist.')
else:
self.log.info('Table does not exist. Skipping.')
def run_table_upsert(self, dataset_id, table_resource, project_id=None):
"""
creates a new, empty table in the dataset;
If the table already exists, update the existing table.
Since BigQuery does not natively allow table upserts, this is not an
atomic operation.
:param dataset_id: the dataset to upsert the table into.
:type dataset_id: str
:param table_resource: a table resource. see
https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
:type table_resource: dict
:param project_id: the project to upsert the table into. If None,
project will be self.project_id.
:return:
"""
# check to see if the table exists
table_id = table_resource['tableReference']['tableId']
project_id = project_id if project_id is not None else self.project_id
tables_list_resp = self.service.tables().list(
projectId=project_id, datasetId=dataset_id).execute()
while True:
for table in tables_list_resp.get('tables', []):
if table['tableReference']['tableId'] == table_id:
# found the table, do update
self.log.info('Table %s:%s.%s exists, updating.',
project_id, dataset_id, table_id)
return self.service.tables().update(
projectId=project_id,
datasetId=dataset_id,
tableId=table_id,
body=table_resource).execute()
# If there is a next page, we need to check the next page.
if 'nextPageToken' in tables_list_resp:
tables_list_resp = self.service.tables()\
.list(projectId=project_id,
datasetId=dataset_id,
pageToken=tables_list_resp['nextPageToken'])\
.execute()
# If there is no next page, then the table doesn't exist.
else:
# do insert
self.log.info('Table %s:%s.%s does not exist. creating.',
project_id, dataset_id, table_id)
return self.service.tables().insert(
projectId=project_id,
datasetId=dataset_id,
body=table_resource).execute()
def run_grant_dataset_view_access(self,
source_dataset,
view_dataset,
view_table,
source_project=None,
view_project=None):
"""
Grant authorized view access of a dataset to a view table.
If this view has already been granted access to the dataset, do nothing.
This method is not atomic. Running it may clobber a simultaneous update.
:param source_dataset: the source dataset
:type source_dataset: str
:param view_dataset: the dataset that the view is in
:type view_dataset: str
:param view_table: the table of the view
:type view_table: str
:param source_project: the project of the source dataset. If None,
self.project_id will be used.
:type source_project: str
:param view_project: the project that the view is in. If None,
self.project_id will be used.
:type view_project: str
:return: the datasets resource of the source dataset.
"""
# Apply default values to projects
source_project = source_project if source_project else self.project_id
view_project = view_project if view_project else self.project_id
# we don't want to clobber any existing accesses, so we have to get
# info on the dataset before we can add view access
source_dataset_resource = self.service.datasets().get(
projectId=source_project, datasetId=source_dataset).execute()
access = source_dataset_resource[
'access'] if 'access' in source_dataset_resource else []
view_access = {
'view': {
'projectId': view_project,
'datasetId': view_dataset,
'tableId': view_table
}
}
# check to see if the view we want to add already exists.
if view_access not in access:
self.log.info(
'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
view_project, view_dataset, view_table, source_project,
source_dataset)
access.append(view_access)
return self.service.datasets().patch(
projectId=source_project,
datasetId=source_dataset,
body={
'access': access
}).execute()
else:
# if view is already in access, do nothing.
self.log.info(
'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
view_project, view_dataset, view_table, source_project, source_dataset)
return source_dataset_resource
def create_empty_dataset(self, dataset_id="", project_id="",
dataset_reference=None):
"""
Create a new empty dataset:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
:param project_id: The name of the project where we want to create
an empty a dataset. Don't need to provide, if projectId in dataset_reference.
:type project_id: str
:param dataset_id: The id of dataset. Don't need to provide,
if datasetId in dataset_reference.
:type dataset_id: str
:param dataset_reference: Dataset reference that could be provided
with request body. More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:type dataset_reference: dict
"""
if dataset_reference:
_validate_value('dataset_reference', dataset_reference, dict)
else:
dataset_reference = {}
if "datasetReference" not in dataset_reference:
dataset_reference["datasetReference"] = {}
if not dataset_reference["datasetReference"].get("datasetId") and not dataset_id:
raise ValueError(
"{} not provided datasetId. Impossible to create dataset")
dataset_required_params = [(dataset_id, "datasetId", ""),
(project_id, "projectId", self.project_id)]
for param_tuple in dataset_required_params:
param, param_name, param_default = param_tuple
if param_name not in dataset_reference['datasetReference']:
if param_default and not param:
self.log.info("{} was not specified. Will be used default "
"value {}.".format(param_name,
param_default))
param = param_default
dataset_reference['datasetReference'].update(
{param_name: param})
elif param:
_api_resource_configs_duplication_check(
param_name, param,
dataset_reference['datasetReference'], 'dataset_reference')
dataset_id = dataset_reference.get("datasetReference").get("datasetId")
dataset_project_id = dataset_reference.get("datasetReference").get(
"projectId")
self.log.info('Creating Dataset: %s in project: %s ', dataset_id,
dataset_project_id)
try:
self.service.datasets().insert(
projectId=dataset_project_id,
body=dataset_reference).execute()
self.log.info('Dataset created successfully: In project %s '
'Dataset %s', dataset_project_id, dataset_id)
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content)
)
def delete_dataset(self, project_id, dataset_id):
"""
Delete a dataset of Big query in your project.
:param project_id: The name of the project where we have the dataset .
:type project_id: str
:param dataset_id: The dataset to be delete.
:type dataset_id: str
:return:
"""
project_id = project_id if project_id is not None else self.project_id
self.log.info('Deleting from project: %s Dataset:%s',
project_id, dataset_id)
try:
self.service.datasets().delete(
projectId=project_id,
datasetId=dataset_id).execute()
self.log.info('Dataset deleted successfully: In project %s '
'Dataset %s', project_id, dataset_id)
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content)
)
def get_dataset(self, dataset_id, project_id=None):
"""
Method returns dataset_resource if dataset exist
and raised 404 error if dataset does not exist
:param dataset_id: The BigQuery Dataset ID
:type dataset_id: str
:param project_id: The GCP Project ID
:type project_id: str
:return: dataset_resource
.. seealso::
For more information, see Dataset Resource content:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
"""
if not dataset_id or not isinstance(dataset_id, str):
raise ValueError("dataset_id argument must be provided and has "
"a type 'str'. You provided: {}".format(dataset_id))
dataset_project_id = project_id if project_id else self.project_id
try:
dataset_resource = self.service.datasets().get(
datasetId=dataset_id, projectId=dataset_project_id).execute()
self.log.info("Dataset Resource: {}".format(dataset_resource))
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content))
return dataset_resource
def get_datasets_list(self, project_id=None):
"""
Method returns full list of BigQuery datasets in the current project
.. seealso::
For more information, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
:param project_id: Google Cloud Project for which you
try to get all datasets
:type project_id: str
:return: datasets_list
Example of returned datasets_list: ::
{
"kind":"bigquery#dataset",
"location":"US",
"id":"your-project:dataset_2_test",
"datasetReference":{
"projectId":"your-project",
"datasetId":"dataset_2_test"
}
},
{
"kind":"bigquery#dataset",
"location":"US",
"id":"your-project:dataset_1_test",
"datasetReference":{
"projectId":"your-project",
"datasetId":"dataset_1_test"
}
}
]
"""
dataset_project_id = project_id if project_id else self.project_id
try:
datasets_list = self.service.datasets().list(
projectId=dataset_project_id).execute()['datasets']
self.log.info("Datasets List: {}".format(datasets_list))
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content))
return datasets_list
def insert_all(self, project_id, dataset_id, table_id,
rows, ignore_unknown_values=False,
skip_invalid_rows=False, fail_on_error=False):
"""
Method to stream data into BigQuery one record at a time without needing
to run a load job
.. seealso::
For more information, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
:param project_id: The name of the project where we have the table
:type project_id: str
:param dataset_id: The name of the dataset where we have the table
:type dataset_id: str
:param table_id: The name of the table
:type table_id: str
:param rows: the rows to insert
:type rows: list
**Example or rows**:
rows=[{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
:param ignore_unknown_values: [Optional] Accept rows that contain values
that do not match the schema. The unknown values are ignored.
The default value is false, which treats unknown values as errors.
:type ignore_unknown_values: bool
:param skip_invalid_rows: [Optional] Insert all valid rows of a request,
even if invalid rows exist. The default value is false, which causes
the entire request to fail if any invalid rows exist.
:type skip_invalid_rows: bool
:param fail_on_error: [Optional] Force the task to fail if any errors occur.
The default value is false, which indicates the task should not fail
even if any insertion errors occur.
:type fail_on_error: bool
"""
dataset_project_id = project_id if project_id else self.project_id
body = {
"rows": rows,
"ignoreUnknownValues": ignore_unknown_values,
"kind": "bigquery#tableDataInsertAllRequest",
"skipInvalidRows": skip_invalid_rows,
}
try:
self.log.info('Inserting {} row(s) into Table {}:{}.{}'.format(
len(rows), dataset_project_id,
dataset_id, table_id))
resp = self.service.tabledata().insertAll(
projectId=dataset_project_id, datasetId=dataset_id,
tableId=table_id, body=body
).execute()
if 'insertErrors' not in resp:
self.log.info('All row(s) inserted successfully: {}:{}.{}'.format(
dataset_project_id, dataset_id, table_id))
else:
error_msg = '{} insert error(s) occured: {}:{}.{}. Details: {}'.format(
len(resp['insertErrors']),
dataset_project_id, dataset_id, table_id, resp['insertErrors'])
if fail_on_error:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(error_msg)
)
self.log.info(error_msg)
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content)
)
class BigQueryCursor(BigQueryBaseCursor):
"""
A very basic BigQuery PEP 249 cursor implementation. The PyHive PEP 249
implementation was used as a reference:
https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py
https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
"""
def __init__(self, service, project_id, use_legacy_sql=True, location=None):
super(BigQueryCursor, self).__init__(
service=service,
project_id=project_id,
use_legacy_sql=use_legacy_sql,
location=location,
)
self.buffersize = None
self.page_token = None
self.job_id = None
self.buffer = []
self.all_pages_loaded = False
@property
def description(self):
""" The schema description method is not currently implemented. """
raise NotImplementedError
def close(self):
""" By default, do nothing """
pass
@property
def rowcount(self):
""" By default, return -1 to indicate that this is not supported. """
return -1
def execute(self, operation, parameters=None):
"""
Executes a BigQuery query, and returns the job ID.
:param operation: The query to execute.
:type operation: string
:param parameters: Parameters to substitute into the query.
:type parameters: dict
"""
sql = _bind_parameters(operation,
parameters) if parameters else operation
self.job_id = self.run_query(sql)
def executemany(self, operation, seq_of_parameters):
"""
Execute a BigQuery query multiple times with different parameters.
:param operation: The query to execute.
:type operation: string
:param seq_of_parameters: List of dictionary parameters to substitute into the
query.
:type seq_of_parameters: list
"""
for parameters in seq_of_parameters:
self.execute(operation, parameters)
def fetchone(self):
""" Fetch the next row of a query result set. """
return self.next()
def next(self):
"""
Helper method for fetchone, which returns the next row from a buffer.
If the buffer is empty, attempts to paginate through the result set for
the next page, and load it into the buffer.
"""
if not self.job_id:
return None
if len(self.buffer) == 0:
if self.all_pages_loaded:
return None
query_results = (self.service.jobs().getQueryResults(
projectId=self.project_id,
jobId=self.job_id,
pageToken=self.page_token).execute())
if 'rows' in query_results and query_results['rows']:
self.page_token = query_results.get('pageToken')
fields = query_results['schema']['fields']
col_types = [field['type'] for field in fields]
rows = query_results['rows']
for dict_row in rows:
typed_row = ([
_bq_cast(vs['v'], col_types[idx])
for idx, vs in enumerate(dict_row['f'])
])
self.buffer.append(typed_row)
if not self.page_token:
self.all_pages_loaded = True
else:
# Reset all state since we've exhausted the results.
self.page_token = None
self.job_id = None
self.page_token = None
return None
return self.buffer.pop(0)
def fetchmany(self, size=None):
"""
Fetch the next set of rows of a query result, returning a sequence of sequences
(e.g. a list of tuples). An empty sequence is returned when no more rows are
available. The number of rows to fetch per call is specified by the parameter.
If it is not given, the cursor's arraysize determines the number of rows to be
fetched. The method should try to fetch as many rows as indicated by the size
parameter. If this is not possible due to the specified number of rows not being
available, fewer rows may be returned. An :py:class:`~pyhive.exc.Error`
(or subclass) exception is raised if the previous call to
:py:meth:`execute` did not produce any result set or no call was issued yet.
"""
if size is None:
size = self.arraysize
result = []
for _ in range(size):
one = self.fetchone()
if one is None:
break
else:
result.append(one)
return result
def fetchall(self):
"""
Fetch all (remaining) rows of a query result, returning them as a sequence of
sequences (e.g. a list of tuples).
"""
result = []
while True:
one = self.fetchone()
if one is None:
break
else:
result.append(one)
return result
def get_arraysize(self):
""" Specifies the number of rows to fetch at a time with .fetchmany() """
return self._buffersize if self.buffersize else 1
def set_arraysize(self, arraysize):
""" Specifies the number of rows to fetch at a time with .fetchmany() """
self.buffersize = arraysize
arraysize = property(get_arraysize, set_arraysize)
def setinputsizes(self, sizes):
""" Does nothing by default """
pass
def setoutputsize(self, size, column=None):
""" Does nothing by default """
pass
def _bind_parameters(operation, parameters):
""" Helper method that binds parameters to a SQL query. """
# inspired by MySQL Python Connector (conversion.py)
string_parameters = {}
for (name, value) in iteritems(parameters):
if value is None:
string_parameters[name] = 'NULL'
elif isinstance(value, basestring):
string_parameters[name] = "'" + _escape(value) + "'"
else:
string_parameters[name] = str(value)
return operation % string_parameters
def _escape(s):
""" Helper method that escapes parameters to a SQL query. """
e = s
e = e.replace('\\', '\\\\')
e = e.replace('\n', '\\n')
e = e.replace('\r', '\\r')
e = e.replace("'", "\\'")
e = e.replace('"', '\\"')
return e
def _bq_cast(string_field, bq_type):
"""
Helper method that casts a BigQuery row to the appropriate data types.
This is useful because BigQuery returns all fields as strings.
"""
if string_field is None:
return None
elif bq_type == 'INTEGER':
return int(string_field)
elif bq_type == 'FLOAT' or bq_type == 'TIMESTAMP':
return float(string_field)
elif bq_type == 'BOOLEAN':
if string_field not in ['true', 'false']:
raise ValueError("{} must have value 'true' or 'false'".format(
string_field))
return string_field == 'true'
else:
return string_field
def _split_tablename(table_input, default_project_id, var_name=None):
if '.' not in table_input:
raise ValueError(
'Expected target table name in the format of '
'<dataset>.<table>. Got: {}'.format(table_input))
if not default_project_id:
raise ValueError("INTERNAL: No default project is specified")
def var_print(var_name):
if var_name is None:
return ""
else:
return "Format exception for {var}: ".format(var=var_name)
if table_input.count('.') + table_input.count(':') > 3:
raise Exception(('{var}Use either : or . to specify project '
'got {input}').format(
var=var_print(var_name), input=table_input))
cmpt = table_input.rsplit(':', 1)
project_id = None
rest = table_input
if len(cmpt) == 1:
project_id = None
rest = cmpt[0]
elif len(cmpt) == 2 and cmpt[0].count(':') <= 1:
if cmpt[-1].count('.') != 2:
project_id = cmpt[0]
rest = cmpt[1]
else:
raise Exception(('{var}Expect format of (<project:)<dataset>.<table>, '
'got {input}').format(
var=var_print(var_name), input=table_input))
cmpt = rest.split('.')
if len(cmpt) == 3:
if project_id:
raise ValueError(
"{var}Use either : or . to specify project".format(
var=var_print(var_name)))
project_id = cmpt[0]
dataset_id = cmpt[1]
table_id = cmpt[2]
elif len(cmpt) == 2:
dataset_id = cmpt[0]
table_id = cmpt[1]
else:
raise Exception(
('{var}Expect format of (<project.|<project:)<dataset>.<table>, '
'got {input}').format(var=var_print(var_name), input=table_input))
if project_id is None:
if var_name is not None:
log = LoggingMixin().log
log.info('Project not included in {var}: {input}; '
'using project "{project}"'.format(
var=var_name,
input=table_input,
project=default_project_id))
project_id = default_project_id
return project_id, dataset_id, table_id
def _cleanse_time_partitioning(destination_dataset_table, time_partitioning_in):
# if it is a partitioned table ($ is in the table name) add partition load option
if time_partitioning_in is None:
time_partitioning_in = {}
time_partitioning_out = {}
if destination_dataset_table and '$' in destination_dataset_table:
time_partitioning_out['type'] = 'DAY'
time_partitioning_out.update(time_partitioning_in)
return time_partitioning_out
def _validate_value(key, value, expected_type):
""" function to check expected type and raise
error if type is not correct """
if not isinstance(value, expected_type):
raise TypeError("{} argument must have a type {} not {}".format(
key, expected_type, type(value)))
def _api_resource_configs_duplication_check(key, value, config_dict,
config_dict_name='api_resource_configs'):
if key in config_dict and value != config_dict[key]:
raise ValueError("Values of {param_name} param are duplicated. "
"{dict_name} contained {param_name} param "
"in `query` config and {param_name} was also provided "
"with arg to run_query() method. Please remove duplicates."
.format(param_name=key, dict_name=config_dict_name))