/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka.ingest;

import gnu.trove.map.hash.TIntObjectHashMap;
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.hash.KeyedIntObjectHashMap;
import io.deephaven.hash.KeyedIntObjectKey;
import io.deephaven.io.logger.Logger;
import io.deephaven.kafka.ingest.KafkaStreamConsumer;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jetbrains.annotations.NotNull;

public class KafkaIngester {
    /** Reporting period for the ingestion summary log line, in milliseconds (property {@code reportIntervalMs}, default 60000). */
    private static final int REPORT_INTERVAL_MS = Configuration.getInstance()
            .getIntegerForClassWithDefault(KafkaIngester.class, "reportIntervalMs", 60000);

    /** Once {@code messagesWithErr} exceeds this value, polling gives up (property {@code maxErrs}, default 500). */
    private static final long MAX_ERRS = Configuration.getInstance()
            .getLongForClassWithDefault(KafkaIngester.class, "maxErrs", 500L);

    // --- Fixed at construction -------------------------------------------------------------

    private final KafkaConsumer<?, ?> kafkaConsumer;
    @NotNull
    private final Logger log;
    private final String topic;
    /** {@code toString()} of the partition filter handed to the constructor. */
    private final String partitionDescription;
    /** Prefix put in front of every log line written by this instance. */
    private final String logPrefix;

    // --- Partition bookkeeping -------------------------------------------------------------

    /** Partition number to the stream consumer receiving its records; accessed under {@code synchronized (streamConsumers)}. */
    private final TIntObjectHashMap<KafkaStreamConsumer> streamConsumers = new TIntObjectHashMap<>();

    /** Partitions currently handed to the Kafka consumer, keyed by partition number. */
    private final KeyedIntObjectHashMap<TopicPartition> assignedPartitions =
            new KeyedIntObjectHashMap<>(new KeyedIntObjectKey.BasicStrict<TopicPartition>() {
                @Override
                public int getIntKey(@NotNull TopicPartition topicPartition) {
                    return topicPartition.partition();
                }
            });

    // --- Running totals (all start at zero) and the snapshot taken at the last summary -----

    private long messagesProcessed;
    private long bytesProcessed;
    private long pollCalls;
    private long messagesWithErr;
    private long lastMessages;
    private long lastBytes;
    private long lastPollCalls;

    // --- Flags written by the shutdown methods and read by the consumer loop ---------------

    private volatile boolean needsAssignment;
    private volatile boolean done;
    /** Partition filter that accepts every partition; its {@code toString()} is {@code "ALL"}. */
    public static final IntPredicate ALL_PARTITIONS = new IntPredicate() {

        @Override
        public boolean test(int value) {
            return true;
        }

        @Override
        public String toString() {
            return "ALL";
        }
    };

    // The sentinels below are compared against the value returned by the constructor's
    // partitionToInitialSeekOffset function; they are final so that no caller can
    // reassign them and silently change what the constructor treats as a sentinel.

    /** Sentinel offset: seek the partition to its beginning. */
    public static final long SEEK_TO_BEGINNING = -1L;
    /** Sentinel offset: perform no seek for the partition. */
    public static final long DONT_SEEK = -2L;
    /** Sentinel offset: seek the partition to its end. */
    public static final long SEEK_TO_END = -3L;

    /** Returns {@link #SEEK_TO_BEGINNING} for every partition. */
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_BEGINNING = p -> SEEK_TO_BEGINNING;
    /** Returns {@link #DONT_SEEK} for every partition. */
    public static final IntToLongFunction ALL_PARTITIONS_DONT_SEEK = p -> DONT_SEEK;
    /** Returns {@link #SEEK_TO_END} for every partition. */
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_END = p -> SEEK_TO_END;

    /**
     * Creates an ingester with no partition restriction: delegates to the six-argument
     * constructor, passing {@link #ALL_PARTITIONS} as the partition filter.
     *
     * @param log logger used for all messages written by this ingester
     * @param props properties handed to the {@code KafkaConsumer} constructor
     * @param topic Kafka topic to read
     * @param partitionToStreamConsumer supplies the stream consumer for a partition number
     * @param partitionToInitialSeekOffset supplies the initial offset for a partition number, or one of
     *        {@link #SEEK_TO_BEGINNING}, {@link #SEEK_TO_END}, {@link #DONT_SEEK}
     */
    public KafkaIngester(Logger log, Properties props, String topic, IntFunction<KafkaStreamConsumer> partitionToStreamConsumer, IntToLongFunction partitionToInitialSeekOffset) {
        this(log, props, topic, ALL_PARTITIONS, partitionToStreamConsumer, partitionToInitialSeekOffset);
    }

    public KafkaIngester(@NotNull Logger log, Properties props, String topic, IntPredicate partitionFilter, IntFunction<KafkaStreamConsumer> partitionToStreamConsumer, IntToLongFunction partitionToInitialSeekOffset) {
        this.log = log;
        this.topic = topic;
        this.partitionDescription = partitionFilter.toString();
        this.logPrefix = KafkaIngester.class.getSimpleName() + "(" + topic + ", " + this.partitionDescription + "): ";
        this.kafkaConsumer = new KafkaConsumer(props);
        this.kafkaConsumer.partitionsFor(topic).stream().filter(pi -> partitionFilter.test(pi.partition())).map(pi -> new TopicPartition(topic, pi.partition())).forEach(tp -> {
            this.assignedPartitions.add(tp);
            this.streamConsumers.put(tp.partition(), (Object)((KafkaStreamConsumer)partitionToStreamConsumer.apply(tp.partition())));
        });
        this.assign();
        for (TopicPartition topicPartition : this.assignedPartitions) {
            long seekOffset = partitionToInitialSeekOffset.applyAsLong(topicPartition.partition());
            if (seekOffset == SEEK_TO_BEGINNING) {
                log.info().append((CharSequence)this.logPrefix).append((CharSequence)topicPartition.toString()).append((CharSequence)" seeking to beginning.").append(seekOffset).endl();
                this.kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                continue;
            }
            if (seekOffset == SEEK_TO_END) {
                log.info().append((CharSequence)this.logPrefix).append((CharSequence)topicPartition.toString()).append((CharSequence)" seeking to end.").append(seekOffset).endl();
                this.kafkaConsumer.seekToEnd(Collections.singletonList(topicPartition));
                continue;
            }
            if (seekOffset == DONT_SEEK) continue;
            log.info().append((CharSequence)this.logPrefix).append((CharSequence)topicPartition.toString()).append((CharSequence)" seeking to offset ").append(seekOffset).append((CharSequence)".").endl();
            this.kafkaConsumer.seek(topicPartition, seekOffset);
        }
    }

    /**
     * Hands the current set of assigned partitions to the Kafka consumer and logs that set.
     * Both steps run while holding the monitor of {@code assignedPartitions}.
     */
    private void assign() {
        synchronized (this.assignedPartitions) {
            this.kafkaConsumer.assign(this.assignedPartitions.values());
            final String assignmentText = this.assignedPartitions.values().toString();
            this.log.info()
                    .append(this.logPrefix)
                    .append("Partition Assignments: ")
                    .append(assignmentText)
                    .endl();
        }
    }

    /**
     * Describes this ingester as the simple class name immediately followed by the topic, a colon
     * and the partition description. {@link #start()} uses this text as the thread name.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder(KafkaIngester.class.getSimpleName());
        text.append(this.topic);
        text.append(":");
        text.append(this.partitionDescription);
        return text.toString();
    }

    /**
     * Launches the consumer loop on a new daemon thread named after {@link #toString()}.
     */
    public void start() {
        final Runnable loop = this::consumerLoop;
        final String threadName = this.toString();
        final Thread ingestThread = new Thread(loop, threadName);
        ingestThread.setDaemon(true);
        ingestThread.start();
    }

    /**
     * Converts a count observed over an elapsed time into a per-second rate.
     *
     * @param units number of units counted
     * @param nanos elapsed time in nanoseconds
     * @return {@code units} per second, or {@code 0.0} when {@code nanos} is zero or negative
     */
    private static double unitsPerSec(long units, long nanos) {
        final boolean hasElapsed = nanos > 0L;
        // Scale first, then divide, so the arithmetic order matches the established formula.
        return hasElapsed ? (1.0E9 * (double) units) / (double) nanos : 0.0;
    }

    /**
     * Body of the ingestion thread: polls Kafka until {@code done} is set or {@link #pollOnce}
     * reports a fatal condition, re-running {@link #assign()} whenever {@code needsAssignment}
     * has been raised, and writing a throughput summary roughly every {@code REPORT_INTERVAL_MS}.
     *
     * <p>The Kafka consumer is closed in a {@code finally} block, so it is released even when an
     * unexpected throwable escapes the loop (for example from {@code assign()}).
     *
     * <p>Timestamps come from {@link System#nanoTime()} and are therefore only ever compared through
     * their difference, as that method's contract requires.
     */
    private void consumerLoop() {
        final long reportIntervalNanos = (long) REPORT_INTERVAL_MS * 1000000L;
        long lastReportNanos = System.nanoTime();
        long nextReport = lastReportNanos + reportIntervalNanos;
        final DecimalFormat rateFormat = new DecimalFormat("#.###");
        try {
            while (!this.done) {
                while (this.needsAssignment) {
                    // Clear the flag before assigning so a request raised meanwhile is not lost.
                    this.needsAssignment = false;
                    this.assign();
                }
                final long beforePoll = System.nanoTime();
                // Block in poll no longer than the time left until the next summary is due.
                final long untilReport = nextReport - beforePoll;
                final long remainingNanos = untilReport > 0L ? untilReport : 0L;
                final boolean more = this.pollOnce(Duration.ofNanos(remainingNanos));
                if (!more) {
                    this.log.error().append(this.logPrefix).append("Stopping due to errors (").append(this.messagesWithErr).append(" messages with error out of ").append(this.messagesProcessed).append(" messages processed)").endl();
                    break;
                }
                final long afterPoll = System.nanoTime();
                if (afterPoll - nextReport <= 0L) {
                    continue;
                }
                final long periodMessages = this.messagesProcessed - this.lastMessages;
                final long periodBytes = this.bytesProcessed - this.lastBytes;
                final long periodPolls = this.pollCalls - this.lastPollCalls;
                final long periodNanos = afterPoll - lastReportNanos;
                this.log.info().append(this.logPrefix).append("ingestion period summary")
                        .append(": polls=").append(periodPolls)
                        .append(", messages=").append(periodMessages)
                        .append(", bytes=").append(periodBytes)
                        .append(", time=").append(periodNanos / 1000000L).append("ms")
                        .append(", polls/sec=").append(rateFormat.format(KafkaIngester.unitsPerSec(periodPolls, periodNanos)))
                        .append(", msgs/sec=").append(rateFormat.format(KafkaIngester.unitsPerSec(periodMessages, periodNanos)))
                        .append(", bytes/sec=").append(rateFormat.format(KafkaIngester.unitsPerSec(periodBytes, periodNanos)))
                        .endl();
                // Start the next reporting period from this poll's completion time.
                lastReportNanos = afterPoll;
                nextReport = lastReportNanos + reportIntervalNanos;
                this.lastMessages = this.messagesProcessed;
                this.lastBytes = this.bytesProcessed;
                this.lastPollCalls = this.pollCalls;
            }
        } finally {
            this.log.info().append(this.logPrefix).append("Closing Kafka consumer").endl();
            this.kafkaConsumer.close();
        }
    }

    /**
     * Performs one poll of the Kafka consumer and hands the returned records, partition by
     * partition, to the registered stream consumers.
     *
     * <p>A {@code WakeupException} from the poll is treated as a normal return so the caller can
     * re-check its flags. A partition with no registered stream consumer, or with no records, is
     * skipped. A failure while consuming a batch is counted and logged; processing carries on with
     * the next partition until the error count exceeds {@code MAX_ERRS}, at which point the failing
     * stream consumer is notified through {@code acceptFailure}.
     *
     * @param timeout maximum time to block in the poll
     * @return {@code true} if polling should continue; {@code false} after a polling exception or
     *         once the error limit has been exceeded
     */
    private boolean pollOnce(Duration timeout) {
        final ConsumerRecords<?, ?> polled;
        try {
            ++this.pollCalls;
            polled = this.kafkaConsumer.poll(timeout);
        } catch (WakeupException ignored) {
            // Raised by wakeup(); nothing was read, let the caller look at its flags again.
            return true;
        } catch (Exception pollFailure) {
            this.log.error()
                    .append(this.logPrefix)
                    .append("Exception while polling for Kafka messages:")
                    .append(pollFailure)
                    .append(", aborting.")
                    .endl();
            return false;
        }

        for (TopicPartition polledPartition : polled.partitions()) {
            final int partitionNumber = polledPartition.partition();
            final KafkaStreamConsumer target;
            synchronized (this.streamConsumers) {
                target = this.streamConsumers.get(partitionNumber);
            }
            if (target == null) {
                continue;
            }
            final List batch = polled.records(polledPartition);
            if (batch.isEmpty()) {
                continue;
            }
            try {
                this.bytesProcessed += target.consume(batch);
            } catch (Throwable consumeFailure) {
                ++this.messagesWithErr;
                this.log.error()
                        .append(this.logPrefix)
                        .append("Exception while processing Kafka message:")
                        .append(consumeFailure)
                        .endl();
                if (this.messagesWithErr > MAX_ERRS) {
                    target.acceptFailure(consumeFailure);
                    this.log.error()
                            .append(this.logPrefix)
                            .append("Max number of errors exceeded, aborting " + this + " consumer thread.")
                            .endl();
                    return false;
                }
                // Still under the limit: the failed batch is not added to messagesProcessed.
                continue;
            }
            this.messagesProcessed += (long) batch.size();
        }
        return true;
    }

    /**
     * Requests that ingestion stop: drops every registered stream consumer, raises the
     * {@code done} flag and wakes the Kafka consumer. Does nothing when {@code done} is already set.
     */
    public void shutdown() {
        if (!this.done) {
            synchronized (this.streamConsumers) {
                this.streamConsumers.clear();
            }
            this.done = true;
            this.kafkaConsumer.wakeup();
        }
    }

    /**
     * Stops ingestion for a single partition.
     *
     * <p>Returns without effect when {@code done} is already set or when no stream consumer is
     * registered for {@code partition}. Otherwise the partition is removed from both the stream
     * consumer map and the assigned partitions; if that was the last stream consumer the
     * {@code done} flag is raised, else a reassignment is requested. In either case the Kafka
     * consumer is woken afterwards.
     *
     * @param partition the partition number to stop ingesting
     */
    public void shutdownPartition(int partition) {
        if (this.done) {
            return;
        }
        final boolean lastConsumerRemoved;
        synchronized (this.streamConsumers) {
            final KafkaStreamConsumer removed = this.streamConsumers.remove(partition);
            if (removed == null) {
                return;
            }
            lastConsumerRemoved = this.streamConsumers.isEmpty();
        }
        this.assignedPartitions.remove(partition);
        if (!lastConsumerRemoved) {
            this.needsAssignment = true;
        } else {
            this.done = true;
        }
        this.kafkaConsumer.wakeup();
    }

    /**
     * Partition filter that accepts the partitions whose number, taken modulo
     * {@code consumerCount}, equals {@code consumerIndex}.
     */
    public static class PartitionRoundRobin implements IntPredicate {
        final int consumerIndex;
        final int consumerCount;

        /**
         * @param consumerIndex index of this consumer; checked with {@code Require} to be at least
         *        zero and less than {@code consumerCount}
         * @param consumerCount divisor used by {@link #test(int)}
         */
        public PartitionRoundRobin(int consumerIndex, int consumerCount) {
            // Upper bound is checked first, then the lower bound, each by Require.
            final int belowCount = Require.lt(consumerIndex, "consumerIndex", consumerCount, "consumerCount");
            this.consumerIndex = Require.geqZero(belowCount, "consumerIndex");
            this.consumerCount = consumerCount;
        }

        @Override
        public boolean test(int value) {
            final int slot = value % this.consumerCount;
            return slot == this.consumerIndex;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("N % ");
            text.append(this.consumerCount).append(" == ").append(this.consumerIndex);
            return text.toString();
        }
    }

    /**
     * Partition filter for exactly one partition: a {@link PartitionRange} whose start and end
     * are both the given partition number.
     */
    public static class SinglePartition
    extends PartitionRange {
        /**
         * @param partition the only partition number this filter accepts
         */
        public SinglePartition(int partition) {
            super(partition, partition);
        }
    }

    /**
     * Partition filter that accepts every partition number in a closed interval
     * {@code [startInclusive, endInclusive]}.
     */
    public static class PartitionRange
    implements IntPredicate {
        final int startInclusive;
        final int endInclusive;

        /**
         * @param startInclusive lowest accepted partition number
         * @param endInclusive highest accepted partition number; checked with {@code Require} to be
         *        greater than or equal to {@code startInclusive}
         */
        public PartitionRange(int startInclusive, int endInclusive) {
            this.startInclusive = startInclusive;
            this.endInclusive = Require.geq(endInclusive, "endInclusive", startInclusive, "startInclusive");
        }

        @Override
        public boolean test(int value) {
            return value >= this.startInclusive && value <= this.endInclusive;
        }

        /**
         * Renders a single-partition range as just that number and any wider range as
         * {@code start-end}. The separator keeps, for example, the range 1..3 ("1-3") distinct
         * from the single partition 13 ("13") wherever this text is logged.
         */
        @Override
        public String toString() {
            if (this.startInclusive == this.endInclusive) {
                return Integer.toString(this.startInclusive);
            }
            return this.startInclusive + "-" + this.endInclusive;
        }
    }
}

