Amazon S3 Operators
Prerequisite Tasks
To use these operators, you must do a few things:
- Create the necessary resources using the AWS Console or the AWS CLI.
- Install the API libraries via pip:

  ```bash
  pip install 'apache-airflow[amazon]'
  ```

  Detailed information is available in the Airflow Installation documentation.
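After installing, you can confirm that the Amazon provider is visible to the Python interpreter that runs Airflow with a quick standard-library check like the one below. This is a minimal sketch: `amazon_provider_installed` is a hypothetical helper, not part of Airflow. It only locates the package and does not verify AWS credentials or an Airflow connection. Airflow's own `airflow providers list` command reports similar information.

```python
import importlib.util


def amazon_provider_installed() -> bool:
    """Return True if the Amazon provider package can be located.

    Hypothetical helper, not part of Airflow. It does not check AWS
    credentials or the Airflow connection.
    """
    try:
        return importlib.util.find_spec("airflow.providers.amazon") is not None
    except ModuleNotFoundError:
        # find_spec imports the parent packages ("airflow", "airflow.providers")
        # and raises if one of them is missing.
        return False


if __name__ == "__main__":
    print("Amazon provider installed:", amazon_provider_installed())
```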
Overview
The Airflow integration with Amazon Simple Storage Service (S3) provides several operators to create and interact with S3 buckets.
Two example DAGs showcase these operators in action:
- example_s3_bucket.py
- example_s3_bucket_tagging.py
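Both example DAGs below supply the bucket name once, through `default_args={"bucket_name": BUCKET_NAME}`, instead of repeating it on every operator. To our understanding, Airflow applies a `default_args` entry only to operators whose constructor declares a parameter of that name, and an argument passed explicitly to an operator takes precedence. The toy model below reproduces that rule in plain Python so it can run without Airflow; `apply_default_args` and the two stand-in classes are hypothetical and are not Airflow's actual implementation.

```python
import inspect


def apply_default_args(cls, default_args, **kwargs):
    """Toy model of default_args: fill in constructor arguments the caller omitted.

    A default is used only when the constructor declares a parameter with
    that name and the caller did not pass it explicitly.
    """
    accepted = inspect.signature(cls).parameters
    for name, value in default_args.items():
        if name in accepted and name not in kwargs:
            kwargs[name] = value
    return cls(**kwargs)


# Hypothetical stand-ins, reduced to the arguments that matter here.
class FakeCreateBucket:
    def __init__(self, task_id, bucket_name=None, region_name=None):
        self.task_id = task_id
        self.bucket_name = bucket_name
        self.region_name = region_name


class FakeUploadTask:
    def __init__(self, task_id):  # declares no bucket_name parameter
        self.task_id = task_id


defaults = {"bucket_name": "my-example-bucket"}
create = apply_default_args(FakeCreateBucket, defaults, task_id="create", region_name="us-east-1")
upload = apply_default_args(FakeUploadTask, defaults, task_id="upload")  # default is skipped
other = apply_default_args(FakeCreateBucket, defaults, task_id="other", bucket_name="another-bucket")
print(create.bucket_name, other.bucket_name)  # my-example-bucket another-bucket
```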
Create and Delete Amazon S3 Buckets
Purpose
The example DAG example_s3_bucket.py uses S3CreateBucketOperator and S3DeleteBucketOperator to create a new S3 bucket with a given name and then delete it.
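The "given name" has to satisfy S3's bucket naming rules, or bucket creation fails. The sketch below checks a subset of the rules AWS documents for general purpose buckets (3 to 63 characters; lowercase letters, digits, periods and hyphens; starting and ending with a letter or digit; no adjacent periods; not shaped like an IPv4 address; no `xn--` prefix or `-s3alias` suffix). It is not exhaustive, `bucket_name_problems` is a hypothetical helper, and global uniqueness can only be confirmed by actually creating the bucket.

```python
import ipaddress
import re
from typing import List

# Lowercase letters, digits, '.', '-'; first and last character a letter or digit.
_NAME_CHARS = re.compile(r"[a-z0-9][a-z0-9.-]*[a-z0-9]")


def bucket_name_problems(name: str) -> List[str]:
    """Return the naming rules that `name` breaks (empty list if none found).

    Checks only a subset of the documented S3 general purpose bucket rules.
    """
    problems = []
    if not 3 <= len(name) <= 63:
        problems.append("must be 3-63 characters long")
    if not _NAME_CHARS.fullmatch(name):
        problems.append("only lowercase letters, digits, '.' and '-'; "
                        "must start and end with a letter or digit")
    if ".." in name:
        problems.append("must not contain two adjacent periods")
    try:
        ipaddress.IPv4Address(name)
        problems.append("must not be formatted as an IP address")
    except ValueError:
        pass  # not an IPv4 address, which is what we want
    if name.startswith("xn--"):
        problems.append("must not start with the reserved prefix 'xn--'")
    if name.endswith("-s3alias"):
        problems.append("must not end with the reserved suffix '-s3alias'")
    return problems


print(bucket_name_problems("my-example-bucket"))  # []
print(bucket_name_problems("My_Bucket"))
```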
Defining tasks
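In the following code, we create a new bucket, add keys to it, and then delete the bucket. Because the bucket still contains the uploaded keys when the last task runs, the delete task sets force_delete=True, which removes the bucket's objects before deleting the bucket itself.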
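```python
from datetime import datetime

from airflow import DAG
# Older Amazon provider releases exposed these from operators.s3_bucket instead.
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator

# BUCKET_NAME and the upload_keys() task are defined elsewhere in example_s3_bucket.py.
with DAG(
    dag_id='s3_bucket_dag',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    # Passed to every operator that accepts a bucket_name argument.
    default_args={"bucket_name": BUCKET_NAME},
    max_active_runs=1,
    tags=['example'],
) as dag:
    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')
    # Using a task-decorated function to add keys
    add_keys_to_bucket = upload_keys()
    # force_delete=True removes the uploaded objects first; S3 only deletes empty buckets.
    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)

    create_bucket >> add_keys_to_bucket >> delete_bucket
```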
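The DAG calls `upload_keys()`, a task-decorated function whose body is not shown in this excerpt. Below is a minimal sketch of what such a task might do, assuming it uploads a few small string objects with `S3Hook.load_string`. To keep the upload logic testable without AWS, the hook is passed in as a parameter. `put_example_keys`, the `path/data` key prefix, the task_id in the commented wiring, and `RecordingHook` are all hypothetical, not taken from the example file.

```python
from typing import Any, List


def put_example_keys(hook: Any, bucket_name: str, count: int = 3,
                     prefix: str = "path/data") -> List[str]:
    """Upload `count` small string objects to `bucket_name` and return their keys.

    `hook` is assumed to offer S3Hook.load_string(string_data, key, bucket_name, replace).
    """
    keys = []
    for i in range(count):
        key = f"{prefix}{i}"
        # replace=True overwrites an existing object; with replace=False,
        # S3Hook.load_string raises if the key already exists.
        hook.load_string(string_data="input", key=key, bucket_name=bucket_name, replace=True)
        keys.append(key)
    return keys


# Wiring into the DAG file (needs Airflow and the Amazon provider; the
# task_id is a hypothetical choice):
#
# from airflow.decorators import task
# from airflow.providers.amazon.aws.hooks.s3 import S3Hook
#
# @task(task_id="s3_bucket_dag_add_keys_to_bucket")
# def upload_keys():
#     put_example_keys(S3Hook(), BUCKET_NAME)


class RecordingHook:
    """Test double that records load_string calls instead of calling S3."""

    def __init__(self):
        self.calls = []

    def load_string(self, string_data, key, bucket_name=None, replace=False):
        self.calls.append((bucket_name, key, string_data, replace))


hook = RecordingHook()
print(put_example_keys(hook, "my-example-bucket"))  # ['path/data0', 'path/data1', 'path/data2']
```

Injecting the hook keeps the Airflow-specific part to a two-line wrapper, so the loop can be unit-tested with a fake hook.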
Using Amazon S3 Bucket Tagging
Purpose
The example DAG example_s3_bucket_tagging.py uses S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, and S3DeleteBucketTaggingOperator, together with S3CreateBucketOperator and S3DeleteBucketOperator, to create a new S3 bucket, apply a tag, read the tags back, delete the tags, and then delete the bucket.
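The order of these steps matters. In S3, PutBucketTagging replaces the bucket's entire tag set rather than merging into it, DeleteBucketTagging removes all tags, and GetBucketTagging on a bucket with no tags fails with a `NoSuchTagSet` error. That is why the DAG reads the tags after putting them and before deleting them. The in-memory toy below mimics those three behaviors in plain Python so the ordering can be explored without AWS; `FakeBucketTagging` and `NoSuchTagSet` here are hypothetical stand-ins, not boto3 or Airflow classes.

```python
from typing import Dict, List


class NoSuchTagSet(Exception):
    """Stand-in for the S3 error returned when a bucket has no tags."""


class FakeBucketTagging:
    """In-memory toy that mimics S3 bucket-tagging semantics (not boto3)."""

    def __init__(self) -> None:
        self._tags: Dict[str, str] = {}

    def put(self, tag_set: List[Dict[str, str]]) -> None:
        # Like PutBucketTagging: replace the whole tag set, do not merge.
        self._tags = {tag["Key"]: tag["Value"] for tag in tag_set}

    def get(self) -> List[Dict[str, str]]:
        # Like GetBucketTagging: fail when the bucket has no tags at all.
        if not self._tags:
            raise NoSuchTagSet("The TagSet does not exist")
        return [{"Key": k, "Value": v} for k, v in self._tags.items()]

    def delete(self) -> None:
        # Like DeleteBucketTagging: remove every tag.
        self._tags = {}


bucket = FakeBucketTagging()
bucket.put([{"Key": "team", "Value": "data"}])
bucket.put([{"Key": "env", "Value": "test"}])  # replaces the "team" tag
print(bucket.get())  # [{'Key': 'env', 'Value': 'test'}]
bucket.delete()
try:
    bucket.get()
except NoSuchTagSet as err:
    print("get after delete failed:", err)
```

In this model, swapping the get and delete steps makes the read fail, which mirrors why the DAG reads the tags before deleting them.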
Defining tasks
In the following code, we create a new S3 bucket, tag it with TAG_KEY and TAG_VALUE, read its tags, delete the tags, and then delete the bucket.
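```python
from datetime import datetime

from airflow import DAG
# Older Amazon provider releases exposed these from operators.s3_bucket and operators.s3_bucket_tagging.
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateBucketOperator,
    S3DeleteBucketOperator,
    S3DeleteBucketTaggingOperator,
    S3GetBucketTaggingOperator,
    S3PutBucketTaggingOperator,
)

# BUCKET_NAME, TAG_KEY and TAG_VALUE are defined elsewhere in example_s3_bucket_tagging.py.
with DAG(
    dag_id='s3_bucket_tagging_dag',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args={"bucket_name": BUCKET_NAME},
    max_active_runs=1,
    tags=['example'],
) as dag:
    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_tagging_dag_create', region_name='us-east-1')
    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_tagging_dag_delete', force_delete=True)
    get_tagging = S3GetBucketTaggingOperator(task_id='s3_bucket_tagging_dag_get_tagging')
    put_tagging = S3PutBucketTaggingOperator(
        task_id='s3_bucket_tagging_dag_put_tagging', key=TAG_KEY, value=TAG_VALUE
    )
    delete_tagging = S3DeleteBucketTaggingOperator(task_id='s3_bucket_tagging_dag_delete_tagging')

    create_bucket >> put_tagging >> get_tagging >> delete_tagging >> delete_bucket
```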
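At the S3 API level, bucket tags travel as a TagSet: a list of `{"Key": ..., "Value": ...}` dictionaries, which is the shape boto3's `put_bucket_tagging(Bucket=..., Tagging={"TagSet": [...]})` expects. The single `key`/`value` pair passed to S3PutBucketTaggingOperator above corresponds to a one-element TagSet. The sketch below builds a TagSet from a plain dict and checks a few documented S3 limits (at most 50 tags per bucket, keys up to 128 characters, values up to 256, and the reserved `aws:` prefix). `to_tag_set` is a hypothetical helper and the checks are not exhaustive; note that Python's `len` counts code points, which is only an approximation of how AWS counts characters.

```python
from typing import Dict, List

# Limits documented by AWS for S3 bucket tags (checked here as a sketch).
MAX_TAGS_PER_BUCKET = 50
MAX_KEY_LENGTH = 128
MAX_VALUE_LENGTH = 256


def to_tag_set(tags: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert {"key": "value"} pairs into the S3 TagSet list format."""
    if len(tags) > MAX_TAGS_PER_BUCKET:
        raise ValueError(f"at most {MAX_TAGS_PER_BUCKET} tags per bucket, got {len(tags)}")
    tag_set = []
    for key, value in tags.items():
        if not 1 <= len(key) <= MAX_KEY_LENGTH:
            raise ValueError(f"tag key must be 1-{MAX_KEY_LENGTH} characters: {key!r}")
        if len(value) > MAX_VALUE_LENGTH:
            raise ValueError(f"tag value must be at most {MAX_VALUE_LENGTH} characters: {value!r}")
        if key.lower().startswith("aws:"):
            raise ValueError(f"the 'aws:' prefix is reserved for AWS: {key!r}")
        tag_set.append({"Key": key, "Value": value})
    return tag_set


# A single key/value pair, like TAG_KEY=TAG_VALUE in the DAG, is a one-element TagSet.
print(to_tag_set({"team": "data-platform"}))  # [{'Key': 'team', 'Value': 'data-platform'}]
```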