Source code for airflow.providers.amazon.aws.operators.appflow
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
import warnings
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, cast

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.amazon.aws.hooks.appflow import AppflowHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.utils import datetime_to_epoch_ms
from airflow.providers.amazon.aws.utils.mixins import AwsBaseHookMixin, AwsHookParams, aws_template_fields

if TYPE_CHECKING:
    from mypy_boto3_appflow.type_defs import (
        DescribeFlowExecutionRecordsResponseTypeDef,
        ExecutionRecordTypeDef,
        TaskTypeDef,
    )

    from airflow.utils.context import Context
# Source connectors currently accepted by the filtered operators below.
SUPPORTED_SOURCES = {"salesforce", "zendesk"}

MANDATORY_FILTER_DATE_MSG = "The filter_date argument is mandatory for {entity}!"
NOT_SUPPORTED_SOURCE_MSG = "Source {source} is not supported for {entity}!"
class AppflowBaseOperator(AwsBaseOperator[AppflowHook]):
    """
    Amazon AppFlow Base Operator class (not supposed to be used directly in DAGs).

    :param source: The source name (Supported: salesforce, zendesk)
    :param flow_name: The flow name
    :param flow_update: A boolean to enable/disable a flow update before the run
    :param source_field: The field name to apply filters
    :param filter_date: The date value (or template) to be used in filters.
    :param poll_interval: how often in seconds to check the query status
    :param max_attempts: how many times to check for status before timing out
    :param wait_for_completion: whether to wait for the run to end to return
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    aws_hook_class = AppflowHook
    template_fields = aws_template_fields("flow_name", "source", "source_field", "filter_date")

    # Seconds to wait after a flow update so on-demand runs see consistent flow metadata.
    UPDATE_PROPAGATION_TIME: int = 15
    def __init__(
        self,
        flow_name: str,
        flow_update: bool,
        source: str | None = None,
        source_field: str | None = None,
        filter_date: str | None = None,
        poll_interval: int = 20,
        max_attempts: int = 60,
        wait_for_completion: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if source is not None and source not in SUPPORTED_SOURCES:
            raise ValueError(f"{source} is not a supported source (options: {SUPPORTED_SOURCES})!")
        self.filter_date = filter_date
        self.flow_name = flow_name
        self.source = source
        self.source_field = source_field
        self.poll_interval = poll_interval
        self.max_attempts = max_attempts
        self.flow_update = flow_update
        self.wait_for_completion = wait_for_completion
    def execute(self, context: Context) -> None:
        self.filter_date_parsed: datetime | None = (
            datetime.fromisoformat(self.filter_date) if self.filter_date else None
        )
        if self.source is not None:
            self.connector_type = self._get_connector_type()
        if self.flow_update:
            self._update_flow()
            # While scheduled flows pick up the update right away, on-demand flows might use
            # out-of-date info if triggered right after an update, so we need to wait a bit
            # for the DB to be consistent.
            time.sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)

        self._run_flow(context)
    def _get_connector_type(self) -> str:
        response = self.hook.conn.describe_flow(flowName=self.flow_name)
        connector_type = response["sourceFlowConfig"]["connectorType"]
        if self.source != connector_type.lower():
            raise ValueError(f"Incompatible source ({self.source}) and connector type ({connector_type})!")
        return connector_type

    def _update_flow(self) -> None:
        self.hook.update_flow_filter(flow_name=self.flow_name, filter_tasks=[], set_trigger_ondemand=True)

    def _run_flow(self, context) -> str:
        execution_id = self.hook.run_flow(
            flow_name=self.flow_name,
            poll_interval=self.poll_interval,
            max_attempts=self.max_attempts,
            wait_for_completion=self.wait_for_completion,
        )
        task_instance = context["task_instance"]
        task_instance.xcom_push("execution_id", execution_id)
        return execution_id
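# The execution ID pushed above can be read back by downstream tasks, e.g. via a
# Jinja template (the "appflow_run" task ID here is an illustrative assumption,
# not part of this module):
#
#     "{{ task_instance.xcom_pull(task_ids='appflow_run', key='execution_id') }}"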
class AppflowRunOperator(AppflowBaseOperator):
    """
    Execute an AppFlow run as is.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AppflowRunOperator`

    :param source: Obsolete, unnecessary for this operator
    :param flow_name: The flow name
    :param poll_interval: how often in seconds to check the query status
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region: aws region to use
    :param wait_for_completion: whether to wait for the run to end to return
    """

    def __init__(
        self,
        flow_name: str,
        source: str | None = None,
        poll_interval: int = 20,
        wait_for_completion: bool = True,
        **kwargs,
    ) -> None:
        if source is not None:
            warnings.warn(
                "The `source` parameter is unused when simply running a flow, please remove it.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
        super().__init__(
            flow_name=flow_name,
            flow_update=False,
            source_field=None,
            filter_date=None,
            poll_interval=poll_interval,
            wait_for_completion=wait_for_completion,
            **kwargs,
        )
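# Example usage, a minimal sketch (the task_id and flow name are illustrative
# assumptions, not part of this module):
#
#     run_flow = AppflowRunOperator(
#         task_id="appflow_run",
#         flow_name="my-flow",
#     )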
class AppflowRunFullOperator(AppflowBaseOperator):
    """
    Execute an AppFlow full run removing any filter.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AppflowRunFullOperator`

    :param source: The source name (Supported: salesforce, zendesk)
    :param flow_name: The flow name
    :param poll_interval: how often in seconds to check the query status
    :param wait_for_completion: whether to wait for the run to end to return
    """

    def __init__(
        self,
        source: str,
        flow_name: str,
        poll_interval: int = 20,
        wait_for_completion: bool = True,
        **kwargs,
    ) -> None:
        if source not in {"salesforce", "zendesk"}:
            raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunFullOperator"))
        super().__init__(
            source=source,
            flow_name=flow_name,
            flow_update=True,
            source_field=None,
            filter_date=None,
            poll_interval=poll_interval,
            wait_for_completion=wait_for_completion,
            **kwargs,
        )
class AppflowRunBeforeOperator(AppflowBaseOperator):
    """
    Execute an AppFlow run after updating the filters to select only previous data.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AppflowRunBeforeOperator`

    :param source: The source name (Supported: salesforce)
    :param flow_name: The flow name
    :param source_field: The field name to apply filters
    :param filter_date: The date value (or template) to be used in filters.
    :param poll_interval: how often in seconds to check the query status
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region: aws region to use
    :param wait_for_completion: whether to wait for the run to end to return
    """

    def __init__(
        self,
        source: str,
        flow_name: str,
        source_field: str,
        filter_date: str,
        poll_interval: int = 20,
        wait_for_completion: bool = True,
        **kwargs,
    ) -> None:
        if not filter_date:
            raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunBeforeOperator"))
        if source != "salesforce":
            raise ValueError(
                NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunBeforeOperator")
            )
        super().__init__(
            source=source,
            flow_name=flow_name,
            flow_update=True,
            source_field=source_field,
            filter_date=filter_date,
            poll_interval=poll_interval,
            wait_for_completion=wait_for_completion,
            **kwargs,
        )

    def _update_flow(self) -> None:
        if not self.filter_date_parsed:
            raise ValueError(f"Invalid filter_date argument parser value: {self.filter_date_parsed}")
        if not self.source_field:
            raise ValueError(f"Invalid source_field argument value: {self.source_field}")
        filter_task: TaskTypeDef = {
            "taskType": "Filter",
            "connectorOperator": {self.connector_type: "LESS_THAN"},  # type: ignore
            "sourceFields": [self.source_field],
            "taskProperties": {
                "DATA_TYPE": "datetime",
                "VALUE": str(datetime_to_epoch_ms(self.filter_date_parsed)),
            },  # NOT inclusive
        }
        self.hook.update_flow_filter(
            flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
        )
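# Example usage, a minimal sketch (task_id, flow name, field name, and the templated
# date are illustrative assumptions; filter_date must parse with
# datetime.fromisoformat). AppflowRunAfterOperator takes the same arguments:
#
#     run_before = AppflowRunBeforeOperator(
#         task_id="appflow_run_before",
#         source="salesforce",
#         flow_name="my-flow",
#         source_field="LastModifiedDate",
#         filter_date="{{ ds }}",
#     )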
class AppflowRunAfterOperator(AppflowBaseOperator):
    """
    Execute an AppFlow run after updating the filters to select only future data.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AppflowRunAfterOperator`

    :param source: The source name (Supported: salesforce, zendesk)
    :param flow_name: The flow name
    :param source_field: The field name to apply filters
    :param filter_date: The date value (or template) to be used in filters.
    :param poll_interval: how often in seconds to check the query status
    :param wait_for_completion: whether to wait for the run to end to return
    """

    def __init__(
        self,
        source: str,
        flow_name: str,
        source_field: str,
        filter_date: str,
        poll_interval: int = 20,
        wait_for_completion: bool = True,
        **kwargs,
    ) -> None:
        if not filter_date:
            raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunAfterOperator"))
        if source not in {"salesforce", "zendesk"}:
            raise ValueError(
                NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunAfterOperator")
            )
        super().__init__(
            source=source,
            flow_name=flow_name,
            flow_update=True,
            source_field=source_field,
            filter_date=filter_date,
            poll_interval=poll_interval,
            wait_for_completion=wait_for_completion,
            **kwargs,
        )

    def _update_flow(self) -> None:
        if not self.filter_date_parsed:
            raise ValueError(f"Invalid filter_date argument parser value: {self.filter_date_parsed}")
        if not self.source_field:
            raise ValueError(f"Invalid source_field argument value: {self.source_field}")
        filter_task: TaskTypeDef = {
            "taskType": "Filter",
            "connectorOperator": {self.connector_type: "GREATER_THAN"},  # type: ignore
            "sourceFields": [self.source_field],
            "taskProperties": {
                "DATA_TYPE": "datetime",
                "VALUE": str(datetime_to_epoch_ms(self.filter_date_parsed)),
            },  # NOT inclusive
        }
        self.hook.update_flow_filter(
            flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
        )
class AppflowRunDailyOperator(AppflowBaseOperator):
    """
    Execute an AppFlow run after updating the filters to select only a single day.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AppflowRunDailyOperator`

    :param source: The source name (Supported: salesforce)
    :param flow_name: The flow name
    :param source_field: The field name to apply filters
    :param filter_date: The date value (or template) to be used in filters.
    :param poll_interval: how often in seconds to check the query status
    :param wait_for_completion: whether to wait for the run to end to return
    """

    def __init__(
        self,
        source: str,
        flow_name: str,
        source_field: str,
        filter_date: str,
        poll_interval: int = 20,
        wait_for_completion: bool = True,
        **kwargs,
    ) -> None:
        if not filter_date:
            raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunDailyOperator"))
        if source != "salesforce":
            raise ValueError(
                NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunDailyOperator")
            )
        super().__init__(
            source=source,
            flow_name=flow_name,
            flow_update=True,
            source_field=source_field,
            filter_date=filter_date,
            poll_interval=poll_interval,
            wait_for_completion=wait_for_completion,
            **kwargs,
        )

    def _update_flow(self) -> None:
        if not self.filter_date_parsed:
            raise ValueError(f"Invalid filter_date argument parser value: {self.filter_date_parsed}")
        if not self.source_field:
            raise ValueError(f"Invalid source_field argument value: {self.source_field}")
        start_filter_date = self.filter_date_parsed - timedelta(milliseconds=1)
        end_filter_date = self.filter_date_parsed + timedelta(days=1)
        filter_task: TaskTypeDef = {
            "taskType": "Filter",
            "connectorOperator": {self.connector_type: "BETWEEN"},  # type: ignore
            "sourceFields": [self.source_field],
            "taskProperties": {
                "DATA_TYPE": "datetime",
                "LOWER_BOUND": str(datetime_to_epoch_ms(start_filter_date)),  # NOT inclusive
                "UPPER_BOUND": str(datetime_to_epoch_ms(end_filter_date)),  # NOT inclusive
            },
        }
        self.hook.update_flow_filter(
            flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
        )
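# Example usage, a minimal sketch (names and the templated date are illustrative
# assumptions). With filter_date="2022-05-26", the exclusive BETWEEN bounds above
# select records with 2022-05-26T00:00:00.000 <= timestamp < 2022-05-27T00:00:00.000:
#
#     run_daily = AppflowRunDailyOperator(
#         task_id="appflow_run_daily",
#         source="salesforce",
#         flow_name="my-flow",
#         source_field="LastModifiedDate",
#         filter_date="{{ ds }}",
#     )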
class AppflowRecordsShortCircuitOperator(ShortCircuitOperator, AwsBaseHookMixin[AppflowHook]):
    """
    Short-circuit in case of an empty AppFlow run.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AppflowRecordsShortCircuitOperator`

    :param flow_name: The flow name
    :param appflow_run_task_id: Run task ID from where this operator should extract the execution ID
    :param ignore_downstream_trigger_rules: Ignore downstream trigger rules
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """
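# Example usage, a minimal sketch (task IDs and flow name are illustrative
# assumptions). The short-circuit task reads the run task's "execution_id" XCom and
# skips downstream tasks when that run returned no records:
#
#     run_flow = AppflowRunOperator(task_id="appflow_run", flow_name="my-flow")
#     has_records = AppflowRecordsShortCircuitOperator(
#         task_id="appflow_has_records",
#         flow_name="my-flow",
#         appflow_run_task_id="appflow_run",
#     )
#     run_flow >> has_records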