Source code for airflow.providers.amazon.aws.example_dags.example_sqs

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.sqs import SQSHook
from airflow.providers.amazon.aws.operators.sqs import SQSPublishOperator
from airflow.providers.amazon.aws.sensors.sqs import SQSSensor

QUEUE_NAME = 'Airflow-Example-Queue'
AWS_CONN_ID = 'aws_default'


@task(task_id="create_queue")
def create_queue_fn():
    """This is a python callback that creates an SQS queue"""
    hook = SQSHook(aws_conn_id=AWS_CONN_ID)
    result = hook.create_queue(
        queue_name=QUEUE_NAME,
    )
    return result['QueueUrl']


@task(task_id="delete_queue")
def delete_queue_fn(queue_url):
    """This is a python callback that deletes an SQS queue"""
    hook = SQSHook(aws_conn_id=AWS_CONN_ID)
    hook.get_conn().delete_queue(QueueUrl=queue_url)


with DAG(
    dag_id='example_sqs',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:
    # [START howto_sqs_operator_and_sensor]

    # Using a task-decorated function to create an SQS queue
    create_queue = create_queue_fn()

    publish_to_queue = SQSPublishOperator(
        task_id='publish_to_queue',
        sqs_queue=create_queue,
        message_content="{{ task_instance }}-{{ execution_date }}",
        message_attributes=None,
        delay_seconds=0,
        aws_conn_id=AWS_CONN_ID,
    )

    read_from_queue = SQSSensor(
        task_id='read_from_queue',
        sqs_queue=create_queue,
        max_messages=5,
        wait_time_seconds=1,
        visibility_timeout=None,
        message_filtering=None,
        message_filtering_match_values=None,
        message_filtering_config=None,
        aws_conn_id=AWS_CONN_ID,
    )

    # Using a task-decorated function to delete the SQS queue we created earlier
    delete_queue = delete_queue_fn(create_queue)

    create_queue >> publish_to_queue >> read_from_queue >> delete_queue
    # [END howto_sqs_operator_and_sensor]

Was this entry helpful?