SQS Publish Operator
Prerequisite Tasks
To use these operators, you must do a few things:
Create the necessary resources using the AWS Console or the AWS CLI.
Install the API libraries via pip:
pip install 'apache-airflow[amazon]'
Detailed information is available in the Installation documentation.
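As a quick sanity check after installation, the following sketch looks up the installed version of the Amazon provider package. It assumes the provider is distributed as apache-airflow-providers-amazon (the package the amazon extra is expected to pull in); the helper name provider_version is hypothetical and not part of Airflow.

```python
from importlib import metadata
from typing import Optional

# Assumed distribution name pulled in by the 'amazon' extra.
PROVIDER_DIST = "apache-airflow-providers-amazon"


def provider_version(dist_name: str = PROVIDER_DIST) -> Optional[str]:
    """Return the installed version of dist_name, or None if it is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = provider_version()
    if version is None:
        print(f"{PROVIDER_DIST} is not installed")
    else:
        print(f"{PROVIDER_DIST} {version} is installed")
```

If the function returns None, rerun the pip command above in the same environment that Airflow uses.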
Using Operator
Use the SQSPublishOperator to publish a message to Amazon Simple Queue Service (SQS).
In the following example, the task “publish_to_queue” publishes a message containing the task instance and the execution date to a queue named Airflow-Example-Queue. A second task, “read_from_queue”, then uses SQSSensor to read from the same queue.
from airflow.providers.amazon.aws.operators.sqs import SQSPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SQSSensor

# create_queue_fn, delete_queue_fn and AWS_CONN_ID are defined elsewhere in the example DAG.

# Using a task-decorated function to create an SQS queue
create_queue = create_queue_fn()

# Publish a templated message to the queue created above
publish_to_queue = SQSPublishOperator(
    task_id='publish_to_queue',
    sqs_queue=create_queue,
    message_content="{{ task_instance }}-{{ execution_date }}",
    message_attributes=None,
    delay_seconds=0,
    aws_conn_id=AWS_CONN_ID,
)

# Read messages back from the same queue
read_from_queue = SQSSensor(
    task_id='read_from_queue',
    sqs_queue=create_queue,
    max_messages=5,
    wait_time_seconds=1,
    visibility_timeout=None,
    message_filtering=None,
    message_filtering_match_values=None,
    message_filtering_config=None,
    aws_conn_id=AWS_CONN_ID,
)

# Using a task-decorated function to delete the SQS queue we created earlier
delete_queue = delete_queue_fn(create_queue)

create_queue >> publish_to_queue >> read_from_queue >> delete_queue
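The example relies on two task-decorated helpers, create_queue_fn and delete_queue_fn, whose bodies are not shown. A minimal sketch of what such helpers might do follows, assuming a boto3-style SQS client that exposes create_queue and delete_queue. The function names, signatures, and the way the client is passed in are hypothetical; in a DAG the functions would be wrapped with Airflow's task decorator and would obtain the client from an Airflow connection.

```python
from typing import Any

QUEUE_NAME = "Airflow-Example-Queue"  # queue name mentioned in the text above


def create_queue(client: Any, queue_name: str = QUEUE_NAME) -> str:
    """Create the queue and return its URL, which later SQS calls address it by."""
    response = client.create_queue(QueueName=queue_name)
    return response["QueueUrl"]


def delete_queue(client: Any, queue_url: str) -> None:
    """Delete the queue identified by queue_url."""
    client.delete_queue(QueueUrl=queue_url)


# Usage against real AWS (assumes boto3 is installed and credentials are configured):
#   import boto3
#   sqs = boto3.client("sqs")
#   queue_url = create_queue(sqs)
#   delete_queue(sqs, queue_url)
```

Returning the queue URL from the create step is what lets the downstream tasks receive it as their sqs_queue argument.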
More information
For further information, see the documentation of the send_message() method in boto3.
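As a rough illustration of how the operator's arguments could map onto the boto3 call, the sketch below builds keyword arguments for send_message. The mapping of sqs_queue, message_content, delay_seconds and message_attributes onto QueueUrl, MessageBody, DelaySeconds and MessageAttributes is an assumption based on the parameter names, not a copy of the operator's implementation; build_send_message_kwargs and publish are hypothetical helpers.

```python
from typing import Any, Dict, Optional


def build_send_message_kwargs(
    queue_url: str,
    message_content: str,
    delay_seconds: int = 0,
    message_attributes: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Translate operator-style arguments into send_message keyword arguments."""
    # SQS accepts a per-message delay between 0 and 900 seconds.
    if not 0 <= delay_seconds <= 900:
        raise ValueError("delay_seconds must be between 0 and 900")
    return {
        "QueueUrl": queue_url,
        "MessageBody": message_content,
        "DelaySeconds": delay_seconds,
        # Treat None as "no attributes" (an assumption about how None is handled).
        "MessageAttributes": message_attributes or {},
    }


def publish(client: Any, **operator_args: Any) -> Dict[str, Any]:
    """Send one message with a boto3-style SQS client and return its response."""
    return client.send_message(**build_send_message_kwargs(**operator_args))


# Usage against real AWS (assumes boto3 is installed and credentials are configured):
#   import boto3
#   publish(boto3.client("sqs"), queue_url="<queue URL>", message_content="hello")
```

Keeping the argument translation in a pure function makes it easy to check the payload without contacting AWS.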