Source code for airflow.providers.amazon.aws.hooks.appflow
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKINGfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHookifTYPE_CHECKING:frommypy_boto3_appflow.clientimportAppflowClientfrommypy_boto3_appflow.type_defsimportTaskTypeDef
class AppflowHook(AwsBaseHook):
    """
    Interact with Amazon Appflow.

    Provide thin wrapper around :external+boto3:py:class:`boto3.client("appflow") <Appflow.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
        - `Amazon Appflow API Reference <https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html>`__
    """

    def __init__(self, *args, **kwargs) -> None:
        # This hook always talks to the "appflow" service, so any caller-supplied
        # ``client_type`` is overwritten before delegating to the base hook.
        kwargs["client_type"] = "appflow"
        super().__init__(*args, **kwargs)

    @cached_property
    def conn(self) -> AppflowClient:
        """Get the underlying boto3 Appflow client (cached)."""
        return super().conn

    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
        """
        Execute an AppFlow run.

        :param flow_name: The flow name
        :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
        :param wait_for_completion: whether to wait for the run to end to return
        :return: The run execution ID
        """
        response_start = self.conn.start_flow(flowName=flow_name)
        execution_id = response_start["executionId"]
        self.log.info("executionId: %s", execution_id)

        if wait_for_completion:
            # The "run_complete" waiter is parameterised with the execution id so it
            # tracks this specific run rather than any run of the flow.
            self.get_waiter("run_complete", {"EXECUTION_ID": execution_id}).wait(
                flowName=flow_name,
                WaiterConfig={"Delay": poll_interval},
            )
            self._log_execution_description(flow_name, execution_id)

        return execution_id

    def _log_execution_description(self, flow_name: str, execution_id: str) -> None:
        """
        Log the execution record of a finished flow run.

        This is a best-effort logging helper: when the record cannot be found in the
        response, a warning is logged instead of raising.

        :param flow_name: The flow name
        :param execution_id: The execution ID returned when the run was started
        """
        response_desc = self.conn.describe_flow_execution_records(flowName=flow_name)
        for record in response_desc.get("flowExecutions", []):
            if record.get("executionId") == execution_id:
                self.log.info("Run complete, execution details: %s", record)
                return
        self.log.warning("No execution record found for executionId: %s", execution_id)

    @staticmethod
    def _source_operator(task: TaskTypeDef, connector_type: str) -> str | None:
        """
        Return the operator a task declares for the given source connector type.

        An exact key match is preferred. Otherwise the keys are compared
        case-insensitively, because per the boto3 reference some connector type
        values differ from the matching ``connectorOperator`` key only in
        capitalisation (e.g. ``Googleanalytics`` vs ``GoogleAnalytics``).

        :param task: A flow task as returned by ``describe_flow``
        :param connector_type: The ``connectorType`` of the flow source
        :return: The operator name, or None if the task declares none for that connector
        """
        operators = task.get("connectorOperator") or {}
        if connector_type in operators:
            return operators[connector_type]
        wanted = connector_type.casefold()
        for key, operator in operators.items():
            if key.casefold() == wanted:
                return operator
        return None

    def update_flow_filter(
        self, flow_name: str, filter_tasks: list[TaskTypeDef], set_trigger_ondemand: bool = False
    ) -> None:
        """
        Update the flow task filter; all filters will be removed if an empty array is passed to filter_tasks.

        :param flow_name: The flow name
        :param filter_tasks: List flow tasks to be added
        :param set_trigger_ondemand: If True, set the trigger to on-demand; otherwise, keep the trigger as is
        :return: None
        """
        response = self.conn.describe_flow(flowName=flow_name)
        connector_type = response["sourceFlowConfig"]["connectorType"]
        tasks: list[TaskTypeDef] = []

        # Clean up old filter tasks. "Filter" tasks whose operator is "PROJECTION"
        # are kept; every other "Filter" task is dropped.
        for task in response["tasks"]:
            is_old_filter = (
                task["taskType"] == "Filter"
                and self._source_operator(task, connector_type) != "PROJECTION"
            )
            if is_old_filter:
                self.log.info("Removing task: %s", task)
            else:
                tasks.append(task)  # List of non-filter tasks

        tasks += filter_tasks  # Add the new filter tasks

        if set_trigger_ondemand:
            # Clean up attribute to force on-demand trigger. ``pop`` is used so a
            # flow that has no trigger properties does not raise KeyError.
            response["triggerConfig"].pop("triggerProperties", None)

        self.conn.update_flow(
            flowName=response["flowName"],
            destinationFlowConfigList=response["destinationFlowConfigList"],
            sourceFlowConfig=response["sourceFlowConfig"],
            triggerConfig=response["triggerConfig"],
            description=response.get("description", "Flow description."),
            tasks=tasks,
        )