
Amazon S3 Operators

Prerequisite Tasks

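To use these operators, you must do a few things:

  • Install the Amazon provider, for example with pip install 'apache-airflow[amazon]'.

  • Set up an AWS connection in Airflow. The operators use the aws_default connection ID unless aws_conn_id is set.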

Overview

The Airflow integration with Amazon Simple Storage Service (S3) provides several operators to create and interact with S3 buckets.

Two example DAGs showcase these operators in action:

  • example_s3_bucket.py

  • example_s3_bucket_tagging.py

Create and Delete Amazon S3 Buckets

Purpose

The example DAG example_s3_bucket.py uses S3CreateBucketOperator and S3DeleteBucketOperator to create a new S3 bucket with a given name and then delete it. In between, a PythonOperator adds keys to the bucket.

Defining tasks

In the following code we create a new bucket, add keys, and then delete the bucket.

airflow/providers/amazon/aws/example_dags/example_s3_bucket.py

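    # These tasks sit inside a `with DAG(...)` block; BUCKET_NAME and
    # upload_keys are defined elsewhere in the example file.

    # Create the bucket BUCKET_NAME in the us-east-1 region.
    create_bucket = S3CreateBucketOperator(
        task_id='s3_bucket_dag_create',
        bucket_name=BUCKET_NAME,
        region_name='us-east-1',
    )

    # Run the upload_keys callable to put some objects into the new bucket.
    add_keys_to_bucket = PythonOperator(
        task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys
    )

    # force_delete=True deletes all objects in the bucket before deleting
    # the bucket itself, so the objects added above do not block deletion.
    delete_bucket = S3DeleteBucketOperator(
        task_id='s3_bucket_dag_delete',
        bucket_name=BUCKET_NAME,
        force_delete=True,
    )

    # Create the bucket, add keys, then delete the bucket.
    create_bucket >> add_keys_to_bucket >> delete_bucket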
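To make the task order and the role of force_delete concrete, here is a minimal sketch that assumes nothing beyond the Python standard library. FakeS3 and run_bucket_example are hypothetical stand-ins, not boto3 or Airflow APIs, and the object keys written in place of upload_keys are placeholders. Like real S3, the toy refuses to delete a bucket that still holds objects unless the bucket is emptied first, which is what force_delete=True arranges.

```python
from typing import Dict


class FakeS3:
    """Hypothetical in-memory stand-in for S3, for illustration only (not boto3 or S3Hook)."""

    def __init__(self) -> None:
        # bucket name -> {object key: object body}
        self.buckets: Dict[str, Dict[str, str]] = {}

    def create_bucket(self, name: str) -> None:
        if name in self.buckets:
            raise ValueError(f"bucket {name!r} already exists")
        self.buckets[name] = {}

    def put_object(self, bucket: str, key: str, body: str) -> None:
        self.buckets[bucket][key] = body

    def delete_bucket(self, name: str, force_delete: bool = False) -> None:
        objects = self.buckets[name]
        if objects and not force_delete:
            # Real S3 also rejects this request, with a BucketNotEmpty error.
            raise RuntimeError(f"bucket {name!r} is not empty")
        objects.clear()  # force_delete: remove every object first
        del self.buckets[name]


def run_bucket_example(s3: FakeS3, bucket_name: str, force_delete: bool) -> None:
    """Mirror the three tasks in order: create -> add keys -> delete."""
    s3.create_bucket(bucket_name)  # create_bucket
    for i in range(3):  # add_keys_to_bucket (hypothetical key names)
        s3.put_object(bucket_name, f"path/data{i}", "input")
    s3.delete_bucket(bucket_name, force_delete=force_delete)  # delete_bucket


s3 = FakeS3()
run_bucket_example(s3, "example-bucket", force_delete=True)
print(sorted(s3.buckets))  # []

s3 = FakeS3()
try:
    run_bucket_example(s3, "example-bucket", force_delete=False)
except RuntimeError as err:
    print(err)  # bucket 'example-bucket' is not empty
```

With force_delete=False the last step fails and the bucket is left behind with its three objects, which is why the example DAG enables force_delete on its cleanup task.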

Using Amazon S3 Bucket Tagging

Purpose

The example DAG example_s3_bucket_tagging.py creates a new S3 bucket, applies tagging, gets tagging, deletes tagging, and then deletes the bucket. The tagging steps use S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, and S3DeleteBucketTaggingOperator.

Defining tasks

The following tasks apply a tag to the bucket, read the bucket's tags back, and then delete them.

airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py

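    # BUCKET_NAME, TAG_KEY and TAG_VALUE are defined elsewhere in the example file.

    # Apply the tag TAG_KEY=TAG_VALUE to the bucket.
    put_tagging = S3PutBucketTaggingOperator(
        task_id='s3_bucket_tagging_dag_put_tagging', bucket_name=BUCKET_NAME, key=TAG_KEY, value=TAG_VALUE
    )

    # Read back the bucket's current tag set.
    get_tagging = S3GetBucketTaggingOperator(
        task_id='s3_bucket_tagging_dag_get_tagging', bucket_name=BUCKET_NAME
    )

    # Remove all tags from the bucket.
    delete_tagging = S3DeleteBucketTaggingOperator(
        task_id='s3_bucket_tagging_dag_delete_tagging', bucket_name=BUCKET_NAME
    )

    # Apply, read back, then remove the tags.
    put_tagging >> get_tagging >> delete_tagging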
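S3's PutBucketTagging API takes a TagSet, a list of {'Key': ..., 'Value': ...} entries, and replaces any tags the bucket already has. A minimal sketch of that behavior, assuming only the Python standard library: build_tag_set and FakeTaggedBucket are hypothetical names, not part of Airflow or boto3, and "team"/"data" are placeholders for TAG_KEY and TAG_VALUE. It shows how a single key/value pair becomes a one-entry tag set and how put, get, and delete act on it.

```python
from typing import Dict, List, Optional

Tag = Dict[str, str]


def build_tag_set(
    key: Optional[str] = None,
    value: Optional[str] = None,
    tag_set: Optional[List[Tag]] = None,
) -> List[Tag]:
    """Hypothetical helper: turn a key/value pair (or a ready-made list) into
    the {'Key': ..., 'Value': ...} entries that PutBucketTagging expects."""
    if tag_set and (key or value):
        raise ValueError("pass either tag_set or key/value, not both")
    if tag_set:
        return [dict(tag) for tag in tag_set]
    if key and value:
        return [{"Key": key, "Value": value}]
    raise ValueError("a key/value pair or a tag_set is required")


class FakeTaggedBucket:
    """Hypothetical in-memory model of one bucket's tags (not boto3 or S3Hook)."""

    def __init__(self) -> None:
        self._tag_set: Optional[List[Tag]] = None

    def put_tagging(self, tag_set: List[Tag]) -> None:
        # Like S3 PutBucketTagging, this replaces any existing tags.
        self._tag_set = [dict(tag) for tag in tag_set]

    def get_tagging(self) -> List[Tag]:
        if self._tag_set is None:
            # Real S3 answers GetBucketTagging on an untagged bucket with NoSuchTagSet.
            raise LookupError("NoSuchTagSet")
        return [dict(tag) for tag in self._tag_set]

    def delete_tagging(self) -> None:
        # Like S3 DeleteBucketTagging, this removes the whole tag set.
        self._tag_set = None


bucket = FakeTaggedBucket()
bucket.put_tagging(build_tag_set(key="team", value="data"))  # put_tagging
print(bucket.get_tagging())  # [{'Key': 'team', 'Value': 'data'}]
bucket.delete_tagging()  # delete_tagging
```

Because a put replaces the whole tag set, adding a tag to a bucket that already has tags means reading the existing set first and writing back the combined list.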

