# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Interact with AWS DataSync, using the AWS ``boto3`` library."""
from __future__ import annotations

import time
from urllib.parse import urlsplit

from airflow.exceptions import AirflowBadRequest, AirflowException, AirflowTaskTimeout
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
class DataSyncHook(AwsBaseHook):
    """
    Interact with AWS DataSync.

    Thick wrapper around
    :external+boto3:py:class:`boto3.client("datasync") <DataSync.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and are
    forwarded to the underlying AwsBaseHook.

    :param wait_interval_seconds: Time to wait between two consecutive calls
        to check TaskExecution status. Defaults to 30 seconds.
    :raises ValueError: If wait_interval_seconds is not between 0 and 15*60 seconds.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(self, wait_interval_seconds: int = 30, *args, **kwargs) -> None:
        super().__init__(client_type="datasync", *args, **kwargs)  # type: ignore[misc]
        # Local caches of Location/Task metadata, populated lazily by the
        # _refresh_* helpers.
        self.locations: list = []
        self.tasks: list = []
        # wait_interval_seconds = 0 is used during unit tests
        if not 0 <= wait_interval_seconds <= 15 * 60:
            raise ValueError(f"Invalid wait_interval_seconds {wait_interval_seconds}")
        self.wait_interval_seconds = wait_interval_seconds
[docs]defcreate_location(self,location_uri:str,**create_location_kwargs)->str:""" Creates a new location. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.create_location_s3` - :external+boto3:py:meth:`DataSync.Client.create_location_smb` - :external+boto3:py:meth:`DataSync.Client.create_location_nfs` - :external+boto3:py:meth:`DataSync.Client.create_location_efs` :param location_uri: Location URI used to determine the location type (S3, SMB, NFS, EFS). :param create_location_kwargs: Passed to ``DataSync.Client.create_location_*`` methods. :return: LocationArn of the created Location. :raises AirflowException: If location type (prefix from ``location_uri``) is invalid. """schema=urlsplit(location_uri).schemeifschema=="smb":location=self.get_conn().create_location_smb(**create_location_kwargs)elifschema=="s3":location=self.get_conn().create_location_s3(**create_location_kwargs)elifschema=="nfs":location=self.get_conn().create_location_nfs(**create_location_kwargs)elifschema=="efs":location=self.get_conn().create_location_efs(**create_location_kwargs)else:raiseAirflowException(f"Invalid/Unsupported location type: {schema}")self._refresh_locations()returnlocation["LocationArn"]
[docs]defget_location_arns(self,location_uri:str,case_sensitive:bool=False,ignore_trailing_slash:bool=True)->list[str]:""" Return all LocationArns which match a LocationUri. :param location_uri: Location URI to search for, eg ``s3://mybucket/mypath`` :param case_sensitive: Do a case sensitive search for location URI. :param ignore_trailing_slash: Ignore / at the end of URI when matching. :return: List of LocationArns. :raises AirflowBadRequest: if ``location_uri`` is empty """ifnotlocation_uri:raiseAirflowBadRequest("location_uri not specified")ifnotself.locations:self._refresh_locations()result=[]ifnotcase_sensitive:location_uri=location_uri.lower()ifignore_trailing_slashandlocation_uri.endswith("/"):location_uri=location_uri[:-1]forlocation_from_awsinself.locations:location_uri_from_aws=location_from_aws["LocationUri"]ifnotcase_sensitive:location_uri_from_aws=location_uri_from_aws.lower()ifignore_trailing_slashandlocation_uri_from_aws.endswith("/"):location_uri_from_aws=location_uri_from_aws[:-1]iflocation_uri==location_uri_from_aws:result.append(location_from_aws["LocationArn"])returnresult
def_refresh_locations(self)->None:"""Refresh the local list of Locations."""self.locations=[]next_token=NonewhileTrue:ifnext_token:locations=self.get_conn().list_locations(NextToken=next_token)else:locations=self.get_conn().list_locations()self.locations.extend(locations["Locations"])if"NextToken"notinlocations:breaknext_token=locations["NextToken"]
[docs]defcreate_task(self,source_location_arn:str,destination_location_arn:str,**create_task_kwargs)->str:"""Create a Task between the specified source and destination LocationArns. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.create_task` :param source_location_arn: Source LocationArn. Must exist already. :param destination_location_arn: Destination LocationArn. Must exist already. :param create_task_kwargs: Passed to ``boto.create_task()``. See AWS boto3 datasync documentation. :return: TaskArn of the created Task """task=self.get_conn().create_task(SourceLocationArn=source_location_arn,DestinationLocationArn=destination_location_arn,**create_task_kwargs,)self._refresh_tasks()returntask["TaskArn"]
[docs]defupdate_task(self,task_arn:str,**update_task_kwargs)->None:"""Update a Task. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.update_task` :param task_arn: The TaskArn to update. :param update_task_kwargs: Passed to ``boto.update_task()``, See AWS boto3 datasync documentation. """self.get_conn().update_task(TaskArn=task_arn,**update_task_kwargs)
[docs]defdelete_task(self,task_arn:str)->None:"""Delete a Task. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.delete_task` :param task_arn: The TaskArn to delete. """self.get_conn().delete_task(TaskArn=task_arn)
def_refresh_tasks(self)->None:"""Refreshes the local list of Tasks"""self.tasks=[]next_token=NonewhileTrue:ifnext_token:tasks=self.get_conn().list_tasks(NextToken=next_token)else:tasks=self.get_conn().list_tasks()self.tasks.extend(tasks["Tasks"])if"NextToken"notintasks:breaknext_token=tasks["NextToken"]
[docs]defget_task_arns_for_location_arns(self,source_location_arns:list,destination_location_arns:list,)->list:""" Return list of TaskArns for which use any one of the specified source LocationArns and any one of the specified destination LocationArns. :param source_location_arns: List of source LocationArns. :param destination_location_arns: List of destination LocationArns. :raises AirflowBadRequest: if ``source_location_arns`` or ``destination_location_arns`` are empty. """ifnotsource_location_arns:raiseAirflowBadRequest("source_location_arns not specified")ifnotdestination_location_arns:raiseAirflowBadRequest("destination_location_arns not specified")ifnotself.tasks:self._refresh_tasks()result=[]fortaskinself.tasks:task_arn=task["TaskArn"]task_description=self.get_task_description(task_arn)iftask_description["SourceLocationArn"]insource_location_arns:iftask_description["DestinationLocationArn"]indestination_location_arns:result.append(task_arn)returnresult
[docs]defstart_task_execution(self,task_arn:str,**kwargs)->str:""" Start a TaskExecution for the specified task_arn. Each task can have at most one TaskExecution. Additional keyword arguments send to ``start_task_execution`` boto3 method. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.start_task_execution` :param task_arn: TaskArn :return: TaskExecutionArn :raises ClientError: If a TaskExecution is already busy running for this ``task_arn``. :raises AirflowBadRequest: If ``task_arn`` is empty. """ifnottask_arn:raiseAirflowBadRequest("task_arn not specified")task_execution=self.get_conn().start_task_execution(TaskArn=task_arn,**kwargs)returntask_execution["TaskExecutionArn"]
[docs]defcancel_task_execution(self,task_execution_arn:str)->None:""" Cancel a TaskExecution for the specified ``task_execution_arn``. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.cancel_task_execution` :param task_execution_arn: TaskExecutionArn. :raises AirflowBadRequest: If ``task_execution_arn`` is empty. """ifnottask_execution_arn:raiseAirflowBadRequest("task_execution_arn not specified")self.get_conn().cancel_task_execution(TaskExecutionArn=task_execution_arn)
[docs]defget_task_description(self,task_arn:str)->dict:""" Get description for the specified ``task_arn``. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.describe_task` :param task_arn: TaskArn :return: AWS metadata about a task. :raises AirflowBadRequest: If ``task_arn`` is empty. """ifnottask_arn:raiseAirflowBadRequest("task_arn not specified")returnself.get_conn().describe_task(TaskArn=task_arn)
[docs]defdescribe_task_execution(self,task_execution_arn:str)->dict:""" Get description for the specified ``task_execution_arn``. .. seealso:: - :external+boto3:py:meth:`DataSync.Client.describe_task_execution` :param task_execution_arn: TaskExecutionArn :return: AWS metadata about a task execution. :raises AirflowBadRequest: If ``task_execution_arn`` is empty. """returnself.get_conn().describe_task_execution(TaskExecutionArn=task_execution_arn)
[docs]defget_current_task_execution_arn(self,task_arn:str)->str|None:""" Get current TaskExecutionArn (if one exists) for the specified ``task_arn``. :param task_arn: TaskArn :return: CurrentTaskExecutionArn for this ``task_arn`` or None. :raises AirflowBadRequest: if ``task_arn`` is empty. """ifnottask_arn:raiseAirflowBadRequest("task_arn not specified")task_description=self.get_task_description(task_arn)if"CurrentTaskExecutionArn"intask_description:returntask_description["CurrentTaskExecutionArn"]returnNone
    def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int = 60) -> bool:
        """
        Wait for Task Execution status to be complete (SUCCESS/ERROR).

        The ``task_execution_arn`` must exist, or a boto3 ClientError will be raised.

        :param task_execution_arn: TaskExecutionArn
        :param max_iterations: Maximum number of iterations before timing out.
        :return: Result of task execution.
        :raises AirflowTaskTimeout: If maximum iterations is exceeded.
        :raises AirflowBadRequest: If ``task_execution_arn`` is empty.
        """
        if not task_execution_arn:
            raise AirflowBadRequest("task_execution_arn not specified")
        status = None
        iterations = max_iterations
        # NOTE(review): the TASK_EXECUTION_* state tuples are class attributes
        # defined elsewhere in this class (not visible in this chunk) — confirm.
        # Poll until the execution leaves an intermediate state or the
        # iteration budget runs out.
        while status is None or status in self.TASK_EXECUTION_INTERMEDIATE_STATES:
            task_execution = self.get_conn().describe_task_execution(TaskExecutionArn=task_execution_arn)
            status = task_execution["Status"]
            self.log.info("status=%s", status)
            iterations -= 1
            # Terminal states and an exhausted budget break out before the
            # sleep, so we never wait after the final poll.
            if status in self.TASK_EXECUTION_FAILURE_STATES:
                break
            if status in self.TASK_EXECUTION_SUCCESS_STATES:
                break
            if iterations <= 0:
                break
            time.sleep(self.wait_interval_seconds)
        # Map the final observed status onto the return/raise contract;
        # terminal states win over the timeout check.
        if status in self.TASK_EXECUTION_SUCCESS_STATES:
            return True
        if status in self.TASK_EXECUTION_FAILURE_STATES:
            return False
        if iterations <= 0:
            raise AirflowTaskTimeout("Max iterations exceeded!")
        raise AirflowException(f"Unknown status: {status}")  # Should never happen