package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.slf4j.Logger;

/* loaded from: input_file:lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/processor/internals/RecordCollectorImpl.class */
public class RecordCollectorImpl implements RecordCollector {
    private static final String SEND_EXCEPTION_MESSAGE = "Error encountered sending record to topic %s for task %s due to:%n%s";
    private final Logger log;
    private final TaskId taskId;
    private final StreamsProducer streamsProducer;
    private final ProductionExceptionHandler productionExceptionHandler;
    private final Sensor droppedRecordsSensor;
    private final boolean eosEnabled;
    private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
    private final Map<TopicPartition, Long> offsets = new HashMap();

    public RecordCollectorImpl(LogContext logContext, TaskId taskId, StreamsProducer streamsProducer, ProductionExceptionHandler productionExceptionHandler, StreamsMetricsImpl streamsMetricsImpl) {
        this.log = logContext.logger(getClass());
        this.taskId = taskId;
        this.streamsProducer = streamsProducer;
        this.productionExceptionHandler = productionExceptionHandler;
        this.eosEnabled = streamsProducer.eosEnabled();
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), taskId.toString(), streamsMetricsImpl);
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void initialize() {
        if (this.eosEnabled) {
            this.streamsProducer.initTransaction();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(String str, K k, V v, Headers headers, Long l, Serializer<K> serializer, Serializer<V> serializer2, StreamPartitioner<? super K, ? super V> streamPartitioner) {
        Integer partition;
        if (streamPartitioner != null) {
            try {
                List<PartitionInfo> partitionsFor = this.streamsProducer.partitionsFor(str);
                if (partitionsFor.size() <= 0) {
                    throw new StreamsException("Could not get partition information for topic " + str + " for task " + this.taskId + ". This can happen if the topic does not exist.");
                }
                partition = streamPartitioner.partition(str, k, v, partitionsFor.size());
            } catch (KafkaException e) {
                throw new StreamsException("Could not determine the number of partitions for topic '" + str + "' for task " + this.taskId + " due to " + e.toString());
            }
        } else {
            partition = null;
        }
        send(str, (String) k, (K) v, headers, partition, l, (Serializer<String>) serializer, (Serializer<K>) serializer2);
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(String str, K k, V v, Headers headers, Integer num, Long l, Serializer<K> serializer, Serializer<V> serializer2) {
        checkForException();
        try {
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(str, num, l, serializer.serialize(str, headers, k), serializer2.serialize(str, headers, v), headers);
            this.streamsProducer.send(producerRecord, (recordMetadata, exc) -> {
                if (this.sendException.get() != null) {
                    return;
                }
                if (exc != null) {
                    recordSendError(str, exc, producerRecord);
                    this.log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", new Object[]{k, v, l, str, num});
                    return;
                }
                TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(), recordMetadata.partition());
                if (recordMetadata.offset() >= 0) {
                    this.offsets.put(topicPartition, Long.valueOf(recordMetadata.offset()));
                } else {
                    this.log.warn("Received offset={} in produce response for {}", Long.valueOf(recordMetadata.offset()), topicPartition);
                }
            });
        } catch (ClassCastException e) {
            throw new StreamsException(String.format("ClassCastException while producing data to topic %s. A serializer (key: %s / value: %s) is not compatible to the actual key or value type (key type: %s / value type: %s). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", str, serializer.getClass().getName(), serializer2.getClass().getName(), k == null ? "unknown because key is null" : k.getClass().getName(), v == null ? "unknown because value is null" : v.getClass().getName()), e);
        } catch (RuntimeException e2) {
            throw new StreamsException(String.format(SEND_EXCEPTION_MESSAGE, str, this.taskId, e2.toString()), e2);
        }
    }

    private void recordSendError(String str, Exception exc, ProducerRecord<byte[], byte[]> producerRecord) {
        String str2;
        String format = String.format(SEND_EXCEPTION_MESSAGE, str, this.taskId, exc.toString());
        if (isFatalException(exc)) {
            str2 = format + "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
            this.sendException.set(new StreamsException(str2, exc));
        } else if ((exc instanceof ProducerFencedException) || (exc instanceof OutOfOrderSequenceException)) {
            str2 = format + "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out";
            this.sendException.set(new TaskMigratedException(str2, exc));
        } else {
            if (exc instanceof RetriableException) {
                format = format + "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response. \nConsider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
            }
            if (this.productionExceptionHandler.handle(producerRecord, exc) == ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL) {
                str2 = format + "\nException handler choose to FAIL the processing, no more records would be sent.";
                this.sendException.set(new StreamsException(str2, exc));
            } else {
                str2 = format + "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
                this.droppedRecordsSensor.record();
            }
        }
        this.log.error(str2);
    }

    private boolean isFatalException(Exception exc) {
        return ((exc instanceof AuthenticationException) || (exc instanceof AuthorizationException) || (exc instanceof SecurityDisabledException)) || ((exc instanceof InvalidTopicException) || (exc instanceof UnknownServerException) || (exc instanceof SerializationException) || (exc instanceof OffsetMetadataTooLarge) || (exc instanceof IllegalStateException));
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void flush() {
        this.log.debug("Flushing record collector");
        this.streamsProducer.flush();
        checkForException();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void closeClean() {
        this.log.info("Closing record collector clean");
        checkForException();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void closeDirty() {
        this.log.info("Closing record collector dirty");
        if (this.eosEnabled) {
            this.streamsProducer.abortTransaction();
        }
        checkForException();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public Map<TopicPartition, Long> offsets() {
        return Collections.unmodifiableMap(new HashMap(this.offsets));
    }

    private void checkForException() {
        KafkaException kafkaException = this.sendException.get();
        if (kafkaException != null) {
            this.sendException.set(null);
            throw kafkaException;
        }
    }

    Producer<byte[], byte[]> producer() {
        return this.streamsProducer.kafkaProducer();
    }
}
