Google Cloud Managed Service for Apache Kafka Operators

The Google Cloud Managed Service for Apache Kafka helps you set up, secure, maintain, and scale Apache Kafka clusters.

Interacting with Apache Kafka Clusters

To create an Apache Kafka cluster you can use ManagedKafkaCreateClusterOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

create_cluster = ManagedKafkaCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster=CLUSTER_CONF,
    cluster_id=CLUSTER_ID,
)
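
The example above does not show what CLUSTER_CONF contains. As a rough sketch, assuming the operator accepts a plain dict shaped like the Managed Kafka Cluster resource, it might be built as follows. The helper name build_cluster_conf, the project number, the subnet name "default", and the sizing values are illustrative assumptions, not values confirmed by this page.

```python
def build_cluster_conf(project_number: str, location: str, subnet: str = "default") -> dict:
    """Return a dict shaped like a Managed Kafka Cluster resource (hypothetical helper)."""
    gib = 1024**3  # bytes in one GiB
    return {
        # Network access: the subnet from which clients reach the brokers (assumed layout).
        "gcp_config": {
            "access_config": {
                "network_configs": [
                    {"subnet": f"projects/{project_number}/regions/{location}/subnetworks/{subnet}"},
                ],
            },
        },
        # Cluster sizing; these numbers are placeholders, not recommendations.
        "capacity_config": {
            "vcpu_count": 3,
            "memory_bytes": 3 * gib,
        },
    }


# Placeholder project number and region, used only for illustration.
CLUSTER_CONF = build_cluster_conf("123456789012", "us-central1")
```

Keeping the configuration in a small function makes it easier to reuse the same shape across environments that differ only in project or region.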

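To delete a cluster, you can use ManagedKafkaDeleteClusterOperator. The example sets trigger_rule=TriggerRule.ALL_DONE so that the cleanup task runs once all upstream tasks have finished, whether they succeeded or failed.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py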

delete_cluster = ManagedKafkaDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a cluster, you can use ManagedKafkaGetClusterOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

get_cluster = ManagedKafkaGetClusterOperator(
    task_id="get_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

To get a list of clusters you can use ManagedKafkaListClustersOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

list_clusters = ManagedKafkaListClustersOperator(
    task_id="list_clusters",
    project_id=PROJECT_ID,
    location=LOCATION,
)

To update a cluster, you can use ManagedKafkaUpdateClusterOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

update_cluster = ManagedKafkaUpdateClusterOperator(
    task_id="update_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    cluster=CLUSTER_TO_UPDATE,
    update_mask=CLUSTER_UPDATE_MASK,
)
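
The update call pairs a partial resource (CLUSTER_TO_UPDATE) with a field mask (CLUSTER_UPDATE_MASK) naming the fields to change. The sketch below derives the mask paths from the partial resource so the two cannot drift apart. It assumes the operator accepts the mask as a dict with a "paths" list, in the shape of a protobuf FieldMask. The helper field_mask_paths and the memory value are hypothetical, and the flattening would not suit map-typed fields such as labels.

```python
from __future__ import annotations


def field_mask_paths(changes: dict, prefix: str = "") -> list[str]:
    """Flatten a nested dict of changes into dotted field paths (hypothetical helper)."""
    paths = []
    for key, value in changes.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Recurse into nested messages so only leaf fields are listed.
            paths.extend(field_mask_paths(value, path))
        else:
            paths.append(path)
    return paths


# Only the fields present here are meant to change; the value is a placeholder.
CLUSTER_TO_UPDATE = {"capacity_config": {"memory_bytes": 4 * 1024**3}}
CLUSTER_UPDATE_MASK = {"paths": field_mask_paths(CLUSTER_TO_UPDATE)}
```

The same helper could be applied to a topic update, for example a dict that changes only the partition count.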

Interacting with Apache Kafka Topics

To create an Apache Kafka topic you can use ManagedKafkaCreateTopicOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

create_topic = ManagedKafkaCreateTopicOperator(
    task_id="create_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic=TOPIC_CONF,
)
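
TOPIC_CONF is not defined in the example above. A minimal sketch, assuming the operator accepts a dict shaped like the Managed Kafka Topic resource with a partition count and a replication factor, could look like this. The values are placeholders chosen for illustration.

```python
# Assumed shape of a Topic resource; both values are illustrative placeholders.
TOPIC_CONF = {
    "partition_count": 3,      # number of partitions for the new topic
    "replication_factor": 3,   # number of replicas kept for each partition
}
```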

To delete a topic, you can use ManagedKafkaDeleteTopicOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

delete_topic = ManagedKafkaDeleteTopicOperator(
    task_id="delete_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a topic, you can use ManagedKafkaGetTopicOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

get_topic = ManagedKafkaGetTopicOperator(
    task_id="get_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
)

To get a list of topics you can use ManagedKafkaListTopicsOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

list_topics = ManagedKafkaListTopicsOperator(
    task_id="list_topics",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

To update a topic, you can use ManagedKafkaUpdateTopicOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

update_topic = ManagedKafkaUpdateTopicOperator(
    task_id="update_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic=TOPIC_TO_UPDATE,
    update_mask=TOPIC_UPDATE_MASK,
)

Interacting with Apache Kafka Consumer Groups

To delete a consumer group, you can use ManagedKafkaDeleteConsumerGroupOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

delete_consumer_group = ManagedKafkaDeleteConsumerGroupOperator(
    task_id="delete_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a consumer group, you can use ManagedKafkaGetConsumerGroupOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

get_consumer_group = ManagedKafkaGetConsumerGroupOperator(
    task_id="get_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
)

To get a list of consumer groups you can use ManagedKafkaListConsumerGroupsOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

list_consumer_groups = ManagedKafkaListConsumerGroupsOperator(
    task_id="list_consumer_groups",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

To update a consumer group, you can use ManagedKafkaUpdateConsumerGroupOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

update_consumer_group = ManagedKafkaUpdateConsumerGroupOperator(
    task_id="update_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
    consumer_group={},
    update_mask={},
)

Using the Apache Kafka provider with Google Cloud Managed Service for Apache Kafka

To produce data to a topic, you can use ProduceToTopicOperator from the Apache Kafka provider.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

produce_to_topic = ProduceToTopicOperator(
    task_id="produce_to_topic",
    kafka_config_id=CONNECTION_ID,
    topic=TOPIC_ID,
    producer_function=producer,
    poll_timeout=10,
)
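
The producer callable passed as producer_function is not shown above. As a hedged sketch, assuming the operator iterates over the callable's result and sends each yielded (key, value) pair as one message, a generator along these lines would work. The count parameter and the JSON payloads are illustrative choices.

```python
import json


def producer(count: int = 20):
    """Yield (key, value) pairs to be produced, one pair per message."""
    for i in range(count):
        # Keys and values are serialized to JSON strings before being sent.
        yield (json.dumps(i), json.dumps(i + 1))
```

Because it is a plain generator, the function can be exercised locally with list(producer()) before it is wired into a DAG.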

To consume data from a topic, you can use ConsumeFromTopicOperator.

google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

consume_from_topic = ConsumeFromTopicOperator(
    task_id="consume_from_topic",
    kafka_config_id=CONNECTION_ID,
    topics=[TOPIC_ID],
    apply_function=consumer,
    poll_timeout=20,
    max_messages=20,
    max_batch_size=20,
)
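
The consumer callable passed as apply_function is likewise not shown. The sketch below assumes the operator calls it once per consumed message and that each message exposes key(), value(), topic(), and offset() methods in the style of a confluent_kafka Message. FakeMessage is a hypothetical stand-in used only to try the function locally; the return value is there for local checking and is not assumed to be used by the operator.

```python
import json
import logging

log = logging.getLogger(__name__)


def consumer(message):
    """Decode one message's JSON key and value and log them."""
    key = json.loads(message.key())
    value = json.loads(message.value())
    log.info("%s @ %s; %s : %s", message.topic(), message.offset(), key, value)
    return key, value


class FakeMessage:
    """Hypothetical stand-in mimicking the accessor methods of a Kafka message."""

    def __init__(self, key: bytes, value: bytes, topic: str = "demo", offset: int = 0):
        self._key, self._value, self._topic, self._offset = key, value, topic, offset

    def key(self):
        return self._key

    def value(self):
        return self._value

    def topic(self):
        return self._topic

    def offset(self):
        return self._offset


# Local check without a broker: decode a message like those a JSON producer would send.
decoded = consumer(FakeMessage(b"1", b"2"))
```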

Reference

For further information, look at:
