package io.apicurio.registry.utils.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/apicurio/registry/utils/kafka/ConsumerContainer.class */
public class ConsumerContainer<K, V> implements ConsumerActions<K, V> {
    /** Class logger; assigned in the static initializer. */
    private static final Logger log;
    /** Poll timeout in milliseconds used by the four-argument constructor; assigned in the static initializer. */
    public static final long DEFAULT_CONSUMER_POLL_TIMEOUT;
    /** Lower bound of the retry back-off in milliseconds; the retry loop starts its delay at this same value. */
    private static final long MIN_RETRY_DELAY = 100;
    /** Upper bound of the retry back-off in milliseconds; the doubled delay is capped at this value. */
    private static final long MAX_RETRY_DELAY = 10000;
    /** Counter used to give every container's thread a distinct name suffix. */
    private static final AtomicInteger containerCount;
    /** Monitor held by submit(), start() and stop(). */
    private final Object lock;
    /** Configuration handed to the KafkaConsumer created by the consumer thread. */
    private final Properties consumerProperties;
    /** Key deserializer handed to the KafkaConsumer. */
    private final Deserializer<K> keyDeserializer;
    /** Value deserializer handed to the KafkaConsumer. */
    private final Deserializer<V> valueDeserializer;
    /** Timeout passed to every KafkaConsumer.poll() call. */
    private final Duration consumerPollTimeout;
    /** Per-record handler; null unless the "first" alternative of the Oneof2 was supplied. */
    private final Consumer<? super ConsumerRecord<K, V>> recordConsumer;
    /** Per-batch handler; null unless the "second" alternative of the Oneof2 was supplied. Takes precedence when set. */
    private final Consumer<? super ConsumerRecords<K, V>> recordsConsumer;
    /** Milliseconds a tracked partition may go without records before the idle-ping callback fires for it. */
    private final long idlePingTimeout;
    /** Callback invoked for idle partitions; null disables idle tracking entirely. */
    private final Consumer<? super TopicPartition> idlePingConsumer;
    /** The thread that runs consumerLoop(); created in the constructor, started by start(). */
    private final Thread thread;
    /** Pending actions: each future is completed with the Kafka consumer by the consumer thread. */
    private final BlockingQueue<CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>>> tasks;
    /** Set to true by start() and back to false when the consumer loop exits. */
    private volatile boolean running;
    /** Set to true by the stop task; makes the consumer loop exit and submit() reject new actions. */
    private volatile boolean stopping;
    /** Compiler-generated assertion switch surfaced by the decompiler; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/apicurio/registry/utils/kafka/ConsumerContainer$DynamicPool.class */
    /**
     * A resizable group of {@link ConsumerContainer}s that all subscribe to the same topic.
     *
     * <p>All state-changing methods are {@code synchronized} on the pool instance. The pool has
     * three states: created (containers exist but their threads are not started), running, and
     * stopped. Once stopped, the pool cannot be started or resized again.
     *
     * @param <K> record key type
     * @param <V> record value type
     */
    public static class DynamicPool<K, V> {
        private final Properties consumerProperties;
        private final Deserializer<K> keyDeserializer;
        private final Deserializer<V> valueDeserializer;
        private final String topic;
        private final Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> recordOrRecordsConsumer;
        /** Containers currently owned by the pool; the newest ones are at the tail. */
        private final LinkedList<ConsumerContainer<K, V>> containers = new LinkedList<>();
        private volatile boolean running;
        private volatile boolean stopped;

        /**
         * Creates the pool and its initial containers. No thread is started until {@link #start()}.
         *
         * @param properties    Kafka consumer configuration, must not be null
         * @param deserializer  key deserializer, must not be null
         * @param deserializer2 value deserializer, must not be null
         * @param str           topic every container subscribes to, must not be null
         * @param i             initial number of consumer threads, must be non-negative
         * @param oneof2        either a per-record or a per-batch handler, must not be null
         * @throws NullPointerException     if a required argument is null
         * @throws IllegalArgumentException if {@code i} is negative
         */
        public DynamicPool(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2, String str, int i, Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> oneof2) {
            this.consumerProperties = Objects.requireNonNull(properties);
            this.keyDeserializer = Objects.requireNonNull(deserializer);
            this.valueDeserializer = Objects.requireNonNull(deserializer2);
            this.topic = Objects.requireNonNull(str);
            this.recordOrRecordsConsumer = Objects.requireNonNull(oneof2);
            setConsumerThreads(i);
        }

        /**
         * @return the number of containers (and therefore consumer threads) in the pool
         */
        public synchronized int getConsumerThreads() {
            return this.containers.size();
        }

        /**
         * Grows or shrinks the pool to exactly {@code i} containers. Surplus containers are
         * removed from the tail (and stopped if the pool is running); new containers are
         * subscribed to the topic and started if the pool is running.
         *
         * @param i desired number of consumer threads, must be non-negative
         * @throws IllegalArgumentException if the pool is already stopped or {@code i} is negative
         */
        public synchronized void setConsumerThreads(int i) {
            checkNotStopped();
            if (i < 0) {
                throw new IllegalArgumentException("consumerThreads should be non-negative");
            }
            while (this.containers.size() > i) {
                ConsumerContainer<K, V> surplus = this.containers.removeLast();
                if (this.running) {
                    surplus.stop();
                }
            }
            while (this.containers.size() < i) {
                ConsumerContainer<K, V> added = new ConsumerContainer<>(this.consumerProperties, this.keyDeserializer, this.valueDeserializer, this.recordOrRecordsConsumer);
                // The subscription is queued now and executed by the container's own thread once
                // it runs. Without the callback a failed subscribe would go completely unnoticed.
                added.submit(consumer -> {
                    consumer.subscribe(Collections.singletonList(this.topic));
                    return null;
                }).whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        log.warn("Failed to subscribe consumer to topic {}", this.topic, failure);
                    }
                });
                if (this.running) {
                    added.start();
                }
                this.containers.addLast(added);
            }
        }

        /**
         * Rejects operations on a stopped pool. The exception type is kept as
         * {@link IllegalArgumentException} for compatibility with existing callers.
         */
        private void checkNotStopped() {
            if (this.stopped) {
                throw new IllegalArgumentException("Already stopped");
            }
        }

        /**
         * Starts every container; does nothing if the pool is already running.
         *
         * @throws IllegalArgumentException if the pool is already stopped
         */
        public synchronized void start() {
            checkNotStopped();
            if (this.running) {
                return;
            }
            for (ConsumerContainer<K, V> container : this.containers) {
                container.start();
            }
            this.running = true;
        }

        /**
         * Stops every container and marks the pool as stopped; does nothing if already stopped.
         *
         * <p>Every container is asked to stop even when an earlier one fails (for example because
         * its thread has already terminated). The pool always ends up in the stopped state; the
         * first failure is rethrown afterwards with any further failures attached as suppressed.
         *
         * @throws RuntimeException the first failure reported by a container's {@code stop()}
         */
        public synchronized void stop() {
            if (this.stopped) {
                return;
            }
            RuntimeException firstFailure = null;
            if (this.running) {
                for (ConsumerContainer<K, V> container : this.containers) {
                    try {
                        container.stop();
                    } catch (RuntimeException e) {
                        if (firstFailure == null) {
                            firstFailure = e;
                        } else {
                            firstFailure.addSuppressed(e);
                        }
                    }
                }
                this.running = false;
            }
            this.stopped = true;
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        /**
         * @return {@code true} between a successful {@link #start()} and {@link #stop()}
         */
        public boolean isRunning() {
            return this.running;
        }
    }

    /**
     * Creates a container with the default poll timeout and with idle pings disabled.
     *
     * @param properties    Kafka consumer configuration
     * @param deserializer  key deserializer
     * @param deserializer2 value deserializer
     * @param oneof2        either a per-record or a per-batch handler
     */
    public ConsumerContainer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2, Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> oneof2) {
        this(
            properties,
            deserializer,
            deserializer2,
            DEFAULT_CONSUMER_POLL_TIMEOUT,
            oneof2,
            0L,    // idle ping timeout is irrelevant without a callback
            null   // no idle ping callback
        );
    }

    public ConsumerContainer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2, long j, Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> oneof2, long j2, Consumer<? super TopicPartition> consumer) {
        this.lock = new Object();
        this.tasks = new LinkedTransferQueue();
        this.consumerProperties = (Properties) Objects.requireNonNull(properties);
        this.keyDeserializer = (Deserializer) Objects.requireNonNull(deserializer);
        this.valueDeserializer = (Deserializer) Objects.requireNonNull(deserializer2);
        this.consumerPollTimeout = Duration.ofMillis(j);
        this.recordConsumer = oneof2.isFirst() ? oneof2.getFirst() : null;
        this.recordsConsumer = oneof2.isSecond() ? oneof2.getSecond() : null;
        this.idlePingTimeout = j2;
        this.idlePingConsumer = consumer;
        this.thread = new Thread(this::consumerLoop, "kafka-consumer-container-" + containerCount.incrementAndGet());
    }

    /**
     * Queues an action to be executed with the Kafka consumer on the consumer thread.
     *
     * <p>The action is attached to a future that the consumer thread later completes with the
     * consumer instance; the returned future carries the action's result. If the container is
     * already stopping, nothing is queued and the returned future completes exceptionally.
     *
     * @param function the action to run against the Kafka consumer, must not be null
     * @param <R>      result type of the action
     * @return a future for the action's result
     * @throws NullPointerException  if {@code function} is null
     * @throws IllegalStateException if called from the consumer thread itself
     */
    @Override // io.apicurio.registry.utils.kafka.ConsumerActions
    public final <R> CompletableFuture<R> submit(Function<? super org.apache.kafka.clients.consumer.Consumer<K, V>, ? extends R> function) {
        Objects.requireNonNull(function);
        if (Thread.currentThread() == this.thread) {
            throw new IllegalStateException("Don't submit actions from consumer thread");
        }
        synchronized (this.lock) {
            CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> task = new CompletableFuture<>();
            // Register the dependent stage before the task can be completed, so the action
            // runs as part of completing the task.
            CompletableFuture<R> result = task.thenApply(function);
            if (this.stopping) {
                task.completeExceptionally(new IllegalStateException("Already stopping or stopped"));
            } else {
                this.tasks.add(task);
            }
            return result;
        }
    }

    /**
     * Body of the consumer thread.
     *
     * <p>Creates the Kafka consumer and, until a stop is requested, alternates between
     * completing queued tasks and polling Kafka:
     * <ul>
     *   <li>If a task is queued it is completed with the consumer. While the consumer has
     *       neither a subscription nor an assignment the loop blocks on the task queue instead
     *       of polling Kafka.</li>
     *   <li>Otherwise Kafka is polled and the records are passed to the configured handler. An
     *       {@link IllegalStateException} from {@code poll} switches the loop into the waiting
     *       mode described above.</li>
     *   <li>When idle pings are enabled, partitions that delivered nothing for at least
     *       {@code idlePingTimeout} milliseconds are reported to the idle ping callback.</li>
     * </ul>
     *
     * <p>The consumer is always closed on exit. Any throwable escaping the loop is logged, and
     * {@code running} is reset to {@code false} in all cases.
     */
    private void consumerLoop() {
        boolean waitingForSubscription = false;
        // Last activity (epoch millis) per partition; only tracked when idle pings are enabled.
        final Map<TopicPartition, Long> lastActivity = this.idlePingConsumer == null ? null : new HashMap<>();
        try (org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer =
                 new KafkaConsumer<>(this.consumerProperties, this.keyDeserializer, this.valueDeserializer)) {
            while (!this.stopping) {
                // Block for a task only when polling Kafka would be pointless.
                CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> task =
                    waitingForSubscription ? this.tasks.take() : this.tasks.poll();
                if (task != null) {
                    task.complete(kafkaConsumer);
                    if (waitingForSubscription) {
                        // The task may have subscribed or assigned; re-check before polling again.
                        waitingForSubscription = kafkaConsumer.subscription().isEmpty() && kafkaConsumer.assignment().isEmpty();
                    }
                    continue;
                }

                // Explicit form of "assert !waitingForSubscription": take() never returns null.
                // Kept explicit because this class declares $assertionsDisabled itself.
                if (!$assertionsDisabled && waitingForSubscription) {
                    throw new AssertionError();
                }

                ConsumerRecords<K, V> records = null;
                try {
                    records = kafkaConsumer.poll(this.consumerPollTimeout);
                } catch (IllegalStateException e) {
                    log.info("{} - will wait", e.getMessage());
                    waitingForSubscription = true;
                }

                final long now = System.currentTimeMillis();
                if (records != null) {
                    if (lastActivity != null) {
                        for (TopicPartition partition : records.partitions()) {
                            lastActivity.put(partition, now);
                        }
                    }
                    consume(records, kafkaConsumer);
                }
                if (lastActivity != null) {
                    pingIdlePartitions(lastActivity, now, kafkaConsumer);
                }
            }
        } catch (Throwable t) {
            log.warn("Exception caught in consumer polling thread", t);
        } finally {
            log.info("Consumer thread exiting");
            this.running = false;
        }
    }

    /**
     * Reports every tracked partition whose last activity is at least {@code idlePingTimeout}
     * milliseconds old to the idle ping callback and resets its timestamp to {@code now}.
     * After at least one ping, partitions no longer assigned to the consumer are dropped
     * from tracking.
     */
    private void pingIdlePartitions(Map<TopicPartition, Long> lastActivity, long now, org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer) {
        ArrayList<TopicPartition> idlePartitions = null;
        for (Map.Entry<TopicPartition, Long> entry : lastActivity.entrySet()) {
            if (now - entry.getValue() >= this.idlePingTimeout) {
                if (idlePartitions == null) {
                    idlePartitions = new ArrayList<>(4);
                }
                idlePartitions.add(entry.getKey());
                entry.setValue(now);
            }
        }
        if (idlePartitions != null) {
            idlePartitions.forEach(this.idlePingConsumer);
            lastActivity.keySet().retainAll(kafkaConsumer.assignment());
        }
    }

    /**
     * Hands a polled batch to the configured handler: the whole batch at once when a batch
     * handler is present, otherwise record by record to the per-record handler.
     *
     * @param batch         records returned by the last poll
     * @param kafkaConsumer consumer passed through to the retry logic
     */
    private void consume(ConsumerRecords<K, V> batch, org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer) {
        if (this.recordsConsumer != null) {
            consumeRetryable(batch, this.recordsConsumer, kafkaConsumer);
            return;
        }
        // Without a batch handler a per-record handler must exist (explicit assertion check).
        if (!$assertionsDisabled && this.recordConsumer == null) {
            throw new AssertionError();
        }
        for (ConsumerRecord<K, V> record : batch) {
            consumeRetryable(record, this.recordConsumer, kafkaConsumer);
        }
    }

    /**
     * Invokes the handler for a record or batch, retrying until it succeeds or the container
     * is stopping.
     *
     * <p>After an {@link Exception} the method waits with an exponential back-off that starts
     * at {@code MIN_RETRY_DELAY} and is capped at {@code MAX_RETRY_DELAY}. Queued tasks keep
     * being completed during the wait, so a stop request is still honoured. Any other
     * {@link Throwable} is logged and terminates the JVM.
     *
     * <p>An interrupt received while waiting ends the current wait early and the retry goes
     * ahead. The interrupt status is restored only once, when this method exits: restoring it
     * between attempts would make every later wait return immediately and turn the back-off
     * into a busy loop.
     *
     * @param recordOrRecords the record or batch to process
     * @param handler         the user handler to invoke
     * @param kafkaConsumer   consumer used to complete tasks that arrive during the back-off
     * @param <T>             {@code ConsumerRecord} or {@code ConsumerRecords}
     */
    private <T> void consumeRetryable(T recordOrRecords, Consumer<? super T> handler, org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer) {
        long delay = MIN_RETRY_DELAY;
        boolean interrupted = false;
        try {
            while (!this.stopping) {
                try {
                    handler.accept(recordOrRecords);
                    return;
                } catch (Exception e) {
                    log.warn("Exception caught while processing {} - retrying in {} ms", formatRecord(recordOrRecords), delay, e);
                    try {
                        // Serve tasks while backing off; the wait ends when the queue stays
                        // empty for a full delay or a stop has been requested.
                        CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> task;
                        while (!this.stopping && (task = this.tasks.poll(delay, TimeUnit.MILLISECONDS)) != null) {
                            task.complete(kafkaConsumer);
                        }
                    } catch (InterruptedException ie) {
                        log.info("Interrupted - keeping retrying");
                        interrupted = true;
                    }
                    delay = Math.min(delay * 2, MAX_RETRY_DELAY);
                } catch (Throwable fatal) {
                    // Errors are not retryable: log if possible, then terminate the JVM regardless.
                    try {
                        log.error("Error caught while processing {} - exiting JVM", formatRecord(recordOrRecords), fatal);
                    } finally {
                        System.exit(-1);
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds a short description of a record or batch for log messages.
     *
     * @param recordOrRecords a {@code ConsumerRecord}, a {@code ConsumerRecords} batch, or any other object
     * @return topic-partition, offset and timestamp for a single record; message count and
     *         partitions for a batch; {@code String.valueOf} of the argument otherwise
     */
    private static String formatRecord(Object recordOrRecords) {
        if (recordOrRecords instanceof ConsumerRecords) {
            ConsumerRecords<?, ?> batch = (ConsumerRecords<?, ?>) recordOrRecords;
            return String.format("%d messages from topic-partitions %s", batch.count(), batch.partitions());
        }
        if (recordOrRecords instanceof ConsumerRecord) {
            ConsumerRecord<?, ?> single = (ConsumerRecord<?, ?>) recordOrRecords;
            return String.format(
                "message from topic-partition %s-%s, offset %d, timestamp %tc",
                single.topic(),
                single.partition(),
                single.offset(),
                single.timestamp());
        }
        return String.valueOf(recordOrRecords);
    }

    /**
     * Starts the consumer thread; does nothing if the container is already running.
     *
     * @throws IllegalArgumentException if the container is stopping or has been stopped
     */
    @Override // io.apicurio.registry.utils.kafka.ConsumerActions
    public void start() {
        synchronized (this.lock) {
            if (this.stopping) {
                throw new IllegalArgumentException("Already stopping or stopped");
            }
            if (!this.running) {
                // Flag first, so isRunning() is already true when the thread begins.
                this.running = true;
                this.thread.start();
            }
        }
    }

    /**
     * Requests the consumer thread to stop and blocks until it has picked up the request.
     * The lock is held for the whole wait.
     *
     * @throws IllegalStateException if the container is not running
     */
    @Override // io.apicurio.registry.utils.kafka.ConsumerActions
    public void stop() {
        synchronized (this.lock) {
            CompletableFuture<Void> stopRequest = doStop();
            stopRequest.join();
        }
    }

    /**
     * @return {@code true} from the moment {@link #start()} succeeds until the consumer loop exits
     */
    @Override // io.apicurio.registry.utils.kafka.ConsumerActions
    public boolean isRunning() {
        return running;
    }

    /**
     * Queues the task that raises the {@code stopping} flag on the consumer thread.
     *
     * @return a future that completes once the consumer thread has executed the stop task
     * @throws IllegalStateException if the container is not running
     */
    private CompletableFuture<Void> doStop() {
        if (!this.running) {
            throw new IllegalStateException("Not started yet or already stopped");
        }
        return submit(kafkaConsumer -> {
            if (!this.stopping) {
                this.stopping = true;
            }
            return null;
        });
    }

    // Initializes the class-level constants declared as blank finals.
    static {
        // Assertion switch first, mirroring what the compiler generates.
        $assertionsDisabled = !ConsumerContainer.class.desiredAssertionStatus();
        containerCount = new AtomicInteger();
        // One second, expressed in milliseconds.
        DEFAULT_CONSUMER_POLL_TIMEOUT = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.SECONDS);
        log = LoggerFactory.getLogger(ConsumerContainer.class);
    }
}
