package io.streamsthoughts.azkarra.commons.error;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.RecordCollector;

/* loaded from: input_file:io/streamsthoughts/azkarra/commons/error/DeadLetterTopicExceptionHandler.class */
/**
 * A {@link DeserializationExceptionHandler} that forwards every record which could not be
 * deserialized to a dead-letter topic, enriched with headers describing the failure.
 *
 * <p>The target topic is the one returned by the handler configuration or, when that is
 * {@code null}, the name of the source topic followed by {@code "-rejected"}. The raw key and
 * value bytes of the rejected record are forwarded unchanged.
 *
 * <p>{@link #configure(Map)} must be called before {@link #handle}; both the configuration and
 * the custom headers are only created there.
 */
public class DeadLetterTopicExceptionHandler implements DeserializationExceptionHandler {
    /** Suffix appended to the source topic name when no output topic is configured. */
    private static final String OUTPUT_TOPIC_DEFAULT_SUFFIX = "-rejected";

    /** Pass-through serializer used for both the raw key and the raw value. */
    private final Serializer<byte[]> serializer = new ByteArraySerializer();
    /** Serializer used to encode textual header values; created in {@link #configure(Map)}. */
    private StringSerializer stringSerializer;
    private DeadLetterTopicExceptionHandlerConfig config;
    /** Headers built once from the configuration and appended to every rejected record. */
    private List<Header> customHeaders;

    /**
     * Configures this handler and pre-computes the custom headers declared in the configuration.
     *
     * @param map the configuration properties; also passed to the header string serializer,
     *            which is configured as a value (not key) serializer
     */
    public void configure(Map<String, ?> map) {
        this.config = new DeadLetterTopicExceptionHandlerConfig(map);

        // The string serializer MUST be ready before the custom headers are built:
        // toByteArray(String) dereferences it, and the stream below is collected eagerly.
        // Creating it afterwards throws a NullPointerException on the first configure()
        // call as soon as at least one custom header is configured.
        this.stringSerializer = new StringSerializer();
        this.stringSerializer.configure(map, false);

        // The key cast is kept because the declared key type of customHeaders() is not
        // visible from this file.
        this.customHeaders = this.config.customHeaders().entrySet().stream()
                .<Header>map(entry ->
                        new RecordHeader((String) entry.getKey(), toByteArray(entry.getValue().toString())))
                .collect(Collectors.toList());
    }

    /**
     * Sends the rejected record to the dead-letter topic and decides whether processing continues.
     *
     * <p>The exception stacktrace, message, class name and the current system time are added as
     * headers, followed by the configured custom headers. Note that the headers are added to the
     * {@link Headers} instance of {@code consumerRecord} itself, i.e. the given record is mutated.
     *
     * @param processorContext the processor context; must be a {@link ProcessorContextImpl}
     *                         because its record collector is used to produce the record
     * @param consumerRecord   the raw record that failed deserialization
     * @param exc              the exception raised during deserialization
     * @return {@code FAIL} when {@code exc} is an instance of one of the configured fatal
     *         exception types, {@code CONTINUE} otherwise
     */
    public DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext processorContext, ConsumerRecord<byte[], byte[]> consumerRecord, Exception exc) {
        RecordCollector recordCollector = ((ProcessorContextImpl) processorContext).recordCollector();

        Headers headers = consumerRecord.headers();
        headers.add(ExceptionHeader.STACKTRACE, toByteArray(getStacktrace(exc)));
        // getMessage() may be null; it is handed to the serializer as-is.
        headers.add(ExceptionHeader.MESSAGE, toByteArray(exc.getMessage()));
        headers.add(ExceptionHeader.CLASS_NAME, toByteArray(exc.getClass().getName()));
        headers.add(ExceptionHeader.TIMESTAMP, toByteArray(Time.SYSTEM.milliseconds()));
        this.customHeaders.forEach(headers::add);

        String configuredTopic = this.config.outputTopic();
        String outputTopic = configuredTopic != null
                ? configuredTopic
                : consumerRecord.topic() + OUTPUT_TOPIC_DEFAULT_SUFFIX;

        // No partitioner is supplied (null), so none is applied by this handler.
        recordCollector.send(
                outputTopic,
                consumerRecord.key(),
                consumerRecord.value(),
                headers,
                consumerRecord.timestamp(),
                this.serializer,
                this.serializer,
                (StreamPartitioner<byte[], byte[]>) null);

        return getDeserializationHandlerResponse(exc);
    }

    /**
     * Maps the given exception to a handler response.
     *
     * @param exc the deserialization exception
     * @return {@code FAIL} if {@code exc} is an instance of any configured fatal exception type,
     *         {@code CONTINUE} otherwise
     */
    private DeserializationExceptionHandler.DeserializationHandlerResponse getDeserializationHandlerResponse(Exception exc) {
        boolean fatal = this.config.getFatalExceptions().stream()
                .anyMatch(fatalType -> fatalType.isInstance(exc));
        return fatal
                ? DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL
                : DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE;
    }

    /** Encodes the given value as 8 bytes using the default (big-endian) {@link ByteBuffer} order. */
    private byte[] toByteArray(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Encodes the given string with the string serializer set up in {@link #configure(Map)}. */
    private byte[] toByteArray(String str) {
        return this.stringSerializer.serialize(null, str);
    }

    /** Renders the full stacktrace of the given throwable into a string. */
    private String getStacktrace(Throwable th) {
        StringWriter buffer = new StringWriter();
        th.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }
}
