Source code for airflow.providers.google.marketing_platform.hooks.search_ads

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Search Ads 360 hook."""

from __future__ import annotations

from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

if TYPE_CHECKING:
    from googleapiclient.discovery import Resource


[docs] class GoogleSearchAdsReportingHook(GoogleBaseHook): """Hook for the Google Search Ads 360 Reporting API.""" _conn: build | None = None
[docs] default_api_version: str = "v0"
def __init__( self, api_version: str | None = None, gcp_conn_id: str = "google_search_ads_default", **kwargs, ) -> None: super().__init__(gcp_conn_id=gcp_conn_id, **kwargs)
[docs] self.api_version = api_version or self.default_api_version
def _get_config(self) -> None: """ Set up Google Search Ads config from Connection. This pulls the connections from db, and uses it to set up ``google_search_ads_client``. """ conn = self.get_connection(self.gcp_conn_id) if "google_search_ads_client" not in conn.extra_dejson: raise AirflowException("google_search_ads_client not found in extra field") self.google_search_ads_config = conn.extra_dejson["google_search_ads_client"]
[docs] def get_credentials(self) -> Credentials: """Return the credential instance for search ads.""" self._get_config() self.logger().info(f"Credential configuration: {self.google_search_ads_config}") return Credentials(**self.google_search_ads_config)
[docs] def get_conn(self) -> Resource: if not self._conn: creds = self.get_credentials() self._conn = build( "searchads360", self.api_version, credentials=creds, cache_discovery=False, ) return self._conn
@cached_property
[docs] def customer_service(self): return self.get_conn().customers()
@cached_property
[docs] def fields_service(self): return self.get_conn().searchAds360Fields()
[docs] def search( self, customer_id: str, query: str, page_token: str | None = None, page_size: int = 10000, return_total_results_count: bool = False, summary_row_setting: str | None = None, validate_only: bool = False, ): """ Search and download the report. Use pagination to download entire report. :param customer_id: The ID of the customer being queried. :param query: The query to execute. :param page_token: Token of the page to retrieve. If not specified, the first page of results will be returned. Use the value obtained from `next_page_token` in the previous response in order to request the next page of results. :param page_size: Number of elements to retrieve in a single page. When too large a page is requested, the server may decide to further limit the number of returned resources. Default is 10000. :param return_total_results_count: If true, the total number of results that match the query ignoring the LIMIT clause will be included in the response. Default is false. :param summary_row_setting: Determines whether a summary row will be returned. By default, summary row is not returned. If requested, the summary row will be sent in a response by itself after all others query results are returned. :param validate_only: If true, the request is validated but not executed. Default is false. """ params: dict[str, Any] = { "query": query, "pageSize": page_size, "returnTotalResultsCount": return_total_results_count, "validateOnly": validate_only, } if page_token is not None: params.update({"pageToken": page_token}) if summary_row_setting is not None: params.update({"summaryRowSetting": summary_row_setting}) response = ( self.customer_service.searchAds360() .search(customerId=customer_id, body=params) .execute(num_retries=self.num_retries) ) self.log.info("Search response: %s", response) return response
[docs] def get_custom_column(self, customer_id: str, custom_column_id: str): """ Retrieve the requested custom column in full detail. :param customer_id: The customer id :param custom_column_id: The custom column id """ resource_name = f"customers/{customer_id}/customColumns/{custom_column_id}" response = ( self.customer_service.customColumns() .get(resourceName=resource_name) .execute(num_retries=self.num_retries) ) self.log.info("Retrieved custom column: %s", response) return response
[docs] def list_custom_columns(self, customer_id: str): """ Retrieve all the custom columns associated with the customer in full detail. :param customer_id: The customer id """ response = ( self.customer_service.customColumns() .list(customerId=customer_id) .execute(num_retries=self.num_retries) ) self.log.info("Listing the custom columns: %s", response) return response
[docs] def get_field(self, field_name: str): """ Retrieve the requested field details. :param field_name: The name of the field. """ resource_name = f"searchAds360Fields/{field_name}" response = self.fields_service.get(resourceName=resource_name).execute(num_retries=self.num_retries) self.log.info("Retrieved field: %s", response) return response
[docs] def search_fields(self, query: str, page_token: str | None = None, page_size: int | None = 10000): """ Retrieve all the fields that match with the given search. :param query: The query string to execute. :param page_token: Token of the page to retrieve. If not specified, the first page of results will be returned. Use the value obtained from `next_page_token` in the previous response in order to request the next page of results. :param page_size: Number of elements to retrieve in a single page. When too large a page is requested, the server may decide to further limit the number of returned resources. Default 10000. """ params: dict[str, Any] = { "query": query, "pageSize": page_size, } if page_token is not None: params.update({"pageToken": page_token}) response = self.fields_service.search(body=params).execute(num_retries=self.num_retries) self.log.info("Retrieved fields: %s", response) return response
[docs] class GoogleSearchAdsHook(GoogleBaseHook): """Hook for Google Search Ads 360.""" _conn: build | None = None def __init__( self, api_version: str = "v2", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: super().__init__( gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs, )
[docs] self.api_version = api_version
[docs] def get_conn(self): """Retrieve connection to Google SearchAds.""" if not self._conn: http_authorized = self._authorize() self._conn = build( "doubleclicksearch", self.api_version, http=http_authorized, cache_discovery=False, ) return self._conn

Was this entry helpful?