Google Cloud PubSub Operators¶
Google Cloud PubSub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted on Google Cloud or elsewhere on the Internet.
Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers Google Cloud PubSub allows developers to communicate between independently written applications.
Creating a PubSub topic¶
The PubSub topic is a named resource to which messages are sent by publishers.
The PubSubCreateTopicOperator
operator creates a topic.
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
)
Creating a PubSub subscription¶
A Subscription
is a named resource representing the stream of messages from a single, specific topic,
to be delivered to the subscribing application.
The PubSubCreateSubscriptionOperator
operator creates the subscription.
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG
)
Publishing PubSub messages¶
A Message
is a combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.
The PubSubPublishMessageOperator
operator would publish messages.
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=GCP_PROJECT_ID,
topic=TOPIC_FOR_SENSOR_DAG,
messages=[MESSAGE] * 10,
)
Pulling messages from a PubSub subscription¶
The PubSubPullSensor
sensor pulls messages from a PubSub subscription
and pass them through XCom.
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id=GCP_PROJECT_ID,
subscription=subscription,
)
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"
pull_messages_operator = PubSubPullOperator(
task_id="pull_messages",
ack_messages=True,
project_id=GCP_PROJECT_ID,
subscription=subscription,
)
To pull messages from XCom use the BashOperator
.
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
Deleting a PubSub subscription¶
The PubSubDeleteSubscriptionOperator
operator deletes the subscription.
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=GCP_PROJECT_ID,
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
)
Deleting a PubSub topic¶
The PubSubDeleteTopicOperator
operator deletes topic.
delete_topic = PubSubDeleteTopicOperator(
task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
)