Source code for airflow.providers.apache.kafka.operators.produce
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingfromcollections.abcimportSequencefromfunctoolsimportpartialfromtypingimportAny,Callablefromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.apache.kafka.hooks.produceimportKafkaProducerHookfromairflow.utils.module_loadingimportimport_string
def acked(err, msg):
    """
    Log the outcome of a single message delivery attempt.

    Has the ``(err, msg)`` shape of a producer delivery callback.
    NOTE(review): presumably used as the operator's default ``delivery_callback``;
    that wiring is not visible here.

    :param err: The delivery error, or ``None`` when delivery succeeded.
    :param msg: The delivered message; its ``topic()``, ``partition()`` and
        ``offset()`` are logged on success.
    """
    if err is None:
        # Success path: report where the record landed.
        local_logger.info(
            "Produced record to topic %s, partition [%s] @ offset %s",
            msg.topic(),
            msg.partition(),
            msg.offset(),
        )
        return
    local_logger.error("Failed to deliver message: %s", err)
[docs]classProduceToTopicOperator(BaseOperator):""" An operator that produces messages to a Kafka topic. Registers a producer to a kafka topic and publishes messages to the log. :param kafka_config_id: The connection object to use, defaults to "kafka_default" :param topic: The topic the producer should produce to, defaults to None :param producer_function: The function that generates key/value pairs as messages for production, defaults to None :param producer_function_args: Additional arguments to be applied to the producer callable, defaults to None :param producer_function_kwargs: Additional keyword arguments to be applied to the producer callable, defaults to None :param delivery_callback: The callback to apply after delivery(or failure) of a message, defaults to None :param synchronous: If writes to kafka should be fully synchronous, defaults to True :param poll_timeout: How long of a delay should be applied when calling poll after production to kafka, defaults to 0 :raises AirflowException: _description_ .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:ProduceToTopicOperator` """
ifnot(self.topicandself.producer_function):raiseAirflowException("topic and producer_function must be provided. Got topic="f"{self.topic} and producer_function={self.producer_function}")return
def execute(self, context) -> None:
    """
    Produce every key/value pair yielded by the producer function to ``self.topic``.

    A producer is obtained from ``KafkaProducerHook(kafka_config_id=...)``.
    ``self.producer_function`` is called with ``self.producer_function_args`` and
    ``self.producer_function_kwargs``. Each ``(key, value)`` pair it yields is
    produced with ``self.delivery_callback`` as ``on_delivery``, and
    ``poll(self.poll_timeout)`` is called after every message.

    When ``self.synchronous`` is set, the producer is flushed after every message.
    A final flush always happens once the producer function is exhausted.

    :param context: The Airflow task context (not used by this method).
    """
    # Get producer and callable.
    # NOTE(review): presumably a confluent_kafka Producer; the return type of
    # get_producer() is not visible here.
    producer = KafkaProducerHook(kafka_config_id=self.kafka_config_id).get_producer()

    if isinstance(self.producer_function, str):
        # A dotted import path is resolved at execution time; the resolved
        # callable is stored back on the operator (unchanged behaviour).
        self.producer_function = import_string(self.producer_function)

    # The class docstring documents None as the default for both of these;
    # unpacking None with * / ** would raise TypeError, so normalise here.
    # Non-None values are passed through exactly as before.
    func_args = self.producer_function_args or ()
    func_kwargs = self.producer_function_kwargs or {}

    producer_callable = partial(
        self.producer_function,  # type: ignore
        *func_args,
        **func_kwargs,
    )

    # For each returned k/v in the callable: publish, and flush if needed.
    for k, v in producer_callable():
        producer.produce(self.topic, key=k, value=v, on_delivery=self.delivery_callback)
        producer.poll(self.poll_timeout)
        if self.synchronous:
            producer.flush()

    # Always flush once the producer function is exhausted.
    producer.flush()