Source code for tests.system.providers.amazon.aws.example_datasync
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

import boto3

from airflow import models
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
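
# --- Reconstructed module-level setup ----------------------------------------
# The extracted page jumps from the imports straight into the DAG body, which
# references names that are never defined in this listing (ROLE_ARN_KEY,
# test_context, get_s3_bucket_arn, the create_*/delete_*/list_locations tasks,
# and `dag` itself). The definitions below are a best-effort sketch of those
# missing pieces, following the conventions of Airflow's AWS system tests;
# treat them as a reconstruction, not the authoritative source.

DAG_ID = "example_datasync"
ROLE_ARN_KEY = "ROLE_ARN"

sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()


def get_s3_bucket_arn(bucket_name):
    return f"arn:aws:s3:::{bucket_name}"


def create_location(bucket_name, role_arn):
    # Create a DataSync S3 location pointing at the bucket's /test prefix.
    client = boto3.client("datasync")
    response = client.create_location_s3(
        Subdirectory="test",
        S3BucketArn=get_s3_bucket_arn(bucket_name),
        S3Config={"BucketAccessRoleArn": role_arn},
    )
    return response["LocationArn"]


@task
def create_source_location(bucket_source, role_arn):
    return create_location(bucket_source, role_arn)


@task
def create_destination_location(bucket_destination, role_arn):
    return create_location(bucket_destination, role_arn)


@task
def create_task(**kwargs):
    # Build a DataSync task from the two location ARNs pushed to XCom by the
    # tasks above (task_ids default to the decorated function names).
    client = boto3.client("datasync")
    response = client.create_task(
        SourceLocationArn=kwargs["ti"].xcom_pull(task_ids="create_source_location"),
        DestinationLocationArn=kwargs["ti"].xcom_pull(task_ids="create_destination_location"),
    )
    return response["TaskArn"]


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_task(task_arn):
    boto3.client("datasync").delete_task(TaskArn=task_arn)


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_task_created_by_operator(**kwargs):
    # The operator-created task's ARN is part of the XCom return value of the
    # create_and_execute_task task defined in the DAG body below.
    task_arn = kwargs["ti"].xcom_pull(task_ids="create_and_execute_task")["TaskArn"]
    boto3.client("datasync").delete_task(TaskArn=task_arn)


@task
def list_locations(bucket_source, bucket_destination):
    # Collect the ARNs of the locations created for this test so the teardown
    # can delete them; assumes the listed LocationUri matches these URIs.
    location_uris = [
        f"s3://{bucket}/{folder}"
        for bucket in (bucket_source, bucket_destination)
        for folder in ("test", "test_create")
    ]
    locations = boto3.client("datasync").list_locations()
    return [
        location["LocationArn"]
        for location in locations["Locations"]
        if location["LocationUri"] in location_uris
    ]


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_locations(locations):
    client = boto3.client("datasync")
    for location_arn in locations:
        client.delete_location(LocationArn=location_arn)


with models.DAG(
    DAG_ID,
    schedule="@once",  # assumes Airflow >= 2.4; older versions use schedule_interval
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    test_context = sys_test_context_task()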

    s3_bucket_source: str = f"{test_context[ENV_ID_KEY]}-datasync-bucket-source"
    s3_bucket_destination: str = f"{test_context[ENV_ID_KEY]}-datasync-bucket-destination"

    create_s3_bucket_source = S3CreateBucketOperator(
        task_id="create_s3_bucket_source", bucket_name=s3_bucket_source
    )

    create_s3_bucket_destination = S3CreateBucketOperator(
        task_id="create_s3_bucket_destination", bucket_name=s3_bucket_destination
    )

    source_location = create_source_location(s3_bucket_source, test_context[ROLE_ARN_KEY])
    destination_location = create_destination_location(s3_bucket_destination, test_context[ROLE_ARN_KEY])

    created_task_arn = create_task()

    # [START howto_operator_datasync_specific_task]
    # Execute a specific task
    execute_task_by_arn = DataSyncOperator(
        task_id="execute_task_by_arn",
        task_arn=created_task_arn,
        wait_for_completion=False,
    )
    # [END howto_operator_datasync_specific_task]

    # [START howto_operator_datasync_search_task]
    # Search and execute a task
    execute_task_by_locations = DataSyncOperator(
        task_id="execute_task_by_locations",
        source_location_uri=f"s3://{s3_bucket_source}/test",
        destination_location_uri=f"s3://{s3_bucket_destination}/test",
        # Only transfer files from /test/subdir folder
        task_execution_kwargs={
            "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
        },
        wait_for_completion=False,
    )
    # [END howto_operator_datasync_search_task]

    # [START howto_operator_datasync_create_task]
    # Create a task (the task does not exist)
    create_and_execute_task = DataSyncOperator(
        task_id="create_and_execute_task",
        source_location_uri=f"s3://{s3_bucket_source}/test_create",
        destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
        create_task_kwargs={"Name": "Created by Airflow"},
        create_source_location_kwargs={
            "Subdirectory": "test_create",
            "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
            "S3Config": {
                "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
            },
        },
        create_destination_location_kwargs={
            "Subdirectory": "test_create",
            "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
            "S3Config": {
                "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
            },
        },
        delete_task_after_execution=False,
        wait_for_completion=False,
    )
    # [END howto_operator_datasync_create_task]

    locations_task = list_locations(s3_bucket_source, s3_bucket_destination)
    delete_locations_task = delete_locations(locations_task)

    delete_s3_bucket_source = S3DeleteBucketOperator(
        task_id="delete_s3_bucket_source",
        bucket_name=s3_bucket_source,
        force_delete=True,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_s3_bucket_destination = S3DeleteBucketOperator(
        task_id="delete_s3_bucket_destination",
        bucket_name=s3_bucket_destination,
        force_delete=True,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    chain(
        # TEST SETUP
        test_context,
        create_s3_bucket_source,
        create_s3_bucket_destination,
        source_location,
        destination_location,
        created_task_arn,
        # TEST BODY
        execute_task_by_arn,
        execute_task_by_locations,
        create_and_execute_task,
        # TEST TEARDOWN
        delete_task(created_task_arn),
        delete_task_created_by_operator(),
        locations_task,
        delete_locations_task,
        delete_s3_bucket_source,
        delete_s3_bucket_destination,
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
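# The assignment below does not appear in the extracted text, but the
# get_test_run import and the pytest note above imply it; it is the standard
# closing line of these system-test modules.
test_run = get_test_run(dag)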