Source code for airflow.providers.amazon.aws.hooks.appflow
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonfromdatetimeimportdatetime,timezonefromtimeimportsleepfromtypingimportTYPE_CHECKINGfromairflow.compat.functoolsimportcached_propertyfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHookifTYPE_CHECKING:frommypy_boto3_appflow.clientimportAppflowClientfrommypy_boto3_appflow.type_defsimportTaskTypeDef
[docs]classAppflowHook(AwsBaseHook):""" Interact with Amazon Appflow, using the boto3 library. Hook attribute ``conn`` has all methods that listed in documentation. .. seealso:: - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appflow.html - https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and are passed down to the underlying AwsBaseHook. .. seealso:: :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` """
[docs]defconn(self)->AppflowClient:"""Get the underlying boto3 Appflow client (cached)"""returnsuper().conn
def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
    """
    Execute an AppFlow run and block until it is no longer in progress.

    :param flow_name: The flow name
    :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
    :return: The run execution ID
    :raises Exception: If the most recent execution status reported for the flow is ``Error``
    """
    # Fallback used when no execution time has been reported yet: it is older
    # than any possible ``ts_before``, so the consistency wait keeps polling.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

    # Reference point taken before the run is started; used below to tell the
    # run started here apart from an older one.
    ts_before: datetime = datetime.now(timezone.utc)
    sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
    response_start = self.conn.start_flow(flowName=flow_name)
    execution_id = response_start["executionId"]
    self.log.info("executionId: %s", execution_id)

    # ``lastRunExecutionDetails`` may be missing from the response, so it is
    # always read with ``.get`` instead of being indexed directly (direct
    # indexing raised ``KeyError`` before the wait loop had a chance to run).
    response_desc = self.conn.describe_flow(flowName=flow_name)
    last_exec_details = response_desc.get("lastRunExecutionDetails", {})

    # Wait Appflow eventual consistence: poll until the reported execution
    # time is not older than the moment captured before starting the run.
    self.log.info("Waiting for Appflow eventual consistence...")
    while last_exec_details.get("mostRecentExecutionTime", epoch) < ts_before:
        sleep(self.EVENTUAL_CONSISTENCY_POLLING)
        response_desc = self.conn.describe_flow(flowName=flow_name)
        last_exec_details = response_desc.get("lastRunExecutionDetails", {})

    # Wait flow stops: a missing status is treated the same as "InProgress".
    self.log.info("Waiting for flow run...")
    while (
        "mostRecentExecutionStatus" not in last_exec_details
        or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
    ):
        sleep(poll_interval)
        response_desc = self.conn.describe_flow(flowName=flow_name)
        last_exec_details = response_desc.get("lastRunExecutionDetails", {})

    self.log.info("lastRunExecutionDetails: %s", last_exec_details)

    if last_exec_details["mostRecentExecutionStatus"] == "Error":
        # ``default=str`` lets json.dumps render non-JSON values (e.g. datetimes)
        # contained in the describe_flow response.
        raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")

    return execution_id
[docs]defupdate_flow_filter(self,flow_name:str,filter_tasks:list[TaskTypeDef],set_trigger_ondemand:bool=False)->None:""" Update the flow task filter. All filters will be removed if an empty array is passed to filter_tasks. :param flow_name: The flow name :param filter_tasks: List flow tasks to be added :param set_trigger_ondemand: If True, set the trigger to on-demand; otherwise, keep the trigger as is :return: None """response=self.conn.describe_flow(flowName=flow_name)connector_type=response["sourceFlowConfig"]["connectorType"]tasks:list[TaskTypeDef]=[]# cleanup old filter tasksfortaskinresponse["tasks"]:if(task["taskType"]=="Filter"andtask.get("connectorOperator",{}).get(connector_type)!="PROJECTION"):self.log.info("Removing task: %s",task)else:tasks.append(task)# List of non-filter taskstasks+=filter_tasks# Add the new filter tasksifset_trigger_ondemand:# Clean up attribute to force on-demand triggerdelresponse["triggerConfig"]["triggerProperties"]self.conn.update_flow(flowName=response["flowName"],destinationFlowConfigList=response["destinationFlowConfigList"],sourceFlowConfig=response["sourceFlowConfig"],triggerConfig=response["triggerConfig"],description=response.get("description","Flow description."),tasks=tasks,