Source code for tests.system.amazon.aws.example_datasync
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

import boto3

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule

from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
# Body of the DataSync example DAG: create two S3 buckets and the DataSync
# locations/task, run DataSyncOperator in its three usage modes (by task ARN,
# by location search, create-then-execute), then tear everything down.
#
# NOTE(review): ``test_context``, ``ROLE_ARN_KEY``, ``dag`` and the helper tasks
# called below (``create_source_location``, ``create_destination_location``,
# ``create_task``, ``delete_task``, ``delete_task_created_by_operator``,
# ``list_locations``, ``delete_locations``, ``get_s3_bucket_arn``) are not
# defined in this excerpt. These statements presumably live inside a
# ``with DAG(...) as dag:`` block whose header is not visible here -- confirm,
# and indent accordingly when splicing back into the file.

# Bucket names are prefixed with the per-run environment id from the test context.
s3_bucket_source: str = f"{test_context[ENV_ID_KEY]}-datasync-bucket-source"
s3_bucket_destination: str = f"{test_context[ENV_ID_KEY]}-datasync-bucket-destination"

create_s3_bucket_source = S3CreateBucketOperator(
    task_id="create_s3_bucket_source",
    bucket_name=s3_bucket_source,
)

create_s3_bucket_destination = S3CreateBucketOperator(
    task_id="create_s3_bucket_destination",
    bucket_name=s3_bucket_destination,
)

# Both locations are created with the same role ARN taken from the test context.
source_location = create_source_location(s3_bucket_source, test_context[ROLE_ARN_KEY])
destination_location = create_destination_location(s3_bucket_destination, test_context[ROLE_ARN_KEY])

created_task_arn = create_task()

# [START howto_operator_datasync_specific_task]
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
    task_id="execute_task_by_arn",
    task_arn=created_task_arn,
)
# [END howto_operator_datasync_specific_task]

# DataSyncOperator waits by default, setting as False to test the Sensor below.
execute_task_by_arn.wait_for_completion = False

# [START howto_operator_datasync_search_task]
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
    task_id="execute_task_by_locations",
    source_location_uri=f"s3://{s3_bucket_source}/test",
    destination_location_uri=f"s3://{s3_bucket_destination}/test",
    # Only transfer files from /test/subdir folder
    task_execution_kwargs={
        "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
    },
)
# [END howto_operator_datasync_search_task]

# DataSyncOperator waits by default, setting as False to test the Sensor below.
execute_task_by_locations.wait_for_completion = False

# [START howto_operator_datasync_create_task]
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
    task_id="create_and_execute_task",
    source_location_uri=f"s3://{s3_bucket_source}/test_create",
    destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
    create_task_kwargs={"Name": "Created by Airflow"},
    create_source_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    create_destination_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    delete_task_after_execution=False,
)
# [END howto_operator_datasync_create_task]

# DataSyncOperator waits by default, setting as False to test the Sensor below.
create_and_execute_task.wait_for_completion = False

locations_task = list_locations(s3_bucket_source, s3_bucket_destination)
delete_locations_task = delete_locations(locations_task)

# Teardown operators use ALL_DONE so they are scheduled once upstream tasks
# have finished, whatever their outcome.
delete_s3_bucket_source = S3DeleteBucketOperator(
    task_id="delete_s3_bucket_source",
    bucket_name=s3_bucket_source,
    force_delete=True,
    trigger_rule=TriggerRule.ALL_DONE,
)

delete_s3_bucket_destination = S3DeleteBucketOperator(
    task_id="delete_s3_bucket_destination",
    bucket_name=s3_bucket_destination,
    force_delete=True,
    trigger_rule=TriggerRule.ALL_DONE,
)

chain(
    # TEST SETUP
    test_context,
    create_s3_bucket_source,
    create_s3_bucket_destination,
    source_location,
    destination_location,
    created_task_arn,
    # TEST BODY
    execute_task_by_arn,
    execute_task_by_locations,
    create_and_execute_task,
    # TEST TEARDOWN
    delete_task(created_task_arn),
    delete_task_created_by_operator(),
    locations_task,
    delete_locations_task,
    delete_s3_bucket_source,
    delete_s3_bucket_destination,
)

from tests_common.test_utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)