# Source code for airflow.providers.apache.kafka.operators.produce

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from functools import partial
from typing import Any, Callable, Sequence

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.utils.module_loading import import_string

# Logger shared by the default delivery callback below; note it logs to the
# "airflow" logger rather than to a module-named logger.
local_logger = logging.getLogger("airflow")


def acked(err, msg):
    """Log the outcome of a single Kafka delivery attempt.

    Used as the default ``on_delivery`` callback when no ``delivery_callback``
    is given to :class:`ProduceToTopicOperator`. It only writes a log record
    and returns ``None``.

    :param err: The delivery error, or ``None`` when the message was delivered.
    :param msg: The delivered message. It is only read on success, through its
        ``topic()``, ``partition()`` and ``offset()`` accessors.
    """
    delivered = err is None
    if not delivered:
        local_logger.error("Failed to deliver message: %s", err)
        return
    local_logger.info(
        "Produced record to topic %s, partition [%s] @ offset %s",
        msg.topic(),
        msg.partition(),
        msg.offset(),
    )
class ProduceToTopicOperator(BaseOperator):
    """An operator that produces messages to a Kafka topic.

    Registers a producer for a Kafka topic and publishes every ``(key, value)``
    pair returned by ``producer_function`` to that topic.

    :param kafka_config_id: The Kafka connection id used to build the producer,
        defaults to "kafka_default"
    :param topic: The topic the producer should produce to. Required.
    :param producer_function: The function, or a dotted import path to it, that
        returns an iterable of ``(key, value)`` pairs to produce. Required.
    :param producer_function_args: Additional positional arguments passed to the
        producer callable, defaults to None
    :param producer_function_kwargs: Additional keyword arguments passed to the
        producer callable, defaults to None
    :param delivery_callback: The callback applied after delivery (or failure) of
        each message. May be a callable or a dotted import path to one. When not
        given, the module-level ``acked`` logging callback is used. Defaults to None.
    :param synchronous: If True, the producer is flushed after every message,
        defaults to True
    :param poll_timeout: Timeout passed to ``poll`` on the producer after each
        message is produced, defaults to 0
    :raises AirflowException: If ``topic`` or ``producer_function`` is empty or
        not provided.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ProduceToTopicOperator`
    """

    template_fields: Sequence[str] = (
        "topic",
        "producer_function",
        "producer_function_args",
        "producer_function_kwargs",
        "kafka_config_id",
    )

    def __init__(
        self,
        topic: str,
        producer_function: str | Callable[..., Any],
        kafka_config_id: str = "kafka_default",
        producer_function_args: Sequence[Any] | None = None,
        producer_function_kwargs: dict[Any, Any] | None = None,
        delivery_callback: str | Callable[..., Any] | None = None,
        synchronous: bool = True,
        poll_timeout: float = 0,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)

        # Resolve the delivery callback up front. A dotted path is imported,
        # a callable is used as-is, and a missing or empty value falls back to
        # the default logging callback ``acked``.
        if not delivery_callback:
            dc = acked
        elif isinstance(delivery_callback, str):
            dc = import_string(delivery_callback)
        else:
            dc = delivery_callback

        self.kafka_config_id = kafka_config_id
        self.topic = topic
        self.producer_function = producer_function
        self.producer_function_args = producer_function_args or ()
        self.producer_function_kwargs = producer_function_kwargs or {}
        self.delivery_callback = dc
        self.synchronous = synchronous
        self.poll_timeout = poll_timeout

        if not (self.topic and self.producer_function):
            raise AirflowException(
                "topic and producer_function must be provided. Got topic="
                f"{self.topic} and producer_function={self.producer_function}"
            )

    def execute(self, context) -> None:
        """Produce every ``(key, value)`` pair from ``producer_function`` to ``topic``.

        After each message the producer is polled with ``poll_timeout``. When
        ``synchronous`` is True it is also flushed. A final flush is always
        issued once the producer callable is exhausted.
        """
        producer = KafkaProducerHook(kafka_config_id=self.kafka_config_id).get_producer()

        # Resolve a dotted path into a local rather than overwriting the
        # (templated) ``self.producer_function`` attribute during execution.
        producer_function: Callable[..., Any] = (
            import_string(self.producer_function)
            if isinstance(self.producer_function, str)
            else self.producer_function
        )
        producer_callable = partial(
            producer_function,
            *self.producer_function_args,
            **self.producer_function_kwargs,
        )

        # Publish each returned key/value pair, flushing per message if synchronous.
        for key, value in producer_callable():
            producer.produce(self.topic, key=key, value=value, on_delivery=self.delivery_callback)
            producer.poll(self.poll_timeout)
            if self.synchronous:
                producer.flush()

        # Final flush, presumably to drain anything still buffered in the
        # producer (e.g. when ``synchronous`` is False).
        producer.flush()

# Was this entry helpful?