package org.apache.flink.connector.pulsar.source.reader;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.source.AbstractPartition;
import org.apache.flink.connector.pulsar.source.BrokerPartition;
import org.apache.flink.connector.pulsar.source.MessageDeserializer;
import org.apache.flink.connector.pulsar.source.PartitionReader;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.util.AsyncUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ComponentClosingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.com.google.common.io.Closer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.class */
public class PulsarPartitionSplitReader<T> implements SplitReader<ParsedMessage<T>, PulsarPartitionSplit>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReader.class);
    // Readers of the splits currently owned by this instance. fetch() polls one reader per loop
    // iteration and re-adds it unless it reports isStopped(); close() closes whatever is queued.
    // No comparator is supplied, so ordering relies on PartitionReader's natural ordering.
    private final PriorityQueue<PartitionReader> readerQueue = new PriorityQueue<>();
    // Scratch collector handed to the deserializer for every message; fetch() drains and resets
    // it after each message.
    private final SimpleCollector<T> collector = new SimpleCollector<>();
    // Template consumer configuration; createPartitionReaderAsync() works on a clone per split.
    private final ConsumerConfigurationData<byte[]> consumerConfigurationData;
    // Cast to PulsarClientImpl when a ConsumerImpl is constructed for a split.
    private final PulsarClient client;
    // Used during offset verification to look up the last message id and to peek messages.
    private final PulsarAdmin pulsarAdmin;
    // Converts each Pulsar message into zero or more values of type T via the collector.
    private final MessageDeserializer<T> messageDeserializer;
    // Time budget of a single fetch() call (read from PulsarSourceOptions.MAX_FETCH_TIME as millis).
    private final Duration maxFetchTime;
    // Upper bound on the number of loop iterations (reader polls) in one fetch() call.
    // NOTE(review): despite the name this does not count records; one poll may yield a batch
    // containing several messages.
    private final int maxFetchRecords;
    // Timeout passed to ComponentClosingUtils.closeWithTimeout() in close()
    // (read from PulsarSourceOptions.CLOSE_TIMEOUT_MS).
    private final long closeTimeout;
    // IGNORE skips the initial offset verification; FAIL_ON_MISMATCH turns a reported mismatch
    // into an IllegalStateException; any other value only logs a warning.
    private final PulsarSourceOptions.OffsetVerification offsetVerification;
    // Set by wakeUp(), cleared at the start of fetch() and checked in its loop condition.
    private volatile boolean wakeup;
    // Passed to every ConsumerImpl created by createPartitionReaderAsync().
    private final ExecutorProvider listenerExecutor;

    /* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader$PulsarPartitionSplitRecords.class */
    private static class PulsarPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
        private final Map<String, Collection<T>> recordsBySplits;
        private final Set<String> finishedSplits;
        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
        private String currentSplitId;
        private Iterator<T> recordIterator;

        private PulsarPartitionSplitRecords() {
            this.recordsBySplits = new HashMap();
            this.finishedSplits = new HashSet();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Collection<T> recordsForSplit(String str) {
            return this.recordsBySplits.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addFinishedSplit(String str) {
            this.finishedSplits.add(str);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void prepareForRead() {
            this.splitIterator = this.recordsBySplits.entrySet().iterator();
        }

        @Nullable
        public String nextSplit() {
            if (!this.splitIterator.hasNext()) {
                this.currentSplitId = null;
                this.recordIterator = null;
                return null;
            }
            Map.Entry<String, Collection<T>> next = this.splitIterator.next();
            this.currentSplitId = next.getKey();
            this.recordIterator = next.getValue().iterator();
            return this.currentSplitId;
        }

        @Nullable
        public T nextRecordFromSplit() {
            Preconditions.checkNotNull(this.currentSplitId, "Make sure nextSplit() did not return null before iterate over the records split.");
            if (this.recordIterator.hasNext()) {
                return this.recordIterator.next();
            }
            return null;
        }

        public Set<String> finishedSplits() {
            return this.finishedSplits;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader$SimpleCollector.class */
    /**
     * Minimal list-backed {@link Collector}: every collected element is appended to an internal
     * list that the owner reads via {@link #getRecords()} and empties via {@link #reset()}.
     *
     * @param <T> type of the collected elements
     */
    private static class SimpleCollector<T> implements Collector<T> {
        /** Buffer holding everything collected since the last reset. */
        private final List<T> records = new ArrayList<>();

        private SimpleCollector() {
        }

        /** Appends the element to the buffer. */
        public void collect(T t) {
            records.add(t);
        }

        /** Does nothing; there is no resource to release. */
        public void close() {
        }

        /**
         * Returns the buffered elements.
         *
         * @return the live internal list, not a copy
         */
        public List<T> getRecords() {
            return records;
        }

        /** Discards all buffered elements. */
        public void reset() {
            records.clear();
        }
    }

    public PulsarPartitionSplitReader(Configuration configuration, ConsumerConfigurationData<byte[]> consumerConfigurationData, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, MessageDeserializer<T> messageDeserializer, ExecutorProvider executorProvider) {
        this.consumerConfigurationData = consumerConfigurationData;
        this.client = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.messageDeserializer = messageDeserializer;
        this.maxFetchTime = Duration.ofMillis(((Long) configuration.get(PulsarSourceOptions.MAX_FETCH_TIME)).longValue());
        this.maxFetchRecords = ((Integer) configuration.get(PulsarSourceOptions.MAX_FETCH_RECORDS)).intValue();
        this.closeTimeout = ((Long) configuration.get(PulsarSourceOptions.CLOSE_TIMEOUT_MS)).longValue();
        this.offsetVerification = (PulsarSourceOptions.OffsetVerification) configuration.get(PulsarSourceOptions.VERIFY_INITIAL_OFFSETS);
        this.listenerExecutor = executorProvider;
    }

    /**
     * Closes every partition reader that is still queued, bounded by the configured close
     * timeout.
     *
     * <p>All readers are registered with a {@link Closer} so that each one gets closed even if
     * closing an earlier one fails.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // NOTE(review): the component label says "PulsarSourceEnumerator" although this is the
        // split reader; it is left unchanged here because it surfaces in runtime output.
        ComponentClosingUtils.closeWithTimeout("PulsarSourceEnumerator", (ThrowingRunnable<Exception>) () -> {
            try (Closer closer = Closer.create()) {
                readerQueue.forEach(closer::register);
            }
        }, this.closeTimeout);
    }

    /**
     * Polls the queued partition readers and returns the deserialized messages grouped by split.
     *
     * <p>Each loop iteration takes one reader from the queue, reads its next batch, deserializes
     * every message and then either re-queues the reader or, if it reports that it has stopped,
     * marks its split as finished and closes it. The loop ends when the queue is empty, the
     * configured fetch time has elapsed, {@link #wakeUp()} was called, or the configured maximum
     * number of iterations is reached.
     *
     * <p>The returned object is always prepared for iteration, also when no reader is queued.
     * If processing a reader fails, the reader is put back into the queue before the exception
     * propagates, so that {@link #close()} still releases it.
     *
     * @return the fetched records and the ids of the splits that finished during this call
     */
    public RecordsWithSplitIds<ParsedMessage<T>> fetch() {
        wakeup = false;
        PulsarPartitionSplitRecords<ParsedMessage<T>> fetched = new PulsarPartitionSplitRecords<>();
        Deadline deadline = Deadline.fromNow(maxFetchTime);
        // With an empty queue the loop body never runs and an empty, prepared result is returned.
        for (int polls = 0; polls < maxFetchRecords && !readerQueue.isEmpty() && deadline.hasTimeLeft() && !wakeup; polls++) {
            PartitionReader reader = readerQueue.poll();
            boolean retired = false;
            try {
                Iterator<Message<?>> batch = reader.nextBatch();
                if (batch.hasNext()) {
                    // Looked up only when there is at least one message, so splits without new
                    // data do not show up in the result.
                    Collection<ParsedMessage<T>> splitRecords = fetched.recordsForSplit(reader.getSplit().splitId());
                    while (batch.hasNext()) {
                        Message<?> message = batch.next();
                        messageDeserializer.deserialize(message, collector);
                        for (T value : collector.getRecords()) {
                            splitRecords.add(new ParsedMessage<>(value, message.getMessageId(), message.getEventTime()));
                        }
                        collector.reset();
                    }
                }
                if (reader.isStopped()) {
                    LOG.debug("{} has reached stopping condition, current offset is {} @ timestamp {}", reader.getSplit(), reader.getLastMessage().getMessageId(), reader.getLastMessage().getEventTime());
                    fetched.addFinishedSplit(reader.getSplit().splitId());
                    // From here on the reader must not be re-queued, even if close() fails.
                    retired = true;
                    reader.close();
                }
            } catch (IOException e) {
                ExceptionUtils.rethrow(e, "Error while fetching from " + reader.getSplit());
            } finally {
                // Drop leftovers of a message whose deserialization failed half-way.
                collector.reset();
                if (!retired) {
                    // Normal path: the reader has more data. Failure path: keep the reader
                    // reachable so that close() can release it.
                    readerQueue.add(reader);
                }
            }
        }
        fetched.prepareForRead();
        return fetched;
    }

    /**
     * Creates a partition reader for every newly assigned split and adds it to the reader queue.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     * @throws IllegalStateException if creating the readers timed out or failed with a
     *     {@link PulsarClientException}
     */
    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        try {
            AsyncUtils.parallelAsync(splitsChange.splits(), this::createPartitionReaderAsync, (split, reader) -> {
                readerQueue.add(reader);
            }, PulsarClientException.class);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can react to it.
            // Thread.interrupted() would clear the flag and hide the interruption.
            Thread.currentThread().interrupt();
        } catch (TimeoutException e) {
            // Keep the original message but preserve the cause for diagnostics.
            throw new IllegalStateException("Cannot create reader: " + e.getMessage(), e);
        } catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot create reader", e);
        }
    }

    /**
     * Starts a consumer for the given split and returns a future that completes with the
     * {@link PartitionReader} once the consumer's subscribe future completes.
     *
     * <p>The consumer configuration is cloned per split. For a partition that does not cover
     * the full key range a sticky hash range policy is installed and the range is appended to
     * the subscription name. The start position is the split's last consumed message id when
     * present, otherwise the split's start offset initializer. Unless offset verification is set
     * to {@code IGNORE}, the start offset is verified against the topic and mismatches are passed
     * to {@link #reportDataLoss(AbstractPartition, String)}.
     *
     * @param pulsarPartitionSplit the split to create a reader for
     * @return the future reader, or {@code null} when the split's partition is not a broker
     *     partition (kept as is for compatibility — callers must be prepared for it)
     * @throws PulsarClientException if any other exception is raised while setting up the consumer
     * @throws IllegalStateException if the setup fails with a {@link PulsarClientException}
     */
    public CompletableFuture<PartitionReader> createPartitionReaderAsync(PulsarPartitionSplit pulsarPartitionSplit) throws PulsarClientException {
        AbstractPartition partition = pulsarPartitionSplit.getPartition();
        if (partition.getPartitionType() != AbstractPartition.PartitionType.Broker) {
            return null;
        }
        BrokerPartition brokerPartition = (BrokerPartition) partition;
        try {
            ConsumerConfigurationData<byte[]> consumerConfig = this.consumerConfigurationData.m1087clone();
            CompletableFuture subscribeFuture = new CompletableFuture();
            // Compare by value: a range instance that is not the FULL_RANGE constant itself
            // (presumably the case for splits restored from serialized state — confirm) must
            // still be recognised as the full range. If the range type does not override
            // equals() this behaves exactly like the former identity comparison.
            if (!BrokerPartition.FULL_RANGE.equals(brokerPartition.getTopicRange().getPulsarRange())) {
                consumerConfig.setKeySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(brokerPartition.getTopicRange().getPulsarRange()));
                consumerConfig.setSubscriptionName(consumerConfig.getSubscriptionName() + brokerPartition.getTopicRange().getPulsarRange());
            }
            MessageId lastConsumedId = pulsarPartitionSplit.getLastConsumedId();
            StartOffsetInitializer startOffset = lastConsumedId != null ? StartOffsetInitializer.offset(lastConsumedId, false) : pulsarPartitionSplit.getStartOffsetInitializer();
            StartOffsetInitializer.CreationConfiguration creationConfiguration = new StartOffsetInitializer.CreationConfiguration(consumerConfig);
            startOffset.initializeBeforeCreation(brokerPartition, creationConfiguration);
            // The anonymous subclass gives access to the ConsumerImpl constructor.
            ConsumerImpl<byte[]> consumer = new ConsumerImpl<byte[]>((PulsarClientImpl) this.client, brokerPartition.getTopic(), creationConfiguration.getConsumerConfigurationData(), this.listenerExecutor, TopicName.getPartitionIndex(brokerPartition.getTopic()), false, subscribeFuture, creationConfiguration.getInitialMessageId(), creationConfiguration.getRollbackInS(), Schema.BYTES, null, true) {
            };
            try {
                startOffset.initializeAfterCreation(brokerPartition, consumer);
                pulsarPartitionSplit.getStopCondition().init(brokerPartition, consumer);
                if (this.offsetVerification != PulsarSourceOptions.OffsetVerification.IGNORE) {
                    startOffset.verifyOffset(brokerPartition, wrap(() -> {
                        return Optional.ofNullable(this.pulsarAdmin.topics().getLastMessageId(brokerPartition.getTopic()));
                    }), wrap(() -> {
                        return this.pulsarAdmin.topics().peekMessages(brokerPartition.getTopic(), consumerConfig.getSubscriptionName(), 1).stream().findFirst();
                    })).ifPresent(str -> {
                        reportDataLoss(brokerPartition, str);
                    });
                }
            } catch (Throwable failure) {
                // The consumer exists but will never be wrapped in a PartitionReader, so
                // nothing else would close it. Release it best-effort, then let the original
                // failure propagate unchanged.
                consumer.closeAsync();
                throw failure;
            }
            return subscribeFuture.thenApply(subscribed -> {
                return new PartitionReader(pulsarPartitionSplit, consumer, pulsarPartitionSplit.getStopCondition());
            });
        } catch (PulsarClientException.TopicDoesNotExistException e) {
            throw new IllegalStateException("Cannot subscribe to partition " + brokerPartition, e);
        } catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot add split " + pulsarPartitionSplit, e);
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Adapts a supplier that may throw into a plain {@link Supplier}. Anything thrown by the
     * delegate is handed to {@link ExceptionUtils#rethrow(Throwable)}.
     *
     * @param supplierWithException the delegate to call
     * @param <R> type of the supplied value
     * @return a supplier that forwards to the delegate
     */
    private <R> Supplier<R> wrap(SupplierWithException<R, ?> supplierWithException) {
        return () -> {
            R value;
            try {
                value = supplierWithException.get();
            } catch (Throwable failure) {
                ExceptionUtils.rethrow(failure);
                // Only reached if rethrow() returns normally.
                value = null;
            }
            return value;
        };
    }

    /**
     * Reports an offset verification problem for a partition: logs a warning, or throws when
     * the verification mode is {@code FAIL_ON_MISMATCH}.
     *
     * @param abstractPartition the partition being initialized
     * @param str description of the problem found by the verification
     * @throws IllegalStateException if the verification mode is {@code FAIL_ON_MISMATCH}
     */
    private void reportDataLoss(AbstractPartition abstractPartition, String str) {
        final String optionKey = PulsarSourceOptions.VERIFY_INITIAL_OFFSETS.key();
        final String message = String.format("While initializing %s encountered the following error: %s.\nPossible reasons include data being already deleted because of wrong retention or wrong offsets.\nTo change the behavior of the offset verification, please refer to the option \"%s\".", abstractPartition, str, optionKey);
        if (offsetVerification != PulsarSourceOptions.OffsetVerification.FAIL_ON_MISMATCH) {
            LOG.warn(message);
            return;
        }
        throw new IllegalStateException(message);
    }

    /**
     * Requests that a running {@code fetch()} stops polling: the flag set here is part of the
     * fetch loop condition and is cleared again at the start of the next {@code fetch()} call.
     */
    public void wakeUp() {
        wakeup = true;
    }
}
