Google Cloud PubSub Operators

Google Cloud PubSub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.

Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers Google Cloud PubSub allows developers to communicate between independently written applications.

Creating a PubSub topic

The PubSub topic is a named resource to which messages are sent by publishers. The PubSubCreateTopicOperator operator creates a topic.

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False

Creating a PubSub subscription

A Subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. The PubSubCreateSubscriptionOperator operator creates the subscription.

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG

Publishing PubSub messages

A Message is a combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers. The PubSubPublishMessageOperator operator would publish messages.

    publish_task = PubSubPublishMessageOperator(
        messages=[MESSAGE] * 10,

Pulling messages from a PubSub subscription

The PubSubPullSensor sensor pulls messages from a PubSub subscription and pass them through XCom.

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(

    pull_messages_operator = PubSubPullOperator(

To pull messages from XCom use the BashOperator.

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

Deleting a PubSub subscription

The PubSubDeleteSubscriptionOperator operator deletes the subscription.

    unsubscribe_task = PubSubDeleteSubscriptionOperator(

Deleting a PubSub topic

The PubSubDeleteTopicOperator operator deletes topic.

    delete_topic = PubSubDeleteTopicOperator(
        task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID


