package io.apicurio.registry.utils.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/apicurio/registry/utils/kafka/ConsumerContainer.class */
public class ConsumerContainer<K, V> implements ConsumerActions<K, V> {
    // Class-wide logger; assigned in the static initializer at the bottom of the class.
    private static final Logger log;
    // Default poll timeout in milliseconds; the static initializer sets it to one second.
    public static final long DEFAULT_CONSUMER_POLL_TIMEOUT;
    // Lower bound of the retry back-off, in milliseconds (applyRetryable starts its delay at this value, 100).
    private static final long MIN_RETRY_DELAY = 100;
    // Upper bound of the retry back-off, in milliseconds; applyRetryable caps the doubled delay at this value.
    private static final long MAX_RETRY_DELAY = 10000;
    // Source of the numeric suffix used in consumer thread names.
    private static final AtomicInteger containerCount;
    // Monitor taken by submit() and close() around the closed-check and task enqueue.
    private final Object lock;
    // Configuration and deserializers handed to the KafkaConsumer created on the consumer thread.
    private final Properties consumerProperties;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Timeout passed to every KafkaConsumer.poll() call.
    private final Duration consumerPollTimeout;
    // Per-record handler; null unless the Oneof2 passed to the constructor reports isFirst().
    private final Consumer<? super ConsumerRecord<K, V>> recordHandler;
    // Per-batch handler; null unless the Oneof2 passed to the constructor reports isSecond().
    private final Consumer<? super ConsumerRecords<K, V>> recordsHandler;
    // Invoked with the consumer and the RuntimeException when poll() fails with anything other than IllegalStateException.
    private final BiConsumer<? super org.apache.kafka.clients.consumer.Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler;
    // Milliseconds without records after which a tracked partition is reported to idlePingHandler.
    private final long idlePingTimeout;
    // Receives idle partitions; may be null, in which case the consumer loop does no idle tracking.
    private final Consumer<? super TopicPartition> idlePingHandler;
    // The thread running consumerLoop(); created and started by the constructor.
    private final Thread thread;
    // Futures enqueued by submit(); the consumer thread completes each one with the consumer instance.
    private final BlockingQueue<CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>>> tasks;
    // Set to true by the task that close() submits; checked by the consumer loop and by submit().
    private volatile boolean closed;
    // Decompiler-exposed form of the compiler-generated assertion flag (true when assertions are off).
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/apicurio/registry/utils/kafka/ConsumerContainer$DynamicPool.class */
    /**
     * A resizable group of {@link ConsumerContainer}s that all subscribe to the same topic and share
     * the same configuration, deserializers and handlers.
     *
     * <p>Resizing and closing are {@code synchronized} on the pool instance.
     */
    public static class DynamicPool<K, V> implements AutoCloseable {
        private final Properties consumerProperties;
        private final Deserializer<K> keyDeserializer;
        private final Deserializer<V> valueDeserializer;
        private final String topic;
        private final Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> recordOrRecordsHandler;
        private final BiConsumer<? super org.apache.kafka.clients.consumer.Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler;
        /** Live containers; the most recently started one is at the tail and is the first to be stopped. */
        private final LinkedList<ConsumerContainer<K, V>> containers = new LinkedList<>();
        private volatile boolean closed;

        /**
         * Creates the pool and immediately starts {@code i} containers.
         *
         * @param properties    consumer configuration, must not be null
         * @param deserializer  key deserializer, must not be null
         * @param deserializer2 value deserializer, must not be null
         * @param str           topic every container subscribes to, must not be null
         * @param i             initial number of containers, must be non-negative
         * @param oneof2        either a per-record or a per-batch handler, must not be null
         * @param biConsumer    handler for exceptions raised while polling, must not be null
         * @throws NullPointerException     if any reference argument is null
         * @throws IllegalArgumentException if {@code i} is negative
         */
        public DynamicPool(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2, String str, int i, Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> oneof2, BiConsumer<? super org.apache.kafka.clients.consumer.Consumer<?, ?>, ? super RuntimeException> biConsumer) {
            this.consumerProperties = Objects.requireNonNull(properties);
            this.keyDeserializer = Objects.requireNonNull(deserializer);
            this.valueDeserializer = Objects.requireNonNull(deserializer2);
            this.topic = Objects.requireNonNull(str);
            this.recordOrRecordsHandler = Objects.requireNonNull(oneof2);
            this.consumerExceptionHandler = Objects.requireNonNull(biConsumer);
            setConsumerThreads(i);
        }

        /** @return the number of containers currently held by the pool */
        public synchronized int getConsumerThreads() {
            return this.containers.size();
        }

        /**
         * Grows or shrinks the pool to exactly {@code i} containers. Surplus containers are closed
         * starting from the most recently added one.
         *
         * @throws IllegalStateException    if the pool has been closed
         * @throws IllegalArgumentException if {@code i} is negative
         */
        public synchronized void setConsumerThreads(int i) {
            checkNotClosed();
            if (i < 0) {
                throw new IllegalArgumentException("consumerThreads should be non-negative");
            }
            // Shrink: stop containers from the tail until the target size is reached.
            for (int surplus = this.containers.size() - i; surplus > 0; surplus--) {
                this.containers.removeLast().close();
            }
            // Grow: start new containers until the target size is reached.
            for (int missing = i - this.containers.size(); missing > 0; missing--) {
                this.containers.addLast(startSubscribedContainer());
            }
        }

        /** Starts a new container and queues a subscription to the pool's topic on it. */
        private ConsumerContainer<K, V> startSubscribedContainer() {
            ConsumerContainer<K, V> started = new ConsumerContainer<>(this.consumerProperties, this.keyDeserializer, this.valueDeserializer, this.recordOrRecordsHandler, this.consumerExceptionHandler);
            started.submit(kafkaConsumer -> {
                kafkaConsumer.subscribe(Collections.singletonList(this.topic));
                return null;
            });
            return started;
        }

        private void checkNotClosed() {
            if (this.closed) {
                throw new IllegalStateException("Container already closed");
            }
        }

        /**
         * Closes every container in the pool. Subsequent calls are no-ops. The pool is marked closed
         * even if closing one of the containers throws.
         */
        @Override // java.lang.AutoCloseable
        public synchronized void close() {
            if (!this.closed) {
                try {
                    for (ConsumerContainer<K, V> container : this.containers) {
                        container.close();
                    }
                } finally {
                    this.closed = true;
                }
            }
        }
    }

    /**
     * Creates a container that polls with {@link #DEFAULT_CONSUMER_POLL_TIMEOUT} and has no idle-ping
     * handler (idle timeout {@code 0}, handler {@code null}).
     *
     * @param properties    consumer configuration
     * @param deserializer  key deserializer
     * @param deserializer2 value deserializer
     * @param oneof2        either a per-record or a per-batch handler
     * @param biConsumer    handler for exceptions raised while polling
     */
    public ConsumerContainer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2, Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> oneof2, BiConsumer<? super org.apache.kafka.clients.consumer.Consumer<?, ?>, ? super RuntimeException> biConsumer) {
        this(
                properties,
                deserializer,
                deserializer2,
                DEFAULT_CONSUMER_POLL_TIMEOUT,
                oneof2,
                biConsumer,
                0L,
                null);
    }

    public ConsumerContainer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2, long j, Oneof2<Consumer<? super ConsumerRecord<K, V>>, Consumer<? super ConsumerRecords<K, V>>> oneof2, BiConsumer<? super org.apache.kafka.clients.consumer.Consumer<?, ?>, ? super RuntimeException> biConsumer, long j2, Consumer<? super TopicPartition> consumer) {
        this.lock = new Object();
        this.tasks = new LinkedTransferQueue();
        this.consumerProperties = (Properties) Objects.requireNonNull(properties);
        this.keyDeserializer = (Deserializer) Objects.requireNonNull(deserializer);
        this.valueDeserializer = (Deserializer) Objects.requireNonNull(deserializer2);
        this.consumerPollTimeout = Duration.ofMillis(j);
        this.recordHandler = oneof2.isFirst() ? oneof2.getFirst() : null;
        this.recordsHandler = oneof2.isSecond() ? oneof2.getSecond() : null;
        this.consumerExceptionHandler = (BiConsumer) Objects.requireNonNull(biConsumer);
        this.idlePingTimeout = j2;
        this.idlePingHandler = consumer;
        this.thread = new Thread(this::consumerLoop, "kafka-consumer-container-" + containerCount.incrementAndGet());
        this.thread.start();
    }

    /**
     * Queues {@code function} for execution on the consumer thread and returns a future for its result.
     *
     * <p>The function receives the container's Kafka consumer. If the container is already closed the
     * returned future is completed exceptionally with an {@link IllegalStateException} and the
     * function is never invoked.
     *
     * @param function the action to run against the consumer, must not be null
     * @param <R>      the action's result type
     * @return a future completed with the action's result, or exceptionally if the action throws or
     *         the container is closed
     * @throws NullPointerException  if {@code function} is null
     * @throws IllegalStateException if called from the consumer thread itself
     */
    @Override // io.apicurio.registry.utils.kafka.ConsumerActions
    public final <R> CompletableFuture<R> submit(Function<? super org.apache.kafka.clients.consumer.Consumer<K, V>, ? extends R> function) {
        Objects.requireNonNull(function);
        if (Thread.currentThread() == this.thread) {
            throw new IllegalStateException("Don't submit actions from consumer thread");
        }
        CompletableFuture<R> result;
        synchronized (this.lock) {
            // The consumer thread completes 'task' with the consumer; 'result' is derived from it.
            CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> task = new CompletableFuture<>();
            result = task.thenApply(function);
            if (this.closed) {
                task.completeExceptionally(new IllegalStateException("Container already closed"));
            } else {
                this.tasks.add(task);
            }
        }
        return result;
    }

    /**
     * Body of the consumer thread: creates the Kafka consumer, then alternates between running
     * submitted tasks and polling for records until {@code closed} becomes true.
     *
     * <p>While the consumer has neither a subscription nor an assignment, {@code poll()} throws
     * {@link IllegalStateException}; the loop then blocks on the task queue instead of polling until
     * a task gives the consumer something to consume.
     *
     * <p>When the thread exits for any reason (including a failure to create the consumer) the
     * container is marked closed and tasks still in the queue are failed, so that callers waiting on
     * them are released instead of blocking on a thread that no longer exists.
     */
    private void consumerLoop() {
        boolean waitingForSubscription = false;
        // Last time (epoch ms) records were seen, or a ping was sent, per partition; null disables idle tracking.
        Map<TopicPartition, Long> lastActivity = this.idlePingHandler == null ? null : new HashMap<>();
        try (org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer =
                     new KafkaConsumer<>(this.consumerProperties, this.keyDeserializer, this.valueDeserializer)) {
            while (!this.closed) {
                CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> task;
                boolean interrupted = false;
                try {
                    // Block for a task only when polling is pointless; otherwise just peek at the queue.
                    task = waitingForSubscription ? this.tasks.take() : this.tasks.poll();
                } catch (InterruptedException e) {
                    log.warn("Consumer thread interrupted", e);
                    task = null;
                    interrupted = true;
                }

                if (task != null) {
                    task.complete(kafkaConsumer);
                    if (waitingForSubscription) {
                        // The task may have subscribed or assigned; re-evaluate whether polling can resume.
                        waitingForSubscription = kafkaConsumer.subscription().isEmpty() && kafkaConsumer.assignment().isEmpty();
                    }
                    continue;
                }
                if (interrupted) {
                    continue;
                }

                // Decompiled form of "assert !waitingForSubscription": take() never returns null.
                if (!$assertionsDisabled && waitingForSubscription) {
                    throw new AssertionError();
                }

                ConsumerRecords<K, V> records = null;
                try {
                    records = kafkaConsumer.poll(this.consumerPollTimeout);
                } catch (IllegalStateException e) {
                    log.info("{} - will wait", e.getMessage());
                    waitingForSubscription = true;
                } catch (RuntimeException e) {
                    this.consumerExceptionHandler.accept(kafkaConsumer, e);
                }

                long now = System.currentTimeMillis();
                if (records != null) {
                    if (lastActivity != null) {
                        for (TopicPartition partition : records.partitions()) {
                            lastActivity.put(partition, now);
                        }
                    }
                    handleRecords(records, kafkaConsumer);
                }
                if (lastActivity != null) {
                    pingIdlePartitions(lastActivity, now, kafkaConsumer);
                }
            }
            log.info("Consumer loop finished");
        } catch (Throwable t) {
            log.warn("Exception caught in consumer polling thread", t);
        } finally {
            // Nobody will service the queue after this point: reject new work and release waiters.
            this.closed = true;
            failPendingTasks();
            log.info("Consumer thread terminating");
        }
    }

    /**
     * Reports every tracked partition whose last activity is at least {@code idlePingTimeout} ms old
     * to the idle-ping handler and restarts its idle clock. After a round that pinged at least one
     * partition, tracking entries for partitions no longer assigned to the consumer are dropped.
     */
    private void pingIdlePartitions(Map<TopicPartition, Long> lastActivity, long now, org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer) {
        ArrayList<TopicPartition> idle = null;
        for (Map.Entry<TopicPartition, Long> entry : lastActivity.entrySet()) {
            if (now - entry.getValue() >= this.idlePingTimeout) {
                if (idle == null) {
                    idle = new ArrayList<>(4);
                }
                idle.add(entry.getKey());
                entry.setValue(now);
            }
        }
        if (idle != null) {
            idle.forEach(this.idlePingHandler);
            lastActivity.keySet().retainAll(kafkaConsumer.assignment());
        }
    }

    /**
     * Completes every task still in the queue exceptionally. Deliberately lock-free: a caller may be
     * holding {@code lock} while waiting on one of these very tasks, so taking the lock here could
     * deadlock. A task enqueued concurrently with this drain can therefore still be missed.
     */
    private void failPendingTasks() {
        CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> orphan;
        while ((orphan = this.tasks.poll()) != null) {
            orphan.completeExceptionally(new IllegalStateException("Container already closed"));
        }
    }

    /**
     * Dispatches a polled batch: to the batch handler as a whole when one is configured, otherwise
     * record by record to the single-record handler. Each hand-off is retried until it succeeds or
     * the container is closed.
     *
     * @param consumerRecords the batch returned by the last poll
     * @param consumer        the Kafka consumer, made available to tasks run while waiting to retry
     */
    private void handleRecords(ConsumerRecords<K, V> consumerRecords, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
        if (this.recordsHandler != null) {
            acceptRetryable(consumerRecords, this.recordsHandler, consumer);
            return;
        }
        // Decompiled form of "assert recordHandler != null".
        if (!$assertionsDisabled && this.recordHandler == null) {
            throw new AssertionError();
        }
        for (ConsumerRecord<K, V> single : consumerRecords) {
            acceptRetryable(single, this.recordHandler, consumer);
        }
    }

    /**
     * Feeds {@code t} to {@code consumer} with the retry behaviour of {@code applyRetryable}, by
     * adapting the void handler to a function that returns {@code null}.
     *
     * @param t         the record or batch to process
     * @param consumer  the handler to invoke
     * @param consumer2 the Kafka consumer passed through to {@code applyRetryable}
     */
    private <T> void acceptRetryable(T t, Consumer<? super T> consumer, org.apache.kafka.clients.consumer.Consumer<K, V> consumer2) {
        Function<T, Void> asFunction = item -> {
            consumer.accept(item);
            return null;
        };
        applyRetryable(t, asFunction, consumer2);
    }

    /**
     * Applies {@code function} to {@code t}, retrying with exponential back-off (starting at
     * {@code MIN_RETRY_DELAY} ms, doubling up to {@code MAX_RETRY_DELAY} ms) for as long as it throws
     * an {@link Exception} and the container is not closed.
     *
     * <p>The back-off wait is spent servicing the task queue, so submitted tasks (including the one
     * that closes the container) still run while a record is failing. Every task that arrives
     * restarts the current wait.
     *
     * <p>An interrupt received while waiting does not abort the retries; it is remembered and the
     * thread's interrupt status is restored once, when this method returns. Restoring it earlier
     * would make every following timed wait fail immediately and turn the back-off into a busy loop.
     *
     * @param t        the input to process
     * @param function the processing step to retry
     * @param consumer the Kafka consumer handed to tasks executed during the back-off wait
     * @return the function's result, or {@code null} if the container was closed before it succeeded
     */
    private <T, R> R applyRetryable(T t, Function<? super T, ? extends R> function, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
        long delay = MIN_RETRY_DELAY;
        boolean interrupted = false;
        try {
            while (!this.closed) {
                try {
                    return function.apply(t);
                } catch (Exception e) {
                    log.warn("Exception caught while processing {} - retrying in {} ms", formatRecord(t), delay, e);
                    try {
                        CompletableFuture<org.apache.kafka.clients.consumer.Consumer<K, V>> task;
                        // poll() returning null means the delay elapsed with no task: time to retry.
                        while (!this.closed && (task = this.tasks.poll(delay, TimeUnit.MILLISECONDS)) != null) {
                            task.complete(consumer);
                        }
                    } catch (InterruptedException ie) {
                        log.info("Interrupted - keeping retrying");
                        interrupted = true;
                    }
                    delay = Math.min(delay * 2, MAX_RETRY_DELAY);
                }
            }
            return null;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds a short description of a record or batch for log messages.
     *
     * @param obj a {@code ConsumerRecord}, a {@code ConsumerRecords}, or anything else
     * @return topic/partition/offset/timestamp for a single record, count and partitions for a
     *         batch, and {@code String.valueOf(obj)} for any other value
     */
    private static String formatRecord(Object obj) {
        if (obj instanceof ConsumerRecord) {
            ConsumerRecord<?, ?> single = (ConsumerRecord<?, ?>) obj;
            return String.format(
                    "message from topic-partition %s-%s, offset %d, timestamp %tc",
                    single.topic(),
                    single.partition(),
                    single.offset(),
                    single.timestamp());
        }
        if (obj instanceof ConsumerRecords) {
            ConsumerRecords<?, ?> batch = (ConsumerRecords<?, ?>) obj;
            return String.format(
                    "%d messages from topic-partitions %s",
                    batch.count(),
                    batch.partitions());
        }
        return String.valueOf(obj);
    }

    /**
     * Asks the consumer thread to stop and waits until it has processed the request. Calling this on
     * an already closed container does nothing.
     *
     * <p>The shutdown request goes through {@code submit}, which rejects calls made from the
     * consumer thread with an {@link IllegalStateException}.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            if (!this.closed) {
                // The flag is flipped on the consumer thread, after all previously queued tasks have run.
                CompletableFuture<Void> shutdown = submit(kafkaConsumer -> {
                    this.closed = true;
                    return null;
                });
                shutdown.join();
            }
        }
    }

    // Static state of the class, as emitted by the decompiler in place of field initializers.
    static {
        // Logger shared by all containers.
        log = LoggerFactory.getLogger(ConsumerContainer.class);
        // Counter behind the "kafka-consumer-container-N" thread names; starts at zero.
        containerCount = new AtomicInteger();
        // One second, expressed in milliseconds.
        DEFAULT_CONSUMER_POLL_TIMEOUT = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.SECONDS);
        // True when assertions are disabled for this class.
        $assertionsDisabled = !ConsumerContainer.class.desiredAssertionStatus();
    }
}
