Example Airflow DAG source: airflow.providers.amazon.aws.example_dags.example_s3 — demonstrates the Amazon S3 operators and sensors.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from datetime import datetime
from typing import List

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
    S3DeleteBucketTaggingOperator,
    S3DeleteObjectsOperator,
    S3FileTransformOperator,
    S3GetBucketTaggingOperator,
    S3ListOperator,
    S3ListPrefixesOperator,
    S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
# [START howto_sensor_s3_key_function_definition]
def check_fn(files: List) -> bool:
    """
    Example of custom check: check if all files are bigger than ``1kB``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    :rtype: bool
    """
    # Each entry is expected to be a mapping of S3 object attributes;
    # a missing 'Size' counts as 0 bytes.  Note all() over an empty
    # list is True, so no files also satisfies the check.
    return all(f.get('Size', 0) > 1024 for f in files)
# [END howto_sensor_s3_key_function_definition]

# NOTE(review): the environment-variable constants and the ``with DAG(...)``
# context manager were evidently lost in extraction (the tasks below reference
# BUCKET_NAME, KEY, DATA, etc. and a DAG is required for them to register).
# They are reconstructed here with placeholder defaults — TODO confirm the
# exact values against the original upstream example file.
BUCKET_NAME = os.environ.get('BUCKET_NAME', 'example-s3-bucket')
BUCKET_NAME_2 = os.environ.get('BUCKET_NAME_2', 'example-s3-bucket-2')
KEY = os.environ.get('KEY', 'key')
KEY_2 = os.environ.get('KEY_2', 'key2')
PREFIX = os.environ.get('PREFIX', 'prefix')
DELIMITER = os.environ.get('DELIMITER', '/')
TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
DATA = os.environ.get('DATA', 'example object body')  # TODO confirm original payload

with DAG(
    dag_id='example_s3',
    schedule_interval=None,  # example DAG: trigger manually, never on a schedule
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [START howto_operator_s3_create_bucket]
    create_bucket = S3CreateBucketOperator(
        task_id='s3_create_bucket',
        bucket_name=BUCKET_NAME,
    )
    # [END howto_operator_s3_create_bucket]

    # [START howto_operator_s3_put_bucket_tagging]
    put_tagging = S3PutBucketTaggingOperator(
        task_id='s3_put_bucket_tagging',
        bucket_name=BUCKET_NAME,
        key=TAG_KEY,
        value=TAG_VALUE,
    )
    # [END howto_operator_s3_put_bucket_tagging]

    # [START howto_operator_s3_get_bucket_tagging]
    get_tagging = S3GetBucketTaggingOperator(
        task_id='s3_get_bucket_tagging',
        bucket_name=BUCKET_NAME,
    )
    # [END howto_operator_s3_get_bucket_tagging]

    # [START howto_operator_s3_delete_bucket_tagging]
    delete_tagging = S3DeleteBucketTaggingOperator(
        task_id='s3_delete_bucket_tagging',
        bucket_name=BUCKET_NAME,
    )
    # [END howto_operator_s3_delete_bucket_tagging]

    # [START howto_operator_s3_create_object]
    create_object = S3CreateObjectOperator(
        task_id="s3_create_object",
        s3_bucket=BUCKET_NAME,
        s3_key=KEY,
        data=DATA,
        replace=True,
    )
    # [END howto_operator_s3_create_object]

    # [START howto_operator_s3_list_prefixes]
    list_prefixes = S3ListPrefixesOperator(
        task_id="s3_list_prefix_operator",
        bucket=BUCKET_NAME,
        prefix=PREFIX,
        delimiter=DELIMITER,
    )
    # [END howto_operator_s3_list_prefixes]

    # [START howto_operator_s3_list]
    list_keys = S3ListOperator(
        task_id="s3_list_operator",
        bucket=BUCKET_NAME,
        prefix=PREFIX,
    )
    # [END howto_operator_s3_list]

    # [START howto_sensor_s3_key_single_key]
    # Check if a file exists
    sensor_one_key = S3KeySensor(
        task_id="s3_sensor_one_key",
        bucket_name=BUCKET_NAME,
        bucket_key=KEY,
    )
    # [END howto_sensor_s3_key_single_key]

    # [START howto_sensor_s3_key_multiple_keys]
    # Check if both files exist
    sensor_two_keys = S3KeySensor(
        task_id="s3_sensor_two_keys",
        bucket_name=BUCKET_NAME,
        bucket_key=[KEY, KEY_2],
    )
    # [END howto_sensor_s3_key_multiple_keys]

    # [START howto_sensor_s3_key_function]
    # Check if a file exists and match a certain pattern defined in check_fn
    sensor_key_with_function = S3KeySensor(
        task_id="s3_sensor_key_function",
        bucket_name=BUCKET_NAME,
        bucket_key=KEY,
        check_fn=check_fn,
    )
    # [END howto_sensor_s3_key_function]

    # [START howto_sensor_s3_keys_unchanged]
    sensor_keys_unchanged = S3KeysUnchangedSensor(
        task_id="s3_sensor_one_key_size",
        bucket_name=BUCKET_NAME_2,
        prefix=PREFIX,
        inactivity_period=10,
    )
    # [END howto_sensor_s3_keys_unchanged]

    # [START howto_operator_s3_copy_object]
    copy_object = S3CopyObjectOperator(
        task_id="s3_copy_object",
        source_bucket_name=BUCKET_NAME,
        dest_bucket_name=BUCKET_NAME_2,
        source_bucket_key=KEY,
        dest_bucket_key=KEY_2,
    )
    # [END howto_operator_s3_copy_object]

    # [START howto_operator_s3_file_transform]
    transforms_file = S3FileTransformOperator(
        task_id="s3_file_transform",
        source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
        dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
        # Use `cp` command as transform script as an example
        transform_script='cp',
        replace=True,
    )
    # [END howto_operator_s3_file_transform]

    # [START howto_operator_s3_delete_objects]
    delete_objects = S3DeleteObjectsOperator(
        task_id="s3_delete_objects",
        bucket=BUCKET_NAME_2,
        keys=KEY_2,
    )
    # [END howto_operator_s3_delete_objects]

    # [START howto_operator_s3_delete_bucket]
    delete_bucket = S3DeleteBucketOperator(
        task_id='s3_delete_bucket',
        bucket_name=BUCKET_NAME,
        force_delete=True,
    )
    # [END howto_operator_s3_delete_bucket]

    # Linear pipeline; the three key sensors run in parallel between
    # list_keys and copy_object.
    chain(
        create_bucket,
        put_tagging,
        get_tagging,
        delete_tagging,
        create_object,
        list_prefixes,
        list_keys,
        [sensor_one_key, sensor_two_keys, sensor_key_with_function],
        copy_object,
        transforms_file,
        sensor_keys_unchanged,
        delete_objects,
        delete_bucket,
    )