Source code for tests.system.apache.kafka.example_dag_hello_kafka
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import functools
import json
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

# This is just for setting up connections in the demo - you should use standard
# methods for setting these connections in production
from airflow.providers.standard.operators.python import PythonOperator
def load_connections():
    # Connections needed for this example dag to finish
    from airflow.models import Connection
    from airflow.utils import db

    db.merge_conn(
        Connection(
            conn_id="t1-3",
            conn_type="kafka",
            extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
        )
    )
    db.merge_conn(
        Connection(
            conn_id="t2",
            conn_type="kafka",
            extra=json.dumps(
                {
                    "bootstrap.servers": "broker:29092",
                    "group.id": "t2",
                    "enable.auto.commit": False,
                    "auto.offset.reset": "beginning",
                }
            ),
        )
    )
    db.merge_conn(
        Connection(
            conn_id="t4",
            conn_type="kafka",
            extra=json.dumps(
                {
                    "bootstrap.servers": "broker:29092",
                    "group.id": "t4",
                    "enable.auto.commit": False,
                    "auto.offset.reset": "beginning",
                }
            ),
        )
    )
    db.merge_conn(
        Connection(
            conn_id="t4b",
            conn_type="kafka",
            extra=json.dumps(
                {
                    "bootstrap.servers": "broker:29092",
                    "group.id": "t4b",
                    "enable.auto.commit": False,
                    "auto.offset.reset": "beginning",
                }
            ),
        )
    )
    db.merge_conn(
        Connection(
            conn_id="t5",
            conn_type="kafka",
            extra=json.dumps(
                {
                    "bootstrap.servers": "broker:29092",
                    "group.id": "t5",
                    "enable.auto.commit": False,
                    "auto.offset.reset": "beginning",
                }
            ),
        )
    )
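# NOTE: the tasks below reference ``default_args`` and several callables whose
# definitions are missing from this listing. The following is a minimal sketch,
# consistent with each task's doc_md: a generator that produces (key, value)
# pairs, consumer functions that log each message, an await function that
# matches values divisible by 5, and a trivial Python callable.

default_args = {
    "owner": "airflow",
    "depend_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def producer_function():
    # Generator of (key, value) pairs for ProduceToTopicOperator.
    for i in range(20):
        yield (json.dumps(i), json.dumps(i + 1))


consumer_logger = logging.getLogger("airflow")


def consumer_function(message, prefix=None):
    # Log one consumed message, optionally decorated with a prefix.
    key = json.loads(message.key())
    value = json.loads(message.value())
    consumer_logger.info("%s %s @ %s; %s : %s", prefix, message.topic(), message.offset(), key, value)


def consumer_function_batch(messages, prefix=None):
    # Batch variant: receives a list of messages instead of a single one.
    for message in messages:
        consumer_function(message, prefix=prefix)


def await_function(message):
    # Return a truthy value only for messages whose value is divisible by 5;
    # AwaitMessageSensor pushes this return value to XCom.
    if json.loads(message.value()) % 5 == 0:
        return f"Got the following message: {json.loads(message.value())}"


def hello_kafka():
    print("Hello Kafka !")
    return "hello!"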
with DAG(
    "kafka-example",
    default_args=default_args,
    description="Examples of Kafka Operators",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
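    # The dependency chains below start from a ``t0`` task that is missing from
    # this listing. A sketch consistent with the PythonOperator import above is
    # a task that installs the demo connections; the task_id is an assumption.
    t0 = PythonOperator(task_id="load_connections", python_callable=load_connections)
    t0.doc_md = "Loads the connections for the other tasks to use."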
    # [START howto_operator_produce_to_topic]
    t1 = ProduceToTopicOperator(
        kafka_config_id="t1-3",
        task_id="produce_to_topic",
        topic="test_1",
        producer_function="example_dag_hello_kafka.producer_function",
    )
    # [END howto_operator_produce_to_topic]

    t1.doc_md = (
        "Takes a series of messages from a generator function and publishes"
        " them to the `test_1` topic of our kafka cluster."
    )

    # [START howto_operator_consume_from_topic]
    t2 = ConsumeFromTopicOperator(
        kafka_config_id="t2",
        task_id="consume_from_topic",
        topics=["test_1"],
        apply_function="example_dag_hello_kafka.consumer_function",
        apply_function_kwargs={"prefix": "consumed:::"},
        commit_cadence="end_of_batch",
        max_messages=10,
        max_batch_size=2,
    )
    # [END howto_operator_consume_from_topic]

    t2.doc_md = (
        "Reads a series of messages from the `test_1` topic, and processes"
        " them with a consumer function with a keyword argument."
    )

    t3 = ProduceToTopicOperator(
        kafka_config_id="t1-3",
        task_id="produce_to_topic_2",
        topic="test_1",
        producer_function=producer_function,
    )

    t3.doc_md = (
        "Does the same thing as the t1 task, but passes the callable directly"
        " instead of using the string notation."
    )

    t4 = ConsumeFromTopicOperator(
        kafka_config_id="t4",
        task_id="consume_from_topic_2",
        topics=["test_1"],
        apply_function=functools.partial(consumer_function, prefix="consumed:::"),
        commit_cadence="end_of_batch",
        max_messages=30,
        max_batch_size=10,
    )

    t4b = ConsumeFromTopicOperator(
        kafka_config_id="t4b",
        task_id="consume_from_topic_2_b",
        topics=["test_1"],
        apply_function_batch=functools.partial(consumer_function_batch, prefix="consumed:::"),
        commit_cadence="end_of_batch",
        max_messages=30,
        max_batch_size=10,
    )

    t4.doc_md = (
        "Does the same thing as the t2 task, but passes the callable directly"
        " instead of using the string notation."
    )

    # [START howto_sensor_await_message]
    t5 = AwaitMessageSensor(
        kafka_config_id="t5",
        task_id="awaiting_message",
        topics=["test_1"],
        apply_function="example_dag_hello_kafka.await_function",
        xcom_push_key="retrieved_message",
    )
    # [END howto_sensor_await_message]

    t5.doc_md = (
        "A deferrable task. Reads the topic `test_1` until a message with a value"
        " divisible by 5 is encountered."
    )

    t6 = PythonOperator(task_id="hello_kafka", python_callable=hello_kafka)

    t6.doc_md = "The task that is executed after the deferrable task returns for execution."

    t0 >> t1 >> t2
    t0 >> t3 >> [t4, t4b] >> t5 >> t6


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
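# The listing ends at the comment above; the assignment it refers to follows the
# standard Airflow system-test pattern of exposing a test run for pytest.
test_run = get_test_run(dag)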