Source code for airflow.providers.apache.kafka.triggers.await_message
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasynciofromfunctoolsimportpartialfromtypingimportAny,Sequencefromasgiref.syncimportsync_to_asyncfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.apache.kafka.hooks.consumeimportKafkaConsumerHookfromairflow.triggers.baseimportBaseTrigger,TriggerEventfromairflow.utils.module_loadingimportimport_string
class AwaitMessageTrigger(BaseTrigger):
    """
    A trigger that waits for a message matching specific criteria to arrive in Kafka.

    The behavior of the consumer of this trigger is as follows:

    - poll the Kafka topics for a message, if no message returned, sleep
    - process the message with provided callable and commit the message offset:

      - if callable returns any data, raise a TriggerEvent with the return data
      - else continue to next message

    :param topics: The topic (or topic regex) that should be searched for messages
    :param apply_function: the location of the function to apply to messages for
        determination of matching criteria. (In python dot notation as a string)
    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
    :param apply_function_args: A set of arguments to apply to the callable,
        defaults to None
    :param apply_function_kwargs: A set of keyword arguments to apply to the callable,
        defaults to None
    :param poll_timeout: How long the Kafka client should wait before returning from
        a poll request to Kafka (seconds), defaults to 1
    :param poll_interval: How long the trigger should sleep after reaching the end of
        the Kafka log (seconds), defaults to 5
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1,
        poll_interval: float = 5,
    ) -> None:
        # Run the base-class initializer first so that any state BaseTrigger
        # sets up exists on this instance before our own attributes are added.
        super().__init__()

        self.topics = topics
        self.apply_function = apply_function
        # Normalize "not provided" (None) to empty containers so later code can
        # unpack them unconditionally.
        self.apply_function_args = apply_function_args or ()
        self.apply_function_kwargs = apply_function_kwargs or {}
        self.kafka_config_id = kafka_config_id
        self.poll_timeout = poll_timeout
        self.poll_interval = poll_interval