Google Cloud PubSub Operators¶
Google Cloud PubSub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.
Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers Google Cloud PubSub allows developers to communicate between independently written applications.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'Detailed information is available for Installation.
Creating a PubSub topic¶
The PubSub topic is a named resource to which messages are sent by publishers.
The PubSubCreateTopicOperator operator creates a topic.
tests/system/google/cloud/pubsub/example_pubsub.py
    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
    )
Creating a PubSub subscription¶
A Subscription is a named resource representing the stream of messages from a single, specific topic,
to be delivered to the subscribing application.
The PubSubCreateSubscriptionOperator operator creates the subscription.
tests/system/google/cloud/pubsub/example_pubsub.py
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
    )
Publishing PubSub messages¶
A Message is a combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.
The PubSubPublishMessageOperator operator would publish messages.
tests/system/google/cloud/pubsub/example_pubsub.py
    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE, MESSAGE],
    )
Pulling messages from a PubSub subscription¶
The PubSubPullSensor sensor pulls messages from a PubSub subscription
and pass them through XCom.
tests/system/google/cloud/pubsub/example_pubsub.py
    subscription = subscribe_task.output
    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )
Also for this action you can use sensor in the deferrable mode:
tests/system/google/cloud/pubsub/example_pubsub_deferrable.py
pull_messages_async = PubSubPullSensor(
    task_id="pull_messages_async",
    ack_messages=True,
    project_id=PROJECT_ID,
    subscription=subscription,
    deferrable=True,
)
tests/system/google/cloud/pubsub/example_pubsub.py
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )
To pull messages from XCom use the BashOperator.
tests/system/google/cloud/pubsub/example_pubsub.py
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
tests/system/google/cloud/pubsub/example_pubsub.py
    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
Deleting a PubSub subscription¶
The PubSubDeleteSubscriptionOperator operator deletes the subscription.
tests/system/google/cloud/pubsub/example_pubsub.py
    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=PROJECT_ID,
        subscription=subscription,
    )
Deleting a PubSub topic¶
The PubSubDeleteTopicOperator operator deletes topic.
tests/system/google/cloud/pubsub/example_pubsub.py
    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)
Reference¶
For further information, look at: