Google Cloud PubSub Operators
Google Cloud PubSub is a fully managed real-time messaging service that allows you to send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.
Publisher applications send messages to a topic, and other applications subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud PubSub lets independently written applications communicate with each other.
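The publish/subscribe relationship described above can be sketched with a small in-memory model. This is only an illustration of the decoupling idea, not the Google Cloud API: the `InMemoryBroker` class, its methods, and the topic and subscription names are all hypothetical.

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Toy stand-in for a Pub/Sub service; not the Google Cloud API."""

    def __init__(self):
        # topic name -> {subscription name -> queue of pending messages}
        self._subscriptions = defaultdict(dict)

    def create_subscription(self, topic, subscription):
        self._subscriptions[topic][subscription] = deque()

    def publish(self, topic, data):
        # Every subscription on the topic receives its own copy of the message.
        for queue in self._subscriptions[topic].values():
            queue.append(data)

    def pull(self, topic, subscription, max_messages=5):
        queue = self._subscriptions[topic][subscription]
        count = min(max_messages, len(queue))
        return [queue.popleft() for _ in range(count)]


broker = InMemoryBroker()
broker.create_subscription("orders", "billing")
broker.create_subscription("orders", "shipping")

# The publisher only knows the topic name, never the subscribers.
broker.publish("orders", b"order-1")

billing_messages = broker.pull("orders", "billing")
shipping_messages = broker.pull("orders", "shipping")
```

In this sketch the publisher and the two subscribers never reference each other; they share only the topic name, which is the property the operators below rely on.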
Prerequisite Tasks
Creating a PubSub topic
A PubSub topic is a named resource to which publishers send messages.
The PubSubCreateTopicOperator creates a topic.
create_topic1 = PubSubCreateTopicOperator(
    task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
)
Creating a PubSub subscription
A Subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.
The PubSubCreateSubscriptionOperator creates the subscription.
subscribe_task = PubSubCreateSubscriptionOperator(
    task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG
)
Publishing PubSub messages
A Message is a combination of data and (optional) attributes that a publisher sends to a topic and that is eventually delivered to subscribers.
The PubSubPublishMessageOperator publishes messages.
publish_task = PubSubPublishMessageOperator(
    task_id="publish_task",
    project_id=GCP_PROJECT_ID,
    topic=TOPIC_FOR_SENSOR_DAG,
    messages=[MESSAGE] * 10,
)
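The definition of `MESSAGE` is not shown in this section. As a rough sketch, assuming each message is a dict with a byte-string `data` value and an optional string-to-string `attributes` mapping (at least one of the two being non-empty), it might look like the following. The payload values and the `validate_message` helper are hypothetical and only illustrate the "data plus optional attributes" description above.

```python
# Hypothetical payload; this section does not show how MESSAGE is defined.
MESSAGE = {
    "data": b"Tool",
    "attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"},
}


def validate_message(message):
    """Check the assumed shape: bytes `data` and/or a non-empty `attributes` dict."""
    data = message.get("data")
    attributes = message.get("attributes", {})
    if data is not None and not isinstance(data, bytes):
        raise TypeError("'data' must be a byte string")
    if not all(isinstance(k, str) and isinstance(v, str) for k, v in attributes.items()):
        raise TypeError("'attributes' keys and values must be strings")
    if not data and not attributes:
        raise ValueError("a message needs non-empty 'data' or at least one attribute")
    return message


# Mirrors the `[MESSAGE] * 10` expression used in the task above.
messages = [validate_message(MESSAGE)] * 10
```

Validating the payload before handing it to the operator is optional; it simply surfaces malformed messages when the DAG file is parsed rather than when the task runs.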
Pulling messages from a PubSub subscription
The PubSubPullSensor pulls messages from a PubSub subscription and passes them through XCom.
subscription = subscribe_task.output
pull_messages = PubSubPullSensor(
    task_id="pull_messages",
    ack_messages=True,
    project_id=GCP_PROJECT_ID,
    subscription=subscription,
)
The PubSubPullOperator also pulls messages from a subscription and passes them through XCom, but it pulls only once and does not wait for messages to arrive.
subscription = subscribe_task.output
pull_messages_operator = PubSubPullOperator(
    task_id="pull_messages",
    ack_messages=True,
    project_id=GCP_PROJECT_ID,
    subscription=subscription,
)
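The two snippets above use a sensor and an operator for the same pull. As a rough illustration of how they differ, assuming the operator performs a single pull and returns whatever is available while the sensor keeps polling until messages arrive, the behavior can be modeled in plain Python. The helpers `pull_once` and `pull_until_available` and the `fake_fetch` source are hypothetical; they are not part of Airflow.

```python
import time


def pull_once(fetch):
    """Operator-style: a single pull that may return an empty list."""
    return fetch()


def pull_until_available(fetch, poke_interval=0.01, timeout=1.0):
    """Sensor-style: poll until a pull returns messages or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        messages = fetch()
        if messages:
            return messages
        if time.monotonic() >= deadline:
            raise TimeoutError("no messages arrived before the timeout")
        time.sleep(poke_interval)


# Hypothetical message source: empty for the first two pulls, then one message.
responses = iter([[], [], ["m1"]])


def fake_fetch():
    return next(responses, [])


once = pull_once(fake_fetch)               # consumes the first, empty response
waited = pull_until_available(fake_fetch)  # polls past the second empty response
```

Under these assumptions, the single pull suits pipelines where an empty result is acceptable, and the polling variant suits pipelines where downstream tasks need at least one message.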
To pull messages from XCom, use the BashOperator.
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
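The template above prints the payload in its Base64-encoded form. A minimal sketch of decoding it in Python follows, assuming each pulled item is a dict with an `ackId` key and a `message` dict whose `data` field holds Base64 text. That shape is inferred from the template and may differ between provider versions; the `decode_pulled_messages` helper and the sample data are hypothetical.

```python
import base64


def decode_pulled_messages(pulled):
    """Return (ack_id, decoded_text) pairs from a list of pulled-message dicts."""
    decoded = []
    for item in pulled:
        message = item.get("message") or {}
        raw = message.get("data", "")
        # Assumes the payload is UTF-8 text once the Base64 layer is removed.
        text = base64.b64decode(raw).decode("utf-8")
        decoded.append((item.get("ackId"), text))
    return decoded


# Hypothetical sample of what xcom_pull('pull_messages') might return.
sample = [
    {
        "ackId": "ack-1",
        "message": {"data": base64.b64encode(b"Tool").decode("ascii")},
    }
]
result = decode_pulled_messages(sample)
```

A function like this could run inside a Python task that reads the same XCom value, as an alternative to echoing the encoded text from Bash.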
Deleting a PubSub subscription
The PubSubDeleteSubscriptionOperator deletes the subscription.
unsubscribe_task = PubSubDeleteSubscriptionOperator(
    task_id="unsubscribe_task",
    project_id=GCP_PROJECT_ID,
    subscription=subscription,
)
Deleting a PubSub topic
The PubSubDeleteTopicOperator deletes the topic.
delete_topic = PubSubDeleteTopicOperator(
    task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
)