Source code for airflow.providers.google.marketing_platform.hooks.bid_manager
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Bid Manager API hook."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from googleapiclient.discovery import Resource, build
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
[docs]
class GoogleBidManagerHook(GoogleBaseHook):
"""Hook for Google Bid Manager API."""
_conn: Resource | None = None
def __init__(
self,
api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
**kwargs,
)
[docs]
self.api_version = api_version
[docs]
def get_conn(self) -> Resource:
"""Retrieve connection to Bid Manager API."""
if not self._conn:
http_authorized = self._authorize()
self._conn = build(
"doubleclickbidmanager",
self.api_version,
http=http_authorized,
cache_discovery=False,
client_options=self.get_client_options(),
)
return self._conn
[docs]
def create_query(self, query: dict[str, Any]) -> dict:
"""
Create a query.
:param query: Query object to be passed to request body.
"""
response = self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
return response
[docs]
def delete_query(self, query_id: str) -> None:
"""
Delete a stored query as well as the associated stored reports.
:param query_id: Query ID to delete.
"""
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)
[docs]
def get_query(self, query_id: str) -> dict:
"""
Retrieve a stored query.
:param query_id: Query ID to retrieve.
"""
response = self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
return response
[docs]
def list_queries(self) -> list[dict]:
"""Retrieve stored queries."""
response = self.get_conn().queries().list().execute(num_retries=self.num_retries)
return response.get("queries", [])
[docs]
def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
"""
Run a stored query to generate a report.
:param query_id: Query ID to run.
:param params: Parameters for the report.
"""
return (
self.get_conn().queries().run(queryId=query_id, body=params).execute(num_retries=self.num_retries)
)
[docs]
def get_report(self, query_id: str, report_id: str) -> dict:
"""
Retrieve a report.
:param query_id: Query ID for which report was generated.
:param report_id: Report ID to retrieve.
"""
return (
self.get_conn()
.queries()
.reports()
.get(queryId=query_id, reportId=report_id)
.execute(num_retries=self.num_retries)
)
[docs]
def list_reports(self, query_id: str) -> dict:
"""
Retrieve a list of reports.
:param query_id: Query ID for which report was generated.
"""
return (
self.get_conn().queries().reports().list(queryId=query_id).execute(num_retries=self.num_retries)
)