Google Cloud PubSub Operators

Google Cloud PubSub is a fully managed, real-time messaging service that lets you send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.

Publisher applications send messages to a topic, and other applications subscribe to that topic to receive them. By decoupling senders and receivers, Google Cloud PubSub lets independently written applications communicate with each other.

Prerequisite Tasks

Creating a PubSub topic

A PubSub topic is a named resource to which publishers send messages. The PubSubCreateTopicOperator operator creates a topic; with fail_if_exists=False, as in the example below, the task succeeds even if the topic already exists.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    create_topic1 = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
    )
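Pub/Sub addresses a topic by its fully qualified name, projects/<project-id>/topics/<topic-id>, while the operator takes the short topic ID plus project_id. As a minimal sketch, assuming Google Cloud's published resource-naming rules (3 to 255 characters, starting with a letter, not starting with "goog", and using only letters, digits and - . _ ~ % +), the hypothetical helpers is_valid_topic_id and topic_path below check a topic ID and build its full name. They are not part of the Airflow provider.

```python
import re

# Assumed Pub/Sub resource ID rules: first character is a letter, 3-255
# characters in total, only letters, digits and - . _ ~ % +, and the ID
# must not begin with "goog".
_TOPIC_ID_RE = re.compile(r"(?!goog)[A-Za-z][A-Za-z0-9._~%+-]{2,254}")


def is_valid_topic_id(topic_id: str) -> bool:
    """Return True if topic_id satisfies the naming rules above (hypothetical helper)."""
    return _TOPIC_ID_RE.fullmatch(topic_id) is not None


def topic_path(project_id: str, topic_id: str) -> str:
    """Build the fully qualified topic name (hypothetical helper)."""
    if not is_valid_topic_id(topic_id):
        raise ValueError(f"invalid topic ID: {topic_id!r}")
    return f"projects/{project_id}/topics/{topic_id}"


print(topic_path("my-project", "my-topic"))
# projects/my-project/topics/my-topic
```

Validating early like this surfaces a bad name at DAG-authoring time rather than when the task first calls the API.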

Creating a PubSub subscription

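A subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. The PubSubCreateSubscriptionOperator operator creates the subscription. If no subscription name is given, one is generated; the operator returns the name, so downstream tasks can reference it as subscribe_task.output.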

airflow/providers/google/cloud/example_dags/example_pubsub.py

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG
    )
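Each subscription receives its own copy of every message published to its topic after the subscription is created, which is what lets several independent consumers read the same topic. The toy, in-memory model below illustrates that fan-out. The Topic class is hypothetical and deliberately ignores acknowledgement deadlines, retention, redelivery and ordering.

```python
from collections import deque


class Topic:
    """Toy in-memory topic: every subscription gets its own message queue."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.subscriptions: dict[str, deque] = {}

    def subscribe(self, subscription: str) -> None:
        # A new subscription only sees messages published after it exists.
        self.subscriptions.setdefault(subscription, deque())

    def publish(self, data: bytes) -> None:
        # Fan out: append a copy of the message to every subscription's queue.
        for queue in self.subscriptions.values():
            queue.append(data)

    def pull(self, subscription: str, max_messages: int = 10) -> list[bytes]:
        # Remove and return up to max_messages from this subscription only.
        queue = self.subscriptions[subscription]
        return [queue.popleft() for _ in range(min(max_messages, len(queue)))]


topic = Topic("my-topic")
topic.publish(b"lost")  # no subscriptions yet, so nobody receives this one
topic.subscribe("sub-a")
topic.subscribe("sub-b")
topic.publish(b"hello")
print(topic.pull("sub-a"))  # [b'hello']
print(topic.pull("sub-b"))  # [b'hello']
```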

Publishing PubSub messages

A message is a combination of data and (optional) attributes that a publisher sends to a topic and that is eventually delivered to subscribers. The PubSubPublishMessageOperator operator publishes messages to a topic.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=GCP_PROJECT_ID,
        topic=TOPIC_FOR_SENSOR_DAG,
        messages=[MESSAGE] * 10,
    )
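PubSubPublishMessageOperator takes messages as a list of dicts, each holding a bytes "data" value, an "attributes" mapping of string keys to string values, or both; a message needs non-empty data or at least one attribute. The value of MESSAGE is not shown in the snippet, so the one below is an assumed example, and check_message is a hypothetical pre-flight check written for illustration, not part of the provider.

```python
def check_message(message: dict) -> None:
    """Raise ValueError if message does not match the expected dict shape (hypothetical)."""
    data = message.get("data", b"")
    attributes = message.get("attributes", {})
    if not isinstance(data, bytes):
        raise ValueError("'data' must be bytes, e.g. 'text'.encode('utf-8')")
    # Pub/Sub attributes are a string-to-string map.
    if not all(isinstance(k, str) and isinstance(v, str) for k, v in attributes.items()):
        raise ValueError("attribute keys and values must be strings")
    if not data and not attributes:
        raise ValueError("a message needs non-empty 'data' or at least one attribute")


# Assumed example payload: UTF-8 bytes plus string-valued attributes.
MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg"}}

# Mirrors messages=[MESSAGE] * 10 in the publish task above.
for msg in [MESSAGE] * 10:
    check_message(msg)
```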

Pulling messages from a PubSub subscription

The PubSubPullSensor sensor pulls messages from a PubSub subscription and passes them through XCom.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

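Alternatively, the PubSubPullOperator operator pulls messages and passes them through XCom as a regular task rather than a sensor.

airflow/providers/google/cloud/example_dags/example_pubsub.py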

    subscription = subscribe_task.output

    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages",
        ack_messages=True,
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )
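Both tasks push the pulled messages to XCom, but they behave differently when the subscription is empty: the sensor keeps poking until at least one message arrives (or it times out), while PubSubPullOperator pulls once and returns an empty list. The plain-Python model below sketches that difference. The functions pull_once, poke_until_messages and fake_pull are hypothetical stand-ins, not Airflow code; a real sensor is also governed by poke_interval, timeout and mode.

```python
import time
from typing import Callable, List


def pull_once(pull: Callable[[], List[dict]]) -> List[dict]:
    """Operator-style: a single pull; an empty subscription yields []."""
    return pull()


def poke_until_messages(
    pull: Callable[[], List[dict]], poke_interval: float = 1.0, timeout: float = 10.0
) -> List[dict]:
    """Sensor-style: repeat the pull until it returns at least one message."""
    deadline = time.monotonic() + timeout
    while True:
        messages = pull()
        if messages:
            return messages
        if time.monotonic() >= deadline:
            raise TimeoutError("no messages arrived before the timeout")
        time.sleep(poke_interval)


# Hypothetical subscription that is empty for the first two pulls.
responses = [[], [], [{"ackId": "a1", "message": {"data": "SGVsbG8="}}]]


def fake_pull() -> List[dict]:
    return responses.pop(0) if responses else []


print(pull_once(fake_pull))  # []
print(poke_until_messages(fake_pull, poke_interval=0.01))  # the single queued message
```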

To read the pulled messages from XCom in a downstream task, you can use the BashOperator with a templated command.

airflow/providers/google/cloud/example_dags/example_pubsub.py

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

airflow/providers/google/cloud/example_dags/example_pubsub.py

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
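The template echoes each item's message as-is, with its payload still base64-encoded. If you would rather work with the decoded payload, the same XCom value can be processed in Python, for example inside a PythonOperator callable. The sketch below assumes each pulled item has the ackId/message layout used by the template, with message["data"] holding base64 text and optional attributes; decode_pulled_messages is a hypothetical helper and the sample input is made up.

```python
import base64
from typing import Dict, List


def decode_pulled_messages(pulled: List[dict]) -> List[Dict[str, object]]:
    """Turn XCom items like {'ackId': ..., 'message': {...}} into readable dicts."""
    decoded = []
    for item in pulled:
        message = item.get("message", {})
        raw = base64.b64decode(message.get("data", ""))  # base64 text -> bytes
        decoded.append(
            {
                "ack_id": item.get("ackId"),
                "text": raw.decode("utf-8"),  # assumes the payload is UTF-8
                "attributes": message.get("attributes", {}),
            }
        )
    return decoded


# Made-up sample shaped like the pulled XCom value.
sample = [{"ackId": "ack-1", "message": {"data": "VG9vbA==", "attributes": {"name": "wrench"}}}]
print(decode_pulled_messages(sample))
# [{'ack_id': 'ack-1', 'text': 'Tool', 'attributes': {'name': 'wrench'}}]
```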

Deleting a PubSub subscription

The PubSubDeleteSubscriptionOperator operator deletes the subscription.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

Deleting a PubSub topic

The PubSubDeleteTopicOperator operator deletes a topic.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    delete_topic = PubSubDeleteTopicOperator(
        task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
    )

Reference

For further information, look at:
