Amazon S3 Operators¶
The Airflow integration with Amazon Simple Storage Service (S3) provides several operators to create and interact with S3 buckets and objects.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in the Installation documentation.
Create an Amazon S3 Bucket¶
To create an Amazon S3 bucket you can use S3CreateBucketOperator.
create_bucket = S3CreateBucketOperator(
    task_id='s3_create_bucket',
    bucket_name=BUCKET_NAME,
)
Delete an Amazon S3 Bucket¶
To delete an Amazon S3 bucket you can use S3DeleteBucketOperator.
delete_bucket = S3DeleteBucketOperator(
    task_id='s3_delete_bucket',
    bucket_name=BUCKET_NAME,
    force_delete=True,
)
Set the tags for an Amazon S3 Bucket¶
To set the tags for an Amazon S3 bucket you can use S3PutBucketTaggingOperator.
put_tagging = S3PutBucketTaggingOperator(
    task_id='s3_put_bucket_tagging',
    bucket_name=BUCKET_NAME,
    key=TAG_KEY,
    value=TAG_VALUE,
)
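If you need to apply several tags in one call, the operator also accepts a tag_set argument instead of a single key/value pair. This is a minimal sketch; the extra tag used here is only illustrative:
put_tagging_set = S3PutBucketTaggingOperator(
    task_id='s3_put_bucket_tagging_set',
    bucket_name=BUCKET_NAME,
    # tag_set takes a list of Key/Value dictionaries instead of a single pair
    tag_set=[
        {'Key': TAG_KEY, 'Value': TAG_VALUE},
        {'Key': 'environment', 'Value': 'test'},
    ],
)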
Get the tag of an Amazon S3 Bucket¶
To get the tag set associated with an Amazon S3 bucket you can use S3GetBucketTaggingOperator.
get_tagging = S3GetBucketTaggingOperator(
    task_id='s3_get_bucket_tagging',
    bucket_name=BUCKET_NAME,
)
Delete the tags of an Amazon S3 Bucket¶
To delete the tags of an Amazon S3 bucket you can use S3DeleteBucketTaggingOperator.
delete_tagging = S3DeleteBucketTaggingOperator(
    task_id='s3_delete_bucket_tagging',
    bucket_name=BUCKET_NAME,
)
Amazon S3 Key Sensor¶
To wait for one or multiple keys to be present in an Amazon S3 bucket you can use S3KeySensor.
For each key, it calls the head_object API (or the list_objects_v2 API if wildcard_match is True) to check whether the key is present.
Please keep in mind, especially when used to check a large volume of keys, that it makes one API call per key.
To check one file:
# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="s3_sensor_one_key",
    bucket_name=BUCKET_NAME,
    bucket_key=KEY,
)
To check multiple files:
# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="s3_sensor_two_keys",
    bucket_name=BUCKET_NAME,
    bucket_key=[KEY, KEY_2],
)
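When wildcard_match is True, bucket_key is interpreted as a Unix wildcard pattern and the sensor succeeds once at least one matching object exists. A minimal sketch; the pattern used here is only an illustration:
# Check if at least one key matching the wildcard pattern exists
sensor_wildcard_key = S3KeySensor(
    task_id="s3_sensor_wildcard_key",
    bucket_name=BUCKET_NAME,
    bucket_key="data/*.csv",
    wildcard_match=True,
)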
To apply an additional custom check you can define a function which receives a list of matched S3 object attributes and returns a boolean:
True: the criteria are met
False: the criteria are not met
This function is called for each key passed in bucket_key.
The parameter of this function is a list of objects because, when wildcard_match is True, multiple files can match one key. The list of matched S3 object attributes contains only the size and has this format:
[{"Size": int}]
from typing import List


def check_fn(files: List) -> bool:
    """
    Example of custom check: check if all files are bigger than 1kB

    :param files: List of S3 object attributes.
        Format: [{'Size': int}]
    :return: true if the criteria is met
    :rtype: bool
    """
    return all(f.get('Size', 0) > 1024 for f in files)
# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="s3_sensor_key_function",
    bucket_name=BUCKET_NAME,
    bucket_key=KEY,
    check_fn=check_fn,
)
Amazon S3 Key Unchanged Sensor¶
To check for changes in the number of objects at a specific prefix in an Amazon S3 bucket and wait until the inactivity period has passed with no increase in the number of objects you can use S3KeysUnchangedSensor.
Note that this sensor will not behave correctly in reschedule mode, as the state of the listed objects in the Amazon S3 bucket will be lost between rescheduled invocations.
sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="s3_sensor_one_key_size",
    bucket_name=BUCKET_NAME_2,
    prefix=PREFIX,
    inactivity_period=10,
)
Create an Amazon S3 object¶
To create a new Amazon S3 object (or replace an existing one) you can use S3CreateObjectOperator.
create_object = S3CreateObjectOperator(
    task_id="s3_create_object",
    s3_bucket=BUCKET_NAME,
    s3_key=KEY,
    data=DATA,
    replace=True,
)
List Amazon S3 prefixes¶
To list all Amazon S3 prefixes within an Amazon S3 bucket you can use S3ListPrefixesOperator.
See the Amazon S3 documentation for more information about Amazon S3 prefixes.
list_prefixes = S3ListPrefixesOperator(
    task_id="s3_list_prefix_operator",
    bucket=BUCKET_NAME,
    prefix=PREFIX,
    delimiter=DELIMITER,
)
List Amazon S3 objects¶
To list all Amazon S3 objects within an Amazon S3 bucket you can use S3ListOperator.
You can specify a prefix to filter the objects whose names begin with that prefix.
list_keys = S3ListOperator(
    task_id="s3_list_operator",
    bucket=BUCKET_NAME,
    prefix=PREFIX,
)
Copy an Amazon S3 object¶
To copy an Amazon S3 object from one bucket to another you can use S3CopyObjectOperator.
The Amazon S3 connection used here needs to have access to both the source and destination bucket/key.
copy_object = S3CopyObjectOperator(
    task_id="s3_copy_object",
    source_bucket_name=BUCKET_NAME,
    dest_bucket_name=BUCKET_NAME_2,
    source_bucket_key=KEY,
    dest_bucket_key=KEY_2,
)
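The source and destination can also be given as full s3:// URLs, in which case the corresponding *_bucket_name arguments are omitted. A minimal sketch reusing the same bucket and key names:
copy_object_full_url = S3CopyObjectOperator(
    task_id="s3_copy_object_full_url",
    # The full URLs carry the bucket names, so source_bucket_name/dest_bucket_name are not set
    source_bucket_key=f"s3://{BUCKET_NAME}/{KEY}",
    dest_bucket_key=f"s3://{BUCKET_NAME_2}/{KEY_2}",
)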
Delete Amazon S3 objects¶
To delete one or multiple Amazon S3 objects you can use S3DeleteObjectsOperator.
delete_objects = S3DeleteObjectsOperator(
    task_id="s3_delete_objects",
    bucket=BUCKET_NAME_2,
    keys=KEY_2,
)
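The keys argument also accepts a list of key names, so several objects can be removed in a single task. A minimal sketch reusing the example keys:
delete_multiple_objects = S3DeleteObjectsOperator(
    task_id="s3_delete_multiple_objects",
    bucket=BUCKET_NAME_2,
    # Pass a list to delete more than one object at once
    keys=[KEY, KEY_2],
)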
Transform an Amazon S3 object¶
To transform the data from one Amazon S3 object and save it to another object you can use S3FileTransformOperator.
You can also apply an optional [Amazon S3 Select expression](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-select.html) to select the data you want to retrieve from source_s3_key using select_expression.
transforms_file = S3FileTransformOperator(
    task_id="s3_file_transform",
    source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
    dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
    # Use `cp` command as transform script as an example
    transform_script='cp',
    replace=True,
)
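A minimal sketch combining the transform script with select_expression; the SQL shown is only an illustration and assumes the source object is in a format that S3 Select can query (for example CSV or JSON):
transforms_file_select = S3FileTransformOperator(
    task_id="s3_file_transform_select",
    source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
    dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
    # Only the records returned by the S3 Select expression are passed to the script
    select_expression="SELECT * FROM S3Object s",
    transform_script='cp',
    replace=True,
)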