Source code for airflow.providers.apache.kafka.operators.produce
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingfromfunctoolsimportpartialfromtypingimportAny,Callable,Sequencefromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.apache.kafka.hooks.produceimportKafkaProducerHookfromairflow.utils.module_loadingimportimport_string
def acked(err, msg) -> None:
    """Log the outcome of a single message delivery.

    Used as the default ``on_delivery`` callback of
    :class:`ProduceToTopicOperator` when no ``delivery_callback`` is given.

    :param err: The delivery error, or ``None`` when the message was delivered.
    :param msg: The message object; its ``topic()``, ``partition()`` and
        ``offset()`` accessors are read on success.
    """
    if err is not None:
        # Pass values as logger arguments so formatting is deferred until
        # the record is actually emitted.
        local_logger.error("Failed to deliver message: %s", err)
        return

    local_logger.info(
        "Produced record to topic %s partition [%s] @ offset %s",
        msg.topic(),
        msg.partition(),
        msg.offset(),
    )
class ProduceToTopicOperator(BaseOperator):
    """An operator that produces messages to a Kafka topic.

    Obtains a producer through :class:`KafkaProducerHook`, calls
    ``producer_function`` and publishes every key/value pair it yields to
    ``topic``.

    :param topic: The topic the producer should produce to (required).
    :param producer_function: The function that generates key/value pairs as
        messages for production, given either as a callable or as a dotted
        import path that is resolved when the task executes (required).
    :param kafka_config_id: The connection object to use, defaults to
        "kafka_default"
    :param producer_function_args: Additional arguments to be applied to the
        producer callable, defaults to None (no extra positional arguments)
    :param producer_function_kwargs: Additional keyword arguments to be
        applied to the producer callable, defaults to None (no extra keyword
        arguments)
    :param delivery_callback: Dotted import path of the callback to apply
        after delivery (or failure) of a message, defaults to None, in which
        case :func:`acked` is used
    :param synchronous: If writes to kafka should be fully synchronous
        (the producer is flushed after every message), defaults to True
    :param poll_timeout: How long of a delay should be applied when calling
        poll after production to kafka, defaults to 0
    :raises AirflowException: if ``topic`` or ``producer_function`` is empty

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ProduceToTopicOperator`
    """

    def __init__(
        self,
        topic: str,
        producer_function: str | Callable[..., Any],
        kafka_config_id: str = "kafka_default",
        producer_function_args: Sequence[Any] | None = None,
        producer_function_kwargs: dict[Any, Any] | None = None,
        delivery_callback: str | None = None,
        synchronous: bool = True,
        poll_timeout: float = 0,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)

        # The callback is resolved eagerly so a bad import path surfaces at
        # construction time; an empty value falls back to the module's
        # logging callback.
        self.delivery_callback = import_string(delivery_callback) if delivery_callback else acked

        self.kafka_config_id = kafka_config_id
        self.topic = topic
        self.producer_function = producer_function
        self.producer_function_args = producer_function_args or ()
        self.producer_function_kwargs = producer_function_kwargs or {}
        self.synchronous = synchronous
        self.poll_timeout = poll_timeout

        if not (self.topic and self.producer_function):
            raise AirflowException(
                "topic and producer_function must be provided. Got topic="
                f"{self.topic} and producer_function={self.producer_function}"
            )

    def execute(self, context) -> None:
        """Produce every key/value pair yielded by the producer function.

        :param context: The task context; not used by this operator.
        :raises TypeError: if ``producer_function`` does not resolve to a
            callable.
        """
        producer = KafkaProducerHook(kafka_config_id=self.kafka_config_id).get_producer()

        # Resolve into a local so the operator's configured attribute is not
        # overwritten as a side effect of running the task.
        producer_function = self.producer_function
        if isinstance(producer_function, str):
            producer_function = import_string(producer_function)
        if not callable(producer_function):
            raise TypeError(
                "producer_function must resolve to a callable, got "
                f"{type(producer_function).__name__}"
            )

        messages = producer_function(*self.producer_function_args, **self.producer_function_kwargs)

        # For each returned k/v in the callable : publish and flush if needed.
        for key, value in messages:
            producer.produce(self.topic, key=key, value=value, on_delivery=self.delivery_callback)
            producer.poll(self.poll_timeout)
            if self.synchronous:
                producer.flush()

        # Final flush covers the asynchronous case, where messages may still
        # be queued when the loop ends.
        producer.flush()