Google Cloud PubSub Operators

Google Cloud PubSub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.

Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers Google Cloud PubSub allows developers to communicate between independently written applications.

Creating a PubSub topic

The PubSub topic is a named resource to which messages are sent by publishers. The PubSubCreateTopicOperator operator creates a topic.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
    )

Creating a PubSub subscription

A Subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. The PubSubCreateSubscriptionOperator operator creates the subscription.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG
    )

Publishing PubSub messages

A Message is a combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers. The PubSubPublishMessageOperator operator would publish messages.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=GCP_PROJECT_ID,
        topic=TOPIC_FOR_SENSOR_DAG,
        messages=[MESSAGE] * 10,
    )

Pulling messages from a PubSub subscription

The PubSubPullSensor sensor pulls messages from a PubSub subscription and pass them through XCom.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages",
        ack_messages=True,
        project_id=GCP_PROJECT_ID,
        subscription=subscription,
    )

To pull messages from XCom use the BashOperator.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

Deleting a PubSub subscription

The PubSubDeleteSubscriptionOperator operator deletes the subscription.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=GCP_PROJECT_ID,
        subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
    )

Deleting a PubSub topic

The PubSubDeleteTopicOperator operator deletes topic.

airflow/providers/google/cloud/example_dags/example_pubsub.pyView Source

    delete_topic = PubSubDeleteTopicOperator(
        task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
    )

Was this entry helpful?