Source code for tests.system.providers.apache.kafka.example_dag_hello_kafka
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportfunctoolsimportjsonimportloggingfromdatetimeimportdatetime,timedeltafromairflowimportDAG# This is just for setting up connections in the demo - you should use standard# methods for setting these connections in productionfromairflow.operators.pythonimportPythonOperatorfromairflow.providers.apache.kafka.operators.consumeimportConsumeFromTopicOperatorfromairflow.providers.apache.kafka.operators.produceimportProduceToTopicOperatorfromairflow.providers.apache.kafka.sensors.kafkaimportAwaitMessageSensor# Connections needed for this example dag to finish# from airflow.models import Connection# from airflow.utils import db# db.merge_conn(# Connection(# conn_id="t1-3",# conn_type="kafka",# extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),# )# )# db.merge_conn(# Connection(# conn_id="t2",# conn_type="kafka",# extra=json.dumps(# {# "bootstrap.servers": "broker:29092",# "group.id": "t2",# "enable.auto.commit": False,# "auto.offset.reset": "beginning",# }# ),# )# )# db.merge_conn(# Connection(# conn_id="t4",# conn_type="kafka",# extra=json.dumps(# {# "bootstrap.servers": "broker:29092",# "group.id": "t4",# "enable.auto.commit": False,# "auto.offset.reset": "beginning",# }# ),# )# )# db.merge_conn(# Connection(# conn_id="t4b",# conn_type="kafka",# extra=json.dumps(# {# "bootstrap.servers": "broker:29092",# "group.id": "t4b",# "enable.auto.commit": False,# "auto.offset.reset": "beginning",# }# ),# )# )# db.merge_conn(# Connection(# conn_id="t5",# conn_type="kafka",# extra=json.dumps(# {# "bootstrap.servers": "broker:29092",# "group.id": "t5",# "enable.auto.commit": False,# "auto.offset.reset": "beginning",# }# ),# )# )
)# [END howto_operator_produce_to_topic]t1.doc_md="Takes a series of messages from a generator function and publishes""them to the `test_1` topic of our kafka cluster."# [START howto_operator_consume_from_topic]t2=ConsumeFromTopicOperator(kafka_config_id="t2",task_id="consume_from_topic",topics=["test_1"],apply_function="hello_kafka.consumer_function",apply_function_kwargs={"prefix":"consumed:::"},commit_cadence="end_of_batch",max_messages=10,max_batch_size=2,)# [END howto_operator_consume_from_topic]t2.doc_md="Reads a series of messages from the `test_1` topic, and processes""them with a consumer function with a keyword argument."t3=ProduceToTopicOperator(kafka_config_id="t1-3",task_id="produce_to_topic_2",topic="test_1",producer_function=producer_function,)t3.doc_md="Does the same thing as the t1 task, but passes the callable directly""instead of using the string notation."t4=ConsumeFromTopicOperator(kafka_config_id="t4",task_id="consume_from_topic_2",topics=["test_1"],apply_function=functools.partial(consumer_function,prefix="consumed:::"),commit_cadence="end_of_batch",max_messages=30,max_batch_size=10,)t4b=ConsumeFromTopicOperator(kafka_config_id="t4b",task_id="consume_from_topic_2_b",topics=["test_1"],apply_function_batch=functools.partial(consumer_function_batch,prefix="consumed:::"),commit_cadence="end_of_batch",max_messages=30,max_batch_size=10,)t4.doc_md="Does the same thing as the t2 task, but passes the callable directly""instead of using the string notation."# [START howto_sensor_await_message]t5=AwaitMessageSensor(kafka_config_id="t5",task_id="awaiting_message",topics=["test_1"],apply_function="hello_kafka.await_function",xcom_push_key="retrieved_message",)# [END howto_sensor_await_message]t5.doc_md="A deferable task. Reads the topic `test_1` until a message with a value""divisible by 5 is encountered."t6=PythonOperator(task_id="hello_kafka",python_callable=hello_kafka)t6.doc_md="The task that is executed after the deferable task returns for execution."t1>>t2t3>>[t4,t4b]>>t5>>t6fromtests.system.utilsimportget_test_run# noqa: E402# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)