Amazon S3 Operators

Prerequisite Tasks

To use these operators, you must do a few things:

Overview

The Airflow integration with Amazon Simple Storage Service (S3) provides several operators to create and interact with S3 buckets.

Two example DAGs showcase these operators in action:

  • example_s3_bucket.py

  • example_s3_bucket_tagging.py

Create and Delete Amazon S3 Buckets

Purpose

The example DAG example_s3_bucket.py uses S3CreateBucketOperator and S3DeleteBucketOperator to create a new S3 bucket with a given name and then delete it.

Defining tasks

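The following code creates a new bucket, adds three keys to it, and then deletes the bucket. Because bucket_name is set in default_args, the operators do not need it passed explicitly. The delete task sets force_delete=True so that the bucket can be removed even though it still contains objects.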

airflow/providers/amazon/aws/example_dags/example_s3_bucket.py

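import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Import path assumes a provider release that ships the consolidated operators.s3
# module; older releases expose these classes from operators.s3_bucket.
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator

# Assumed placeholder: the excerpt does not show how BUCKET_NAME is defined.
BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-airflow-12345")


@task(task_id="s3_bucket_dag_add_keys_to_bucket")
def upload_keys():
    """Python callable that adds three keys to the S3 bucket."""
    s3_hook = S3Hook()
    # Writes path/data0, path/data1, and path/data2, each holding the string "input"
    for i in range(3):
        s3_hook.load_string(
            string_data="input",
            key=f"path/data{i}",
            bucket_name=BUCKET_NAME,
        )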


with DAG(
    dag_id='s3_bucket_dag',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args={"bucket_name": BUCKET_NAME},
    max_active_runs=1,
    tags=['example'],
) as dag:

    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')

    # Using a task-decorated function to add keys
    add_keys_to_bucket = upload_keys()

    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)

    create_bucket >> add_keys_to_bucket >> delete_bucket
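
The create, add keys, delete sequence above can be traced without AWS access using a small in-memory model. This is only a sketch: InMemoryBucketStore, BucketNotEmptyError, and run_example are hypothetical names invented for illustration and are not part of Airflow or boto3. The model assumes what the example relies on, namely that a bucket holding objects cannot be deleted unless its objects are removed first, which is what force_delete=True asks the delete operator to do.

```python
class BucketNotEmptyError(Exception):
    """Raised by the toy store when a non-empty bucket is deleted without force."""


class InMemoryBucketStore:
    """Hypothetical stand-in for S3 (not an Airflow or boto3 class)."""

    def __init__(self):
        # Maps bucket name -> {key: string_data}
        self._buckets = {}

    def create_bucket(self, bucket_name):
        self._buckets.setdefault(bucket_name, {})

    def load_string(self, string_data, key, bucket_name):
        self._buckets[bucket_name][key] = string_data

    def list_keys(self, bucket_name):
        return sorted(self._buckets[bucket_name])

    def delete_bucket(self, bucket_name, force_delete=False):
        objects = self._buckets[bucket_name]
        if objects and not force_delete:
            # Mirrors the assumption that a bucket with objects cannot be deleted
            raise BucketNotEmptyError(bucket_name)
        # With force_delete the objects are dropped together with the bucket
        del self._buckets[bucket_name]

    def bucket_exists(self, bucket_name):
        return bucket_name in self._buckets


def run_example(store, bucket_name="example-bucket"):
    """Replay the DAG's three steps against the toy store."""
    store.create_bucket(bucket_name)
    for i in range(3):
        store.load_string(string_data="input", key=f"path/data{i}", bucket_name=bucket_name)
    keys = store.list_keys(bucket_name)

    # A plain delete is refused because the bucket still holds three keys
    try:
        store.delete_bucket(bucket_name)
        refused = False
    except BucketNotEmptyError:
        refused = True

    # The forced delete succeeds, matching force_delete=True in the DAG
    store.delete_bucket(bucket_name, force_delete=True)
    return keys, refused, store.bucket_exists(bucket_name)


keys, refused, still_exists = run_example(InMemoryBucketStore())
print(keys, refused, still_exists)
```

In the real DAG the same ordering is enforced by the dependency chain: the delete task runs only after the keys have been uploaded, so dropping force_delete=True would likely make that final task fail.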

Using Amazon S3 Bucket Tagging

Purpose

The example DAG example_s3_bucket_tagging.py uses S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, and S3DeleteBucketTaggingOperator, together with S3CreateBucketOperator and S3DeleteBucketOperator, to create a new S3 bucket, apply tagging, get tagging, delete tagging, and then delete the bucket.

Defining tasks

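The following code creates a new S3 bucket, applies a tag to it, reads the tagging back, deletes the tagging, and then deletes the bucket. As in the previous example, bucket_name is supplied to every operator through default_args.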

airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py

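import os
from datetime import datetime

from airflow import DAG

# Import path assumes a provider release that ships the consolidated operators.s3
# module; older releases split these classes across operators.s3_bucket and
# operators.s3_bucket_tagging.
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateBucketOperator,
    S3DeleteBucketOperator,
    S3DeleteBucketTaggingOperator,
    S3GetBucketTaggingOperator,
    S3PutBucketTaggingOperator,
)

# Assumed placeholders: the excerpt does not show how these names are defined.
BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-s3-bucket-tagging")
TAG_KEY = os.environ.get("TAG_KEY", "test-s3-bucket-tagging-key")
TAG_VALUE = os.environ.get("TAG_VALUE", "test-s3-bucket-tagging-value")

with DAG(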
    dag_id='s3_bucket_tagging_dag',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args={"bucket_name": BUCKET_NAME},
    max_active_runs=1,
    tags=['example'],
) as dag:

    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_tagging_dag_create', region_name='us-east-1')

    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_tagging_dag_delete', force_delete=True)

    get_tagging = S3GetBucketTaggingOperator(task_id='s3_bucket_tagging_dag_get_tagging')

    put_tagging = S3PutBucketTaggingOperator(
        task_id='s3_bucket_tagging_dag_put_tagging', key=TAG_KEY, value=TAG_VALUE
    )

    delete_tagging = S3DeleteBucketTaggingOperator(task_id='s3_bucket_tagging_dag_delete_tagging')

    create_bucket >> put_tagging >> get_tagging >> delete_tagging >> delete_bucket
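
The key and value passed to S3PutBucketTaggingOperator end up as a single entry in the bucket's tag set. The sketch below shows the TagSet shape that the S3 tagging API uses (a list of Key/Value dictionaries) so the result of the get-tagging step is easier to interpret. The helper names build_tagging_payload and tags_as_dict and the sample tag are hypothetical, chosen for illustration; they are not Airflow functions.

```python
def build_tagging_payload(key, value):
    """Return the Tagging structure used by the S3 API for a single tag."""
    if not key or value is None:
        raise ValueError("a tag needs both a key and a value")
    return {"TagSet": [{"Key": str(key), "Value": str(value)}]}


def tags_as_dict(tag_set):
    """Flatten a TagSet list into a plain dict for easier lookups."""
    return {tag["Key"]: tag["Value"] for tag in tag_set}


# Hypothetical tag, standing in for TAG_KEY and TAG_VALUE from the example DAG
payload = build_tagging_payload("team", "data-platform")
print(payload)
print(tags_as_dict(payload["TagSet"]))
```

Flattening the tag set into a dictionary is a convenience for downstream checks, under the assumption that tag keys are unique within a bucket.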

Reference

For further information, look at:
