Google Cloud PubSub Operators

Google Cloud PubSub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.

Publisher applications send messages to a topic, and other applications subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud PubSub allows independently written applications to communicate with each other.
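The topic and subscription model described above can be illustrated with a toy in-memory broker. This is only a sketch of the concept using the Python standard library; `InMemoryPubSub` and its methods are hypothetical names and do not correspond to the Cloud Pub/Sub client or to any Airflow operator.

```python
from collections import deque


class InMemoryPubSub:
    """Toy broker: each subscription on a topic receives its own copy of every message."""

    def __init__(self):
        # topic name -> names of the subscriptions attached to it
        self._subscriptions_by_topic = {}
        # subscription name -> queue of messages not yet pulled
        self._queues = {}

    def create_topic(self, topic):
        self._subscriptions_by_topic.setdefault(topic, [])

    def create_subscription(self, topic, subscription):
        if topic not in self._subscriptions_by_topic:
            raise KeyError(f"unknown topic: {topic}")
        self._subscriptions_by_topic[topic].append(subscription)
        self._queues[subscription] = deque()

    def publish(self, topic, data):
        if topic not in self._subscriptions_by_topic:
            raise KeyError(f"unknown topic: {topic}")
        # The publisher only names the topic; it never sees the subscribers.
        for subscription in self._subscriptions_by_topic[topic]:
            self._queues[subscription].append(data)

    def pull(self, subscription, max_messages=10):
        queue = self._queues[subscription]
        pulled = []
        while queue and len(pulled) < max_messages:
            pulled.append(queue.popleft())
        return pulled


broker = InMemoryPubSub()
broker.create_topic("orders")
broker.create_subscription("orders", "billing")
broker.create_subscription("orders", "shipping")
broker.publish("orders", b"order-1")
print(broker.pull("billing"))
print(broker.pull("shipping"))
```

Both subscribers receive the same message even though the publisher knows nothing about them, which is the decoupling the managed service provides at scale.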

Prerequisite Tasks

Creating a PubSub topic

The PubSub topic is a named resource to which messages are sent by publishers. The PubSubCreateTopicOperator operator creates a topic.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
    )
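With fail_if_exists=False, the task is intended to succeed even when the topic already exists, so the DAG can be re-run safely. The toy model below illustrates that create-if-missing behavior with a plain Python set. The names `create_topic_if_missing` and `TopicAlreadyExistsError` are hypothetical and exist only for this sketch; they are not part of the Airflow provider.

```python
class TopicAlreadyExistsError(Exception):
    """Raised in this sketch when a topic exists and fail_if_exists is set."""


def create_topic_if_missing(existing_topics, topic, fail_if_exists=False):
    """Return True if the topic was created, False if it was already present."""
    if topic in existing_topics:
        if fail_if_exists:
            raise TopicAlreadyExistsError(topic)
        # Already present and the caller tolerates that: nothing to do.
        return False
    existing_topics.add(topic)
    return True


topics = set()
print(create_topic_if_missing(topics, "my-topic"))  # first run creates the topic
print(create_topic_if_missing(topics, "my-topic"))  # second run is a no-op
```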

Creating a PubSub subscription

A Subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. The PubSubCreateSubscriptionOperator operator creates the subscription.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG
    )
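Topics and subscriptions are addressed by fully qualified resource names of the form projects/<project>/topics/<topic> and projects/<project>/subscriptions/<subscription>. The example above passes no subscription name, so one has to be generated for it. The sketch below shows how such names could be assembled; `topic_path` and `subscription_path` are hypothetical helpers, and the random `sub-` naming scheme is an assumption made for illustration.

```python
import uuid


def topic_path(project_id, topic):
    """Build the fully qualified name of a topic."""
    return f"projects/{project_id}/topics/{topic}"


def subscription_path(project_id, subscription=None):
    """Build the fully qualified name of a subscription.

    Assumption for this sketch: when no name is given, a random one is
    generated, so the caller must keep the returned value to refer to
    the subscription later.
    """
    if not subscription:
        subscription = f"sub-{uuid.uuid4()}"
    return f"projects/{project_id}/subscriptions/{subscription}"


print(topic_path("my-project", "my-topic"))
print(subscription_path("my-project", "my-subscription"))
print(subscription_path("my-project"))  # random name, differs on every call
```

A generated name is only known after the subscription has been created, which is why downstream tasks would read it from the creating task's output rather than hard-coding it.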

Publishing PubSub messages

A Message is a combination of data and (optional) attributes that a publisher sends to a topic and that is eventually delivered to subscribers. The PubSubPublishMessageOperator operator publishes messages.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=GCP_PROJECT_ID,
        topic=TOPIC_FOR_SENSOR_DAG,
        messages=[MESSAGE] * 10,
    )
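MESSAGE is not defined in the excerpt above. As a sketch, assume each entry of `messages` is a dict with a bytes `data` field, a string-to-string `attributes` dict, or both, matching the description of a message as data plus optional attributes. The `build_message` helper and the sample values are hypothetical and serve only to show a plausible shape for the payload.

```python
def build_message(data=b"", attributes=None):
    """Return a message dict after checking the basic payload constraints."""
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes")
    attributes = dict(attributes or {})
    for key, value in attributes.items():
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError("attribute keys and values must be strings")
    # A message with neither data nor attributes carries no information.
    if not data and not attributes:
        raise ValueError("a message needs data, attributes, or both")
    message = {"data": data}
    if attributes:
        message["attributes"] = attributes
    return message


MESSAGE = build_message(b"Tool", {"name": "wrench", "mass": "1.3kg", "count": "3"})
messages = [MESSAGE] * 10  # ten references to the same payload
print(len(messages), messages[0])
```

Note that attribute values are strings even when they look numeric, so a consumer has to parse "3" back into a number itself.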

Pulling messages from a PubSub subscription

The PubSubPullSensor sensor pulls messages from a PubSub subscription and passes them through XCom. The PubSubPullOperator operator can be used in the same way.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

airflow/providers/google/cloud/example_dags/example_pubsub.py

    subscription = subscribe_task.output

    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages",
        ack_messages=True,
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

To read the pulled messages from XCom, use the BashOperator.

airflow/providers/google/cloud/example_dags/example_pubsub.py

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

airflow/providers/google/cloud/example_dags/example_pubsub.py

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
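The template above prints each message in its Base64-encoded form. A downstream task that needs the original payload would have to decode it. The sketch below assumes each pulled entry is a dict with an `ackId` key and a `message` dict whose `data` field is Base64 text, which is the shape the template suggests; the exact structure may differ between provider versions, and `decode_pulled_message` is a hypothetical helper.

```python
import base64


def decode_pulled_message(received):
    """Extract the ack ID, decoded payload, and attributes from one pulled entry."""
    message = received.get("message", {})
    # Assumption: the payload arrives as Base64 text and must be decoded to bytes.
    data = base64.b64decode(message.get("data", ""))
    return {
        "ack_id": received.get("ackId"),
        "data": data,
        "attributes": message.get("attributes", {}),
    }


# Sample entry built by hand for illustration; it is not real service output.
sample = {
    "ackId": "ack-1",
    "message": {
        "data": base64.b64encode(b"Tool").decode("ascii"),
        "attributes": {"name": "wrench"},
    },
}
decoded = decode_pulled_message(sample)
print(decoded["ack_id"], decoded["data"], decoded["attributes"])
```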

Deleting a PubSub subscription

The PubSubDeleteSubscriptionOperator operator deletes the subscription.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

Deleting a PubSub topic

The PubSubDeleteTopicOperator operator deletes the topic.

airflow/providers/google/cloud/example_dags/example_pubsub.py

    delete_topic = PubSubDeleteTopicOperator(
        task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
    )

Reference

For further information, look at:
