airflow.contrib.operators.pubsub_operator

Module Contents

class airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator(project, topic, fail_if_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Create a PubSub topic.

By default, if the topic already exists, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic')
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic')
    )

The operator can be configured to fail if the topic already exists.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic')
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='my_new_topic',
                                     fail_if_exists=True)
    )

Both project and topic are templated so you can use variables in them.

template_fields = ['project', 'topic'][source]
ui_color = #0273d4[source]
execute(self, context)[source]
class airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator(topic_project, topic, subscription=None, subscription_project=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Create a PubSub subscription.

By default, the subscription will be created in topic_project. If subscription_project is specified and the GCP credentials allow, the Subscription can be created in a different project from its topic.

By default, if the subscription already exists, this operator will not cause the DAG to fail. However, the topic must exist in the project.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription')
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription')
    )

The operator can be configured to fail if the subscription already exists.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription')
        >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic',
            subscription='my-subscription', fail_if_exists=True)
    )

Finally, subscription is not required. If not passed, the operator will generated a universally unique identifier for the subscription’s name.

with DAG('DAG') as dag:
    (
        dag >> PubSubSubscriptionCreateOperator(
            topic_project='my-project', topic='my-topic')
    )

topic_project, topic, subscription, and subscription are templated so you can use variables in them.

template_fields = ['topic_project', 'topic', 'subscription', 'subscription_project'][source]
ui_color = #0273d4[source]
execute(self, context)[source]
class airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator(project, topic, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a PubSub topic.

By default, if the topic does not exist, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubTopicDeleteOperator(project='my-project',
                                     topic='non_existing_topic')
    )

The operator can be configured to fail if the topic does not exist.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubTopicCreateOperator(project='my-project',
                                     topic='non_existing_topic',
                                     fail_if_not_exists=True)
    )

Both project and topic are templated so you can use variables in them.

template_fields = ['project', 'topic'][source]
ui_color = #cb4335[source]
execute(self, context)[source]
class airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator(project, subscription, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a PubSub subscription.

By default, if the subscription does not exist, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        dag
        >> PubSubSubscriptionDeleteOperator(project='my-project',
                                            subscription='non-existing')
    )

The operator can be configured to fail if the subscription already exists.

with DAG('failing DAG') as dag:
    (
        dag
        >> PubSubSubscriptionDeleteOperator(
             project='my-project', subscription='non-existing',
             fail_if_not_exists=True)
    )

project, and subscription are templated so you can use variables in them.

template_fields = ['project', 'subscription'][source]
ui_color = #cb4335[source]
execute(self, context)[source]
class airflow.contrib.operators.pubsub_operator.PubSubPublishOperator(project, topic, messages, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Publish messages to a PubSub topic.

Each Task publishes all provided messages to the same topic in a single GCP project. If the topic does not exist, this task will fail.

from base64 import b64encode as b64e

m1 = {'data': b64e('Hello, World!'),
      'attributes': {'type': 'greeting'}
     }
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}

t1 = PubSubPublishOperator(
    project='my-project',topic='my_topic',
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag)

project , topic, and messages are templated so you can use variables in them.

template_fields = ['project', 'topic', 'messages'][source]
ui_color = #0273d4[source]
execute(self, context)[source]

Was this entry helpful?