package org.frankframework.extensions.kafka;

import jakarta.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.frankframework.configuration.ConfigurationException;
import org.frankframework.configuration.ConfigurationWarning;
import org.frankframework.core.IPullingListener;
import org.frankframework.core.ListenerException;
import org.frankframework.core.PipeLineResult;
import org.frankframework.core.PipeLineSession;
import org.frankframework.receivers.RawMessageWrapper;
import org.frankframework.stream.Message;
import org.frankframework.stream.MessageContext;
import org.frankframework.util.StringUtil;

@ConfigurationWarning("Experimental and under development. Do not use unless you wish to participate in this development.")
@Deprecated(forRemoval = false)
/* loaded from: input_file:org/frankframework/extensions/kafka/KafkaListener.class */
public class KafkaListener extends KafkaFacade implements IPullingListener<ConsumerRecord<String, byte[]>> {

    @Generated
    private static final Logger log = LogManager.getLogger(KafkaListener.class);
    private static final Predicate<String> TOPIC_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9._\\-*]*$").asPredicate();
    private String groupId;
    private String topics;
    private Consumer<String, byte[]> consumer;
    private Iterator<? extends ConsumerRecord<String, byte[]>> waiting;
    private Pattern topicPattern;
    private OffsetResetStrategy offsetStrategy = OffsetResetStrategy.EARLIEST;
    private int patternRecheckInterval = 5000;
    private final Duration pollDuration = Duration.ofMillis(1);
    private final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap();
    private final Lock lock = new ReentrantLock();

    @Override // org.frankframework.extensions.kafka.KafkaFacade
    public void configure() throws ConfigurationException {
        super.configure();
        if (StringUtils.isEmpty(this.groupId)) {
            throw new ConfigurationException("groupId must be specified");
        }
        if (StringUtils.isEmpty(this.topics)) {
            throw new ConfigurationException("topics must be specified");
        }
        if (this.patternRecheckInterval < 10) {
            throw new ConfigurationException("patternRecheckInterval should be at least 10");
        }
        this.properties.setProperty("group.id", this.groupId);
        this.properties.setProperty("auto.offset.reset", this.offsetStrategy.name().toLowerCase());
        this.properties.setProperty("metadata.max.age.ms", String.valueOf(this.patternRecheckInterval));
        this.properties.setProperty("enable.auto.commit", "false");
        this.properties.setProperty("max.poll.records", "1");
        List<String> split = StringUtil.split(this.topics);
        for (String str : split) {
            if (!TOPIC_NAME_PATTERN.test(str)) {
                throw new ConfigurationException("topics contains invalid characters. Only a-zA-Z0-9._-* are allowed. (topic: [" + str + "])");
            }
        }
        String join = String.join("|", split);
        if (join.isEmpty()) {
            throw new ConfigurationException("topics must contain at least one valid topic");
        }
        this.topicPattern = Pattern.compile(join);
    }

    public void open() throws ListenerException {
        this.lock.lock();
        try {
            try {
                this.consumer = buildConsumer();
                this.consumer.subscribe(this.topicPattern);
                this.waiting = this.consumer.poll(Duration.ofMillis(100L)).iterator();
                if (this.waiting.hasNext()) {
                    return;
                }
                if (((Double) ((Metric) this.consumer.metrics().values().stream().filter(metric -> {
                    return "response-total".equals(metric.metricName().name());
                }).findFirst().orElseThrow(() -> {
                    return new ListenerException("Failed to get response-total metric.");
                })).metricValue()).intValue() == 0) {
                    throw new ListenerException("Didn't get a response from Kafka while connecting for Listening.");
                }
                this.lock.unlock();
            } catch (RuntimeException e) {
                throw new ListenerException(e);
            }
        } finally {
            this.lock.unlock();
        }
    }

    protected Consumer<String, byte[]> buildConsumer() {
        return new KafkaConsumer(this.properties, new StringDeserializer(), new ByteArrayDeserializer());
    }

    public void close() {
        this.lock.lock();
        try {
            this.consumer.close();
        } finally {
            this.lock.unlock();
        }
    }

    public Message extractMessage(@Nonnull RawMessageWrapper<ConsumerRecord<String, byte[]>> rawMessageWrapper, @Nonnull Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        ConsumerRecord consumerRecord = (ConsumerRecord) rawMessageWrapper.getRawMessage();
        consumerRecord.headers().forEach(header -> {
            try {
                hashMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
            } catch (Exception e) {
                log.warn("Failed to convert header key [{}] to string. Bytearray value: [{}]", header.key(), header.value(), e);
            }
        });
        map.put("kafkaTopic", consumerRecord.topic());
        map.put("kafkaKey", consumerRecord.key());
        map.put("kafkaPartition", Integer.valueOf(consumerRecord.partition()));
        map.put("kafkaOffset", Long.valueOf(consumerRecord.offset()));
        map.put("kafkaTimestamp", Long.valueOf(consumerRecord.timestamp()));
        map.put("kafkaHeaders", hashMap);
        return new Message((byte[]) consumerRecord.value(), new MessageContext(map));
    }

    public void afterMessageProcessed(PipeLineResult pipeLineResult, RawMessageWrapper<ConsumerRecord<String, byte[]>> rawMessageWrapper, PipeLineSession pipeLineSession) {
    }

    public String getPhysicalDestinationName() {
        return "TOPICS(" + this.topics + ") on (" + getBootstrapServers() + ")";
    }

    @Nonnull
    public Map<String, Object> openThread() {
        return new HashMap();
    }

    public void closeThread(@Nonnull Map<String, Object> map) {
    }

    public RawMessageWrapper<ConsumerRecord<String, byte[]>> getRawMessage(@Nonnull Map<String, Object> map) {
        this.lock.lock();
        try {
            if (!this.waiting.hasNext()) {
                this.waiting = this.consumer.poll(this.pollDuration).iterator();
            }
            if (!this.waiting.hasNext()) {
                return null;
            }
            ConsumerRecord<String, byte[]> next = this.waiting.next();
            this.offsetAndMetadataMap.put(new TopicPartition(next.topic(), next.partition()), new OffsetAndMetadata(next.offset() + 1));
            this.consumer.commitAsync(this.offsetAndMetadataMap, (map2, exc) -> {
                if (exc != null) {
                    log.error("Failed to commit offsets", exc);
                }
            });
            RawMessageWrapper<ConsumerRecord<String, byte[]>> rawMessageWrapper = new RawMessageWrapper<>(next);
            this.lock.unlock();
            return rawMessageWrapper;
        } finally {
            this.lock.unlock();
        }
    }

    @Generated
    public void setGroupId(String str) {
        this.groupId = str;
    }

    @Generated
    public void setOffsetStrategy(OffsetResetStrategy offsetResetStrategy) {
        this.offsetStrategy = offsetResetStrategy;
    }

    @Generated
    public void setPatternRecheckInterval(int i) {
        this.patternRecheckInterval = i;
    }

    @Generated
    public void setTopics(String str) {
        this.topics = str;
    }

    @Generated
    void setConsumer(Consumer<String, byte[]> consumer) {
        this.consumer = consumer;
    }

    @Generated
    Pattern getTopicPattern() {
        return this.topicPattern;
    }
}
