/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.source.AbstractPartition;
import org.apache.flink.connector.pulsar.source.BrokerPartition;
import org.apache.flink.connector.pulsar.source.MessageDeserializer;
import org.apache.flink.connector.pulsar.source.PartitionReader;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.flink.connector.pulsar.source.reader.ParsedMessage;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.util.AsyncUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ComponentClosingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.com.google.common.io.Closer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarPartitionSplitReader<T>
implements SplitReader<ParsedMessage<T>, PulsarPartitionSplit>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReader.class);
    private final PriorityQueue<PartitionReader> readerQueue = new PriorityQueue();
    private final SimpleCollector<T> collector = new SimpleCollector();
    private final ConsumerConfigurationData<byte[]> consumerConfigurationData;
    private final PulsarClient client;
    private final PulsarAdmin pulsarAdmin;
    private final MessageDeserializer<T> messageDeserializer;
    private final Duration maxFetchTime;
    private final int maxFetchRecords;
    private final long closeTimeout;
    private final PulsarSourceOptions.OffsetVerification offsetVerification;
    private volatile boolean wakeup;
    private final ExecutorProvider listenerExecutor;

    public PulsarPartitionSplitReader(Configuration configuration, ConsumerConfigurationData<byte[]> consumerConfigurationData, PulsarClient client, PulsarAdmin pulsarAdmin, MessageDeserializer<T> messageDeserializer, ExecutorProvider listenerExecutor) {
        this.consumerConfigurationData = consumerConfigurationData;
        this.client = client;
        this.pulsarAdmin = pulsarAdmin;
        this.messageDeserializer = messageDeserializer;
        this.maxFetchTime = Duration.ofMillis((Long)configuration.get(PulsarSourceOptions.MAX_FETCH_TIME));
        this.maxFetchRecords = (Integer)configuration.get(PulsarSourceOptions.MAX_FETCH_RECORDS);
        this.closeTimeout = (Long)configuration.get(PulsarSourceOptions.CLOSE_TIMEOUT_MS);
        this.offsetVerification = (PulsarSourceOptions.OffsetVerification)((Object)configuration.get(PulsarSourceOptions.VERIFY_INITIAL_OFFSETS));
        this.listenerExecutor = listenerExecutor;
    }

    @Override
    public void close() {
        ComponentClosingUtils.closeWithTimeout("PulsarSourceEnumerator", (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            try (Closer closer = Closer.create();){
                this.readerQueue.forEach(closer::register);
            }
        }), this.closeTimeout);
    }

    public RecordsWithSplitIds<ParsedMessage<T>> fetch() {
        this.wakeup = false;
        PulsarPartitionSplitRecords<ParsedMessage<T>> recordsBySplits = new PulsarPartitionSplitRecords<ParsedMessage<T>>();
        if (this.readerQueue.isEmpty()) {
            return recordsBySplits;
        }
        Deadline deadline = Deadline.fromNow((Duration)this.maxFetchTime);
        for (int numRecords = 0; numRecords < this.maxFetchRecords && !this.readerQueue.isEmpty() && deadline.hasTimeLeft() && !this.wakeup; ++numRecords) {
            PartitionReader reader = this.readerQueue.poll();
            try {
                Iterator<Message<?>> messages = reader.nextBatch();
                if (messages.hasNext()) {
                    while (messages.hasNext()) {
                        Message<?> message = messages.next();
                        Collection recordsForSplit = ((PulsarPartitionSplitRecords)recordsBySplits).recordsForSplit(reader.getSplit().splitId());
                        this.messageDeserializer.deserialize(message, this.collector);
                        ((SimpleCollector)this.collector).getRecords().forEach(r -> recordsForSplit.add(new ParsedMessage<Object>(r, message.getMessageId(), message.getEventTime())));
                        ((SimpleCollector)this.collector).reset();
                    }
                }
                if (reader.isStopped()) {
                    LOG.debug("{} has reached stopping condition, current offset is {} @ timestamp {}", new Object[]{reader.getSplit(), reader.getLastMessage().getMessageId(), reader.getLastMessage().getEventTime()});
                    ((PulsarPartitionSplitRecords)recordsBySplits).addFinishedSplit(reader.getSplit().splitId());
                    reader.close();
                    continue;
                }
                this.readerQueue.add(reader);
                continue;
            }
            catch (IOException e) {
                ExceptionUtils.rethrow((Throwable)e, (String)("Error while fetching from " + reader.getSplit()));
            }
        }
        ((PulsarPartitionSplitRecords)recordsBySplits).prepareForRead();
        return recordsBySplits;
    }

    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        try {
            AsyncUtils.parallelAsync(splitsChange.splits(), this::createPartitionReaderAsync, (partition, reader) -> this.readerQueue.add((PartitionReader)reader), PulsarClientException.class);
        }
        catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot create reader", e);
        }
        catch (TimeoutException e) {
            throw new IllegalStateException("Cannot create reader: " + e.getMessage());
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    public CompletableFuture<PartitionReader> createPartitionReaderAsync(PulsarPartitionSplit split) throws PulsarClientException {
        AbstractPartition abstractPartition = split.getPartition();
        CompletionStage completableFuture = null;
        if (abstractPartition.getPartitionType() == AbstractPartition.PartitionType.Broker) {
            BrokerPartition partition = (BrokerPartition)abstractPartition;
            try {
                MessageId lastConsumedId;
                Object conf = this.consumerConfigurationData.clone();
                CompletableFuture subscribeFuture = new CompletableFuture();
                if (partition.getTopicRange().getPulsarRange() != BrokerPartition.FULL_RANGE) {
                    ((ConsumerConfigurationData)conf).setKeySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(partition.getTopicRange().getPulsarRange()));
                    ((ConsumerConfigurationData)conf).setSubscriptionName(((ConsumerConfigurationData)conf).getSubscriptionName() + partition.getTopicRange().getPulsarRange());
                }
                StartOffsetInitializer startOffsetInitializer = (lastConsumedId = split.getLastConsumedId()) != null ? StartOffsetInitializer.offset(lastConsumedId, false) : split.getStartOffsetInitializer();
                StartOffsetInitializer.CreationConfiguration creationConfiguration = new StartOffsetInitializer.CreationConfiguration((ConsumerConfigurationData<byte[]>)conf);
                startOffsetInitializer.initializeBeforeCreation(partition, creationConfiguration);
                ConsumerImpl<byte[]> consumer = new ConsumerImpl<byte[]>((PulsarClientImpl)this.client, partition.getTopic(), creationConfiguration.getConsumerConfigurationData(), this.listenerExecutor, TopicName.getPartitionIndex(partition.getTopic()), false, subscribeFuture, creationConfiguration.getInitialMessageId(), creationConfiguration.getRollbackInS(), Schema.BYTES, null, true){};
                startOffsetInitializer.initializeAfterCreation(partition, consumer);
                split.getStopCondition().init(partition, (Consumer<byte[]>)consumer);
                if (this.offsetVerification != PulsarSourceOptions.OffsetVerification.IGNORE) {
                    startOffsetInitializer.verifyOffset(partition, this.wrap(() -> Optional.ofNullable(this.pulsarAdmin.topics().getLastMessageId(partition.getTopic()))), this.wrap(() -> this.lambda$createPartitionReaderAsync$4(partition, (ConsumerConfigurationData)conf))).ifPresent(error -> this.reportDataLoss(partition, (String)error));
                }
                completableFuture = subscribeFuture.thenApply(c -> new PartitionReader(split, consumer, split.getStopCondition()));
            }
            catch (PulsarClientException.TopicDoesNotExistException e) {
                throw new IllegalStateException("Cannot subscribe to partition " + partition, e);
            }
            catch (PulsarClientException e) {
                throw new IllegalStateException("Cannot add split " + split, e);
            }
            catch (Exception e) {
                throw PulsarClientException.unwrap(e);
            }
        }
        return completableFuture;
    }

    private <T> Supplier<T> wrap(SupplierWithException<T, ?> supplierWithException) {
        return () -> {
            try {
                return supplierWithException.get();
            }
            catch (Throwable throwable) {
                ExceptionUtils.rethrow((Throwable)throwable);
                return null;
            }
        };
    }

    private void reportDataLoss(AbstractPartition partition, String error) {
        String fullError = String.format("While initializing %s encountered the following error: %s.\nPossible reasons include data being already deleted because of wrong retention or wrong offsets.\nTo change the behavior of the offset verification, please refer to the option \"%s\".", partition, error, PulsarSourceOptions.VERIFY_INITIAL_OFFSETS.key());
        if (this.offsetVerification == PulsarSourceOptions.OffsetVerification.FAIL_ON_MISMATCH) {
            throw new IllegalStateException(fullError);
        }
        LOG.warn(fullError);
    }

    public void wakeUp() {
        this.wakeup = true;
    }

    private /* synthetic */ Optional lambda$createPartitionReaderAsync$4(BrokerPartition partition, ConsumerConfigurationData conf) throws Throwable {
        return this.pulsarAdmin.topics().peekMessages(partition.getTopic(), conf.getSubscriptionName(), 1).stream().findFirst();
    }

    private static class SimpleCollector<T>
    implements Collector<T> {
        private final List<T> records = new ArrayList<T>();

        private SimpleCollector() {
        }

        public void collect(T record) {
            this.records.add(record);
        }

        public void close() {
        }

        private List<T> getRecords() {
            return this.records;
        }

        private void reset() {
            this.records.clear();
        }
    }

    private static class PulsarPartitionSplitRecords<T>
    implements RecordsWithSplitIds<T> {
        private final Map<String, Collection<T>> recordsBySplits = new HashMap<String, Collection<T>>();
        private final Set<String> finishedSplits = new HashSet<String>();
        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
        private String currentSplitId;
        private Iterator<T> recordIterator;

        private PulsarPartitionSplitRecords() {
        }

        private Collection<T> recordsForSplit(String splitId) {
            return this.recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList());
        }

        private void addFinishedSplit(String splitId) {
            this.finishedSplits.add(splitId);
        }

        private void prepareForRead() {
            this.splitIterator = this.recordsBySplits.entrySet().iterator();
        }

        @Nullable
        public String nextSplit() {
            if (this.splitIterator.hasNext()) {
                Map.Entry<String, Collection<T>> entry = this.splitIterator.next();
                this.currentSplitId = entry.getKey();
                this.recordIterator = entry.getValue().iterator();
                return this.currentSplitId;
            }
            this.currentSplitId = null;
            this.recordIterator = null;
            return null;
        }

        @Nullable
        public T nextRecordFromSplit() {
            Preconditions.checkNotNull((Object)this.currentSplitId, (String)"Make sure nextSplit() did not return null before iterate over the records split.");
            if (this.recordIterator.hasNext()) {
                return this.recordIterator.next();
            }
            return null;
        }

        public Set<String> finishedSplits() {
            return this.finishedSplits;
        }
    }
}

