Source code for airflow.providers.facebook.ads.hooks.ads
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains Facebook Ads Reporting hooks."""from__future__importannotationsimporttimefromenumimportEnumfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Anyfromfacebook_business.adobjects.adaccountimportAdAccountfromfacebook_business.adobjects.adreportrunimportAdReportRunfromfacebook_business.apiimportFacebookAdsApifromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookifTYPE_CHECKING:fromfacebook_business.adobjects.adsinsightsimportAdsInsights
[docs]classJobStatus(Enum):"""Available options for facebook async task status."""
[docs]classFacebookAdsReportingHook(BaseHook):""" Facebook Ads API. .. seealso:: For more information on the Facebook Ads API, take a look at the API docs: https://developers.facebook.com/docs/marketing-apis/ :param facebook_conn_id: Airflow Facebook Ads connection ID :param api_version: The version of Facebook API. Default to None. If it is None, it will use the Facebook business SDK default version. """
def__init__(self,facebook_conn_id:str=default_conn_name,api_version:str|None=None,)->None:super().__init__()self.facebook_conn_id=facebook_conn_idself.api_version=api_versionself.client_required_fields=["app_id","app_secret","access_token","account_id"]def_get_service(self)->FacebookAdsApi:"""Return Facebook Ads Client using a service account."""config=self.facebook_ads_configreturnFacebookAdsApi.init(app_id=config["app_id"],app_secret=config["app_secret"],access_token=config["access_token"],api_version=self.api_version,)@cached_property
[docs]defmultiple_accounts(self)->bool:"""Check whether provided account_id in the Facebook Ads Connection is provided as a list."""returnisinstance(self.facebook_ads_config["account_id"],list)
@cached_property
[docs]deffacebook_ads_config(self)->dict:""" Get the ``facebook_ads_config`` attribute. This fetches Facebook Ads connection from meta database, and sets the ``facebook_ads_config`` attribute with returned config file. """self.log.info("Fetching fb connection: %s",self.facebook_conn_id)conn=self.get_connection(self.facebook_conn_id)config=conn.extra_dejsonmissing_keys=self.client_required_fields-config.keys()ifmissing_keys:message=f"{missing_keys} fields are missing"raiseAirflowException(message)returnconfig
[docs]defbulk_facebook_report(self,params:dict[str,Any]|None,fields:list[str],sleep_time:int=5,)->list[AdsInsights]|dict[str,list[AdsInsights]]:""" Pull data from Facebook Ads API regarding Account ID with matching return type. The return type and value depends on the ``account_id`` configuration. If the configuration is a str representing a single Account ID, the return value is the list of reports for that ID. If the configuration is a list of str representing multiple Account IDs, the return value is a dict of Account IDs and their respective list of reports. :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 :param params: Parameters that determine the query for Facebook https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 :param sleep_time: Time to sleep when async call is happening :return: Facebook Ads API response, converted to Facebook Ads Row objects regarding given Account ID type """api=self._get_service()ifself.multiple_accounts:all_insights={}foraccount_idinself.facebook_ads_config["account_id"]:all_insights[account_id]=self._facebook_report(account_id=account_id,api=api,params=params,fields=fields,sleep_time=sleep_time)self.log.info("%s Account Id used to extract data from Facebook Ads Iterators successfully",account_id)returnall_insightselse:returnself._facebook_report(account_id=self.facebook_ads_config["account_id"],api=api,params=params,fields=fields,sleep_time=sleep_time,)
def_facebook_report(self,account_id:str,api:FacebookAdsApi,params:dict[str,Any]|None,fields:list[str],sleep_time:int=5,)->list[AdsInsights]:""" Pull data from the Facebook Ads API with given ``account_id``. :param account_id: Facebook Account ID that holds ads information https://developers.facebook.com/docs/marketing-api/reference/ads-insights/ :param api: FacebookAdsApi created in the hook :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 :param params: Parameters that determine the query for Facebook https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 :param sleep_time: Time to sleep when async call is happening """ad_account=AdAccount(account_id,api=api)_async=ad_account.get_insights(params=params,fields=fields,is_async=True)whileTrue:request=_async.api_get()async_status=request[AdReportRun.Field.async_status]percent=request[AdReportRun.Field.async_percent_completion]self.log.info("%s%s completed, async_status: %s",percent,"%",async_status)ifasync_status==JobStatus.COMPLETED.value:self.log.info("Job run completed")breakifasync_statusin[JobStatus.SKIPPED.value,JobStatus.FAILED.value]:message=f"{async_status}. Please retry."raiseAirflowException(message)time.sleep(sleep_time)report_run_id=_async.api_get()["report_run_id"]report_object=AdReportRun(report_run_id,api=api)self.log.info("Extracting data from returned Facebook Ads Iterators")insights=report_object.get_insights()returnlist(insights)