Source code for tests.system.providers.amazon.aws.example_s3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
    S3DeleteBucketTaggingOperator,
    S3DeleteObjectsOperator,
    S3FileTransformOperator,
    S3GetBucketTaggingOperator,
    S3ListOperator,
    S3ListPrefixesOperator,
    S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
from airflow.utils.trigger_rule import TriggerRule

from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
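# NOTE: The module-level setup below (DAG_ID, the sample data and tagging constants, and
# the DAG definition itself) is missing from this excerpt, but the tasks that follow
# reference these names. It is a reconstructed sketch so the example is self-contained;
# the dag_id, schedule, start_date, tags, and the placeholder values for DATA, PREFIX,
# DELIMITER, TAG_KEY, and TAG_VALUE are assumptions, not verbatim from the source file.
DAG_ID = "example_s3"

sys_test_context_task = SystemTestContextBuilder().build()

# Placeholder payload written to the S3 objects created below (assumed value).
DATA = """
    apple,0.5
    milk,2.5
    bread,4.0
"""

# An empty prefix refers to the bucket root (assumed values).
PREFIX = ""
DELIMITER = "/"
TAG_KEY = "test-s3-bucket-tagging-key"
TAG_VALUE = "test-s3-bucket-tagging-value"

with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    test_context = sys_test_context_task()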
    env_id = test_context[ENV_ID_KEY]
    bucket_name = f"{env_id}-s3-bucket"
    bucket_name_2 = f"{env_id}-s3-bucket-2"

    key = f"{env_id}-key"
    key_2 = f"{env_id}-key2"

    # [START howto_sensor_s3_key_function_definition]
    def check_fn(files: list) -> bool:
        """
        Example of a custom check: check that all files are bigger than ``20 bytes``.

        :param files: List of S3 object attributes.
        :return: true if the criterion is met
        """
        return all(f.get("Size", 0) > 20 for f in files)

    # [END howto_sensor_s3_key_function_definition]

    # [START howto_operator_s3_create_bucket]
    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name=bucket_name,
    )
    # [END howto_operator_s3_create_bucket]

    create_bucket_2 = S3CreateBucketOperator(
        task_id="create_bucket_2",
        bucket_name=bucket_name_2,
    )

    # [START howto_operator_s3_put_bucket_tagging]
    put_tagging = S3PutBucketTaggingOperator(
        task_id="put_tagging",
        bucket_name=bucket_name,
        key=TAG_KEY,
        value=TAG_VALUE,
    )
    # [END howto_operator_s3_put_bucket_tagging]

    # [START howto_operator_s3_get_bucket_tagging]
    get_tagging = S3GetBucketTaggingOperator(
        task_id="get_tagging",
        bucket_name=bucket_name,
    )
    # [END howto_operator_s3_get_bucket_tagging]

    # [START howto_operator_s3_delete_bucket_tagging]
    delete_tagging = S3DeleteBucketTaggingOperator(
        task_id="delete_tagging",
        bucket_name=bucket_name,
    )
    # [END howto_operator_s3_delete_bucket_tagging]

    # [START howto_operator_s3_create_object]
    create_object = S3CreateObjectOperator(
        task_id="create_object",
        s3_bucket=bucket_name,
        s3_key=key,
        data=DATA,
        replace=True,
    )
    # [END howto_operator_s3_create_object]

    create_object_2 = S3CreateObjectOperator(
        task_id="create_object_2",
        s3_bucket=bucket_name,
        s3_key=key_2,
        data=DATA,
        replace=True,
    )

    # [START howto_operator_s3_list_prefixes]
    list_prefixes = S3ListPrefixesOperator(
        task_id="list_prefixes",
        bucket=bucket_name,
        prefix=PREFIX,
        delimiter=DELIMITER,
    )
    # [END howto_operator_s3_list_prefixes]

    # [START howto_operator_s3_list]
    list_keys = S3ListOperator(
        task_id="list_keys",
        bucket=bucket_name,
        prefix=PREFIX,
    )
    # [END howto_operator_s3_list]

    # [START howto_sensor_s3_key_single_key]
    # Check if a file exists
    sensor_one_key = S3KeySensor(
        task_id="sensor_one_key",
        bucket_name=bucket_name,
        bucket_key=key,
    )
    # [END howto_sensor_s3_key_single_key]

    # [START howto_sensor_s3_key_multiple_keys]
    # Check if both files exist
    sensor_two_keys = S3KeySensor(
        task_id="sensor_two_keys",
        bucket_name=bucket_name,
        bucket_key=[key, key_2],
    )
    # [END howto_sensor_s3_key_multiple_keys]

    # [START howto_sensor_s3_key_single_key_deferrable]
    # Check if a file exists
    sensor_one_key_deferrable = S3KeySensor(
        task_id="sensor_one_key_deferrable",
        bucket_name=bucket_name,
        bucket_key=key,
        deferrable=True,
    )
    # [END howto_sensor_s3_key_single_key_deferrable]

    # [START howto_sensor_s3_key_multiple_keys_deferrable]
    # Check if both files exist
    sensor_two_keys_deferrable = S3KeySensor(
        task_id="sensor_two_keys_deferrable",
        bucket_name=bucket_name,
        bucket_key=[key, key_2],
        deferrable=True,
    )
    # [END howto_sensor_s3_key_multiple_keys_deferrable]

    # [START howto_sensor_s3_key_function_deferrable]
    # Check if a file exists and matches a certain pattern defined in check_fn
    sensor_key_with_function_deferrable = S3KeySensor(
        task_id="sensor_key_with_function_deferrable",
        bucket_name=bucket_name,
        bucket_key=key,
        check_fn=check_fn,
        deferrable=True,
    )
    # [END howto_sensor_s3_key_function_deferrable]

    # [START howto_sensor_s3_key_function]
    # Check if a file exists and matches a certain pattern defined in check_fn
    sensor_key_with_function = S3KeySensor(
        task_id="sensor_key_with_function",
        bucket_name=bucket_name,
        bucket_key=key,
        check_fn=check_fn,
    )
    # [END howto_sensor_s3_key_function]
    # [START howto_operator_s3_copy_object]
    copy_object = S3CopyObjectOperator(
        task_id="copy_object",
        source_bucket_name=bucket_name,
        dest_bucket_name=bucket_name_2,
        source_bucket_key=key,
        dest_bucket_key=key_2,
    )
    # [END howto_operator_s3_copy_object]

    # [START howto_operator_s3_file_transform]
    file_transform = S3FileTransformOperator(
        task_id="file_transform",
        source_s3_key=f"s3://{bucket_name}/{key}",
        dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
        # Use `cp` command as transform script as an example
        transform_script="cp",
        replace=True,
    )
    # [END howto_operator_s3_file_transform]

    # This task skips the `sensor_keys_unchanged` task because the S3KeysUnchangedSensor
    # runs in poke mode only, which is not supported by the DebugExecutor, causing system tests to fail.
    branching = BranchPythonOperator(
        task_id="branch_to_delete_objects", python_callable=lambda: "delete_objects"
    )

    # [START howto_sensor_s3_keys_unchanged]
    sensor_keys_unchanged = S3KeysUnchangedSensor(
        task_id="sensor_keys_unchanged",
        bucket_name=bucket_name_2,
        prefix=PREFIX,
        inactivity_period=10,  # inactivity_period in seconds
    )
    # [END howto_sensor_s3_keys_unchanged]

    # [START howto_operator_s3_delete_objects]
    delete_objects = S3DeleteObjectsOperator(
        task_id="delete_objects",
        bucket=bucket_name_2,
        keys=key_2,
    )
    # [END howto_operator_s3_delete_objects]
    delete_objects.trigger_rule = TriggerRule.ALL_DONE

    # [START howto_operator_s3_delete_bucket]
    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=bucket_name,
        force_delete=True,
    )
    # [END howto_operator_s3_delete_bucket]
    delete_bucket.trigger_rule = TriggerRule.ALL_DONE

    delete_bucket_2 = S3DeleteBucketOperator(
        task_id="delete_bucket_2",
        bucket_name=bucket_name_2,
        force_delete=True,
    )
    delete_bucket_2.trigger_rule = TriggerRule.ALL_DONE

    chain(
        # TEST SETUP
        test_context,
        # TEST BODY
        create_bucket,
        create_bucket_2,
        put_tagging,
        get_tagging,
        delete_tagging,
        create_object,
        create_object_2,
        list_prefixes,
        list_keys,
        [sensor_one_key, sensor_two_keys, sensor_key_with_function],
        [sensor_one_key_deferrable, sensor_two_keys_deferrable, sensor_key_with_function_deferrable],
        copy_object,
        file_transform,
        branching,
        sensor_keys_unchanged,
        # TEST TEARDOWN
        delete_objects,
        delete_bucket,
        delete_bucket_2,
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
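# The closing line below is not part of the excerpt; it is a sketch of the usual pattern
# for Airflow system tests (assuming the standard tests/system layout): expose the DAG
# through get_test_run() so pytest can discover and execute it.
test_run = get_test_run(dag)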