Amazon S3 Operators

The Airflow integration with Amazon Simple Storage Service (S3) provides several operators and sensors to create and interact with S3 buckets and objects.

Prerequisite Tasks

To use these operators, you need an Airflow installation with the Amazon provider package (apache-airflow-providers-amazon) installed and an Airflow connection with valid AWS credentials.

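The examples on this page assume the relevant operators and sensors are already imported. In recent versions of the Amazon provider they can typically be imported from the consolidated s3 modules; the following is a sketch of the imports used below (module paths may differ in older provider versions):

from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
    S3DeleteBucketTaggingOperator,
    S3DeleteObjectsOperator,
    S3FileTransformOperator,
    S3GetBucketTaggingOperator,
    S3ListOperator,
    S3ListPrefixesOperator,
    S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
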
Create an Amazon S3 Bucket

To create an Amazon S3 bucket you can use S3CreateBucketOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

create_bucket = S3CreateBucketOperator(
    task_id='s3_create_bucket',
    bucket_name=BUCKET_NAME,
)

Delete an Amazon S3 Bucket

To delete an Amazon S3 bucket you can use S3DeleteBucketOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

delete_bucket = S3DeleteBucketOperator(
    task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True
)

Set the tags for an Amazon S3 Bucket

To set the tags for an Amazon S3 bucket you can use S3PutBucketTaggingOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

put_tagging = S3PutBucketTaggingOperator(
    task_id='s3_put_bucket_tagging',
    bucket_name=BUCKET_NAME,
    key=TAG_KEY,
    value=TAG_VALUE,
)
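
Multiple tags can also be set in a single task through the tag_set parameter (a hedged sketch; the tag keys and values below are illustrative):

put_multiple_tags = S3PutBucketTaggingOperator(
    task_id='s3_put_bucket_tag_set',
    bucket_name=BUCKET_NAME,
    # tag_set accepts a list of {'Key': ..., 'Value': ...} dictionaries
    tag_set=[
        {'Key': 'team', 'Value': 'data-platform'},
        {'Key': 'environment', 'Value': 'dev'},
    ],
)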

Get the tags of an Amazon S3 Bucket

To get the tag set associated with an Amazon S3 bucket you can use S3GetBucketTaggingOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

get_tagging = S3GetBucketTaggingOperator(
    task_id='s3_get_bucket_tagging',
    bucket_name=BUCKET_NAME,
)
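
The retrieved tag set is returned by the operator, so it is pushed to XCom and can be read by a downstream task. A rough sketch, assuming Airflow 2 and a hypothetical downstream task:

from airflow.operators.python import PythonOperator


def print_bucket_tags(ti):
    # Pull the tag set pushed to XCom by the s3_get_bucket_tagging task
    print(ti.xcom_pull(task_ids='s3_get_bucket_tagging'))


show_tagging = PythonOperator(
    task_id='show_bucket_tagging',
    python_callable=print_bucket_tags,
)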

Delete the tags of an Amazon S3 Bucket

To delete the tags of an Amazon S3 bucket you can use S3DeleteBucketTaggingOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

delete_tagging = S3DeleteBucketTaggingOperator(
    task_id='s3_delete_bucket_tagging',
    bucket_name=BUCKET_NAME,
)

Amazon S3 Key Sensor

To wait for one or multiple keys to be present in an Amazon S3 bucket you can use S3KeySensor. For each key, it calls the head_object API (or the list_objects_v2 API if wildcard_match is True) to check whether the key is present. Keep in mind, especially when checking a large number of keys, that the sensor makes one API call per key.

To check one file:

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="s3_sensor_one_key",
    bucket_name=BUCKET_NAME,
    bucket_key=KEY,
)

To check multiple files:

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="s3_sensor_two_keys",
    bucket_name=BUCKET_NAME,
    bucket_key=[KEY, KEY_2],
)
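
If the exact key name is not known in advance, the sensor can also match keys against a Unix-style wildcard by setting wildcard_match to True (a short sketch; the pattern below is a hypothetical example, not part of the original example DAG):

# Check if any key matching the wildcard pattern exists
sensor_wildcard_key = S3KeySensor(
    task_id="s3_sensor_wildcard_key",
    bucket_name=BUCKET_NAME,
    bucket_key="data/*.csv",
    wildcard_match=True,
)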

To run an additional custom check, you can define a function that receives the list of matched S3 object attributes and returns a boolean:

  • True: the criterion is met

  • False: the criterion is not met

This function is called for each key passed in bucket_key. The parameter is a list of objects because, when wildcard_match is True, multiple files can match a single key. The list of matched S3 object attributes contains only the size and has this format:

[{"Size": int}]

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

from typing import List


def check_fn(files: List) -> bool:
    """
    Example of custom check: check if all files are bigger than 1kB

    :param files: List of S3 object attributes.
    Format: [{
        'Size': int
    }]
    :return: True if the criterion is met
    :rtype: bool
    """
    return all(f.get('Size', 0) > 1024 for f in files)

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="s3_sensor_key_function",
    bucket_name=BUCKET_NAME,
    bucket_key=KEY,
    check_fn=check_fn,
)

Amazon S3 Key Unchanged Sensor

To check for changes in the number of objects at a specific prefix in an Amazon S3 bucket and wait until the inactivity period has passed with no increase in the number of objects, you can use S3KeysUnchangedSensor. Note that this sensor will not behave correctly in reschedule mode, as the state of the listed objects in the Amazon S3 bucket will be lost between rescheduled invocations.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="s3_sensor_one_key_size",
    bucket_name=BUCKET_NAME_2,
    prefix=PREFIX,
    inactivity_period=10,
)

Create an Amazon S3 object

To create a new Amazon S3 object (or replace an existing one) you can use S3CreateObjectOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

create_object = S3CreateObjectOperator(
    task_id="s3_create_object",
    s3_bucket=BUCKET_NAME,
    s3_key=KEY,
    data=DATA,
    replace=True,
)

List Amazon S3 prefixes

To list all Amazon S3 prefixes within an Amazon S3 bucket you can use S3ListPrefixesOperator. See the [Amazon S3 User Guide](https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html) for more information about Amazon S3 prefixes.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

list_prefixes = S3ListPrefixesOperator(
    task_id="s3_list_prefix_operator",
    bucket=BUCKET_NAME,
    prefix=PREFIX,
    delimiter=DELIMITER,
)

List Amazon S3 objects

To list all Amazon S3 objects within an Amazon S3 bucket you can use S3ListOperator. You can specify a prefix to filter the listing to objects whose names begin with that prefix.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

list_keys = S3ListOperator(
    task_id="s3_list_operator",
    bucket=BUCKET_NAME,
    prefix=PREFIX,
)

Copy an Amazon S3 object

To copy an Amazon S3 object from one bucket to another you can use S3CopyObjectOperator. The Amazon S3 connection used here needs to have access to both source and destination bucket/key.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

copy_object = S3CopyObjectOperator(
    task_id="s3_copy_object",
    source_bucket_name=BUCKET_NAME,
    dest_bucket_name=BUCKET_NAME_2,
    source_bucket_key=KEY,
    dest_bucket_key=KEY_2,
)

Delete Amazon S3 objects

To delete one or multiple Amazon S3 objects you can use S3DeleteObjectsOperator.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

delete_objects = S3DeleteObjectsOperator(
    task_id="s3_delete_objects",
    bucket=BUCKET_NAME_2,
    keys=KEY_2,
)
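
The keys parameter also accepts a list, so several objects can be deleted in one task (a brief sketch; the key names below are hypothetical):

delete_multiple_objects = S3DeleteObjectsOperator(
    task_id="s3_delete_multiple_objects",
    bucket=BUCKET_NAME_2,
    # keys can be a single key string or a list of keys, relative to the bucket
    keys=["path/data_1.csv", "path/data_2.csv"],
)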

Transform an Amazon S3 object

To transform the data from one Amazon S3 object and save it to another object you can use S3FileTransformOperator. You can also apply an optional [Amazon S3 Select expression](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-select.html) to select the data you want to retrieve from source_s3_key using select_expression.

airflow/providers/amazon/aws/example_dags/example_s3.py[source]

transforms_file = S3FileTransformOperator(
    task_id="s3_file_transform",
    source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
    dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
    # Use `cp` command as transform script as an example
    transform_script='cp',
    replace=True,
)
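
When the source object is in a format supported by S3 Select (such as CSV or JSON), select_expression can restrict which data is passed to the transform script. A minimal sketch, assuming the source object contains CSV data:

transform_with_select = S3FileTransformOperator(
    task_id="s3_file_transform_select",
    source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
    dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
    # S3 Select runs server-side before the transform script is applied locally
    select_expression="SELECT * FROM S3Object s",
    transform_script='cp',
    replace=True,
)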
