Source code for airflow.providers.amazon.aws.example_dags.example_s3

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os
from datetime import datetime
from typing import List

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
    S3DeleteBucketTaggingOperator,
    S3DeleteObjectsOperator,
    S3FileTransformOperator,
    S3GetBucketTaggingOperator,
    S3ListOperator,
    S3ListPrefixesOperator,
    S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor

BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
BUCKET_NAME_2 = os.environ.get('BUCKET_NAME_2', 'test-airflow-123456')
KEY = os.environ.get('KEY', 'key')
KEY_2 = os.environ.get('KEY_2', 'key2')
# An empty string prefix refers to the bucket root.
# See https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html for what a prefix is.
PREFIX = os.environ.get('PREFIX', '')
DELIMITER = os.environ.get('DELIMITER', '/')
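# Illustration only (not part of the upstream example): with the defaults above,
# PREFIX='' and DELIMITER='/', the S3ListPrefixesOperator below lists the bucket's
# top-level "folders" (common prefixes), e.g. 'logs/' or 'data/', rather than the
# objects themselves.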
TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
DATA = os.environ.get(
    'DATA',
    '''
apple,0.5
milk,2.5
bread,4.0
''',
)

with DAG(
    dag_id='example_s3',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [START howto_sensor_s3_key_function_definition]
    def check_fn(files: List) -> bool:
        """
        Example of custom check: check if all files are bigger than ``1kB``

        :param files: List of S3 object attributes.
        :return: true if the criteria is met
        :rtype: bool
        """
        return all(f.get('Size', 0) > 1024 for f in files)

    # [END howto_sensor_s3_key_function_definition]
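    # Illustration only (not part of the upstream example): S3KeySensor passes the
    # matched objects' attributes to check_fn, so the check above evaluates as:
    #   check_fn([{'Size': 2048}, {'Size': 4096}])  -> True   (every file > 1 kB)
    #   check_fn([{'Size': 512}])                   -> False  (a file is <= 1 kB)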
    # [START howto_operator_s3_create_bucket]
    create_bucket = S3CreateBucketOperator(
        task_id='s3_create_bucket',
        bucket_name=BUCKET_NAME,
    )
    # [END howto_operator_s3_create_bucket]

    # [START howto_operator_s3_put_bucket_tagging]
    put_tagging = S3PutBucketTaggingOperator(
        task_id='s3_put_bucket_tagging',
        bucket_name=BUCKET_NAME,
        key=TAG_KEY,
        value=TAG_VALUE,
    )
    # [END howto_operator_s3_put_bucket_tagging]

    # [START howto_operator_s3_get_bucket_tagging]
    get_tagging = S3GetBucketTaggingOperator(
        task_id='s3_get_bucket_tagging',
        bucket_name=BUCKET_NAME,
    )
    # [END howto_operator_s3_get_bucket_tagging]

    # [START howto_operator_s3_delete_bucket_tagging]
    delete_tagging = S3DeleteBucketTaggingOperator(
        task_id='s3_delete_bucket_tagging',
        bucket_name=BUCKET_NAME,
    )
    # [END howto_operator_s3_delete_bucket_tagging]

    # [START howto_operator_s3_create_object]
    create_object = S3CreateObjectOperator(
        task_id="s3_create_object",
        s3_bucket=BUCKET_NAME,
        s3_key=KEY,
        data=DATA,
        replace=True,
    )
    # [END howto_operator_s3_create_object]

    # [START howto_operator_s3_list_prefixes]
    list_prefixes = S3ListPrefixesOperator(
        task_id="s3_list_prefix_operator",
        bucket=BUCKET_NAME,
        prefix=PREFIX,
        delimiter=DELIMITER,
    )
    # [END howto_operator_s3_list_prefixes]

    # [START howto_operator_s3_list]
    list_keys = S3ListOperator(
        task_id="s3_list_operator",
        bucket=BUCKET_NAME,
        prefix=PREFIX,
    )
    # [END howto_operator_s3_list]

    # [START howto_sensor_s3_key_single_key]
    # Check if a file exists
    sensor_one_key = S3KeySensor(
        task_id="s3_sensor_one_key",
        bucket_name=BUCKET_NAME,
        bucket_key=KEY,
    )
    # [END howto_sensor_s3_key_single_key]

    # [START howto_sensor_s3_key_multiple_keys]
    # Check if both files exist
    sensor_two_keys = S3KeySensor(
        task_id="s3_sensor_two_keys",
        bucket_name=BUCKET_NAME,
        bucket_key=[KEY, KEY_2],
    )
    # [END howto_sensor_s3_key_multiple_keys]

    # [START howto_sensor_s3_key_function]
    # Check if a file exists and matches the criteria defined in check_fn
    sensor_key_with_function = S3KeySensor(
        task_id="s3_sensor_key_function",
        bucket_name=BUCKET_NAME,
        bucket_key=KEY,
        check_fn=check_fn,
    )
    # [END howto_sensor_s3_key_function]

    # [START howto_sensor_s3_keys_unchanged]
    sensor_keys_unchanged = S3KeysUnchangedSensor(
        task_id="s3_sensor_one_key_size",
        bucket_name=BUCKET_NAME_2,
        prefix=PREFIX,
        inactivity_period=10,
    )
    # [END howto_sensor_s3_keys_unchanged]

    # [START howto_operator_s3_copy_object]
    copy_object = S3CopyObjectOperator(
        task_id="s3_copy_object",
        source_bucket_name=BUCKET_NAME,
        dest_bucket_name=BUCKET_NAME_2,
        source_bucket_key=KEY,
        dest_bucket_key=KEY_2,
    )
    # [END howto_operator_s3_copy_object]

    # [START howto_operator_s3_file_transform]
    transforms_file = S3FileTransformOperator(
        task_id="s3_file_transform",
        source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
        dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
        # Use the `cp` command as the transform script as an example
        transform_script='cp',
        replace=True,
    )
    # [END howto_operator_s3_file_transform]

    # [START howto_operator_s3_delete_objects]
    delete_objects = S3DeleteObjectsOperator(
        task_id="s3_delete_objects",
        bucket=BUCKET_NAME_2,
        keys=KEY_2,
    )
    # [END howto_operator_s3_delete_objects]

    # [START howto_operator_s3_delete_bucket]
    delete_bucket = S3DeleteBucketOperator(
        task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True
    )
    # [END howto_operator_s3_delete_bucket]

    chain(
        create_bucket,
        put_tagging,
        get_tagging,
        delete_tagging,
        create_object,
        list_prefixes,
        list_keys,
        [sensor_one_key, sensor_two_keys, sensor_key_with_function],
        copy_object,
        transforms_file,
        sensor_keys_unchanged,
        delete_objects,
        delete_bucket,
    )
