package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.class */
public class PulsarFetcher<T> {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger log;
    // The three constants below name the values stored in timestampWatermarkMode.
    // The constructor assigns the matching literal: 0 when no assigner is supplied,
    // 1 when a periodic assigner is supplied, 2 when a punctuated assigner is supplied.
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    private static final int PERIODIC_WATERMARKS = 1;
    private static final int PUNCTUATED_WATERMARKS = 2;
    /** Context through which records and watermarks are emitted. */
    protected final SourceFunction.SourceContext<T> sourceContext;
    /** Topic ranges and start offsets handed to the constructor. */
    protected final Map<TopicRange, MessageId> seedTopicsWithInitialOffsets;
    /** Lock obtained from sourceContext.getCheckpointLock(); held while emitting records and updating offsets. */
    private final Object checkpointLock;
    /** State holders for the seed topics plus any topics added through addDiscoveredTopics. */
    protected final List<PulsarTopicState> subscribedPartitionStates;
    /** State holders that runFetchLoop has not yet handed to a reader thread. */
    protected final ClosableBlockingQueue<PulsarTopicState> unassignedPartitionsQueue;
    /** One of the three mode constants above; fixed in the constructor. */
    private final int timestampWatermarkMode;
    /** Serialized periodic assigner; deserialized once per state holder. May be null. */
    private final SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic;
    /** Serialized punctuated assigner; deserialized once per state holder. May be null. */
    private final SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated;
    /** Class loader passed to deserializeValue when materialising the assigners. */
    private final ClassLoader userCodeClassLoader;
    /** Runtime context; this class reads only the task name from it, for reader thread names. */
    private final StreamingRuntimeContext runtimeContext;
    /** Pulsar client configuration forwarded to every reader thread. */
    protected final ClientConfigurationData clientConf;
    /** Reader configuration forwarded to every reader thread; the constructor removes the fail-on-data-loss key from it. */
    protected final Map<String, Object> readerConf;
    /** Deserialization schema forwarded to every reader thread. */
    protected final PulsarDeserializationSchema<T> deserializer;
    /** Poll timeout forwarded to every reader thread. */
    protected final int pollTimeoutMs;
    /** Number of retries doCommitOffsetToPulsar performs after the first failed commit. */
    private final int commitMaxRetries;
    /** Used to set up, commit and remove cursors. */
    protected final PulsarMetadataReader metadataReader;
    /** Highest punctuated watermark emitted so far; starts at Long.MIN_VALUE. */
    private volatile long maxWatermarkSoFar;
    /** Cleared by cancel() and when runFetchLoop shuts down. */
    private volatile boolean running;
    /** Reader threads keyed by topic range; created at the start of runFetchLoop. */
    private Map<TopicRange, ReaderThread> topicToThread;
    /** Parsed from readerConf in the constructor (default "true"); passed to setupCursor and to reader threads. */
    private boolean failOnDataLoss;
    /** Compiler-generated assertion switch exposed by the decompiler; set in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Marker exception that {@code runFetchLoop} throws to leave its loop and
     * return normally. A single shared instance is used, so its stack trace
     * carries no information about where the loop was left.
     *
     * <p>Decompiler note: this class was originally {@code private}; the
     * access modifier was widened by JADX.
     */
    public static class BreakingException extends Exception {
        /** The only instance; the constructor is private. */
        static final BreakingException INSTANCE = new BreakingException();

        private BreakingException() {
            super();
        }
    }

    /**
     * Timer callback that emits the minimum watermark across all partition
     * states whenever that minimum advances, then re-registers itself
     * {@code interval} milliseconds of processing time later.
     */
    private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
        private final List<PulsarTopicState> allPartitions;
        private final SourceFunction.SourceContext<?> emitter;
        private final ProcessingTimeService timerService;
        private final long interval;
        /** Timestamp of the last watermark emitted by this callback. */
        private long lastWatermarkTimestamp = Long.MIN_VALUE;

        PeriodicWatermarkEmitter(List<PulsarTopicState> list, SourceFunction.SourceContext<?> sourceContext, ProcessingTimeService processingTimeService, long j) {
            this.allPartitions = Preconditions.checkNotNull(list);
            this.emitter = Preconditions.checkNotNull(sourceContext);
            this.timerService = Preconditions.checkNotNull(processingTimeService);
            this.interval = j;
        }

        /** Registers the first timer. */
        public void start() {
            scheduleNextInvocation();
        }

        /**
         * Computes the minimum current watermark over all partitions and emits
         * it when it is greater than the last emitted one. Nothing is emitted
         * while the partition list is empty.
         */
        public void onProcessingTime(long j) throws Exception {
            long minAcrossPartitions = Long.MAX_VALUE;
            boolean sawPartition = false;

            for (PulsarTopicState state : this.allPartitions) {
                long partitionWatermark;
                // The state is also locked by the emitting side while it extracts timestamps.
                synchronized (state) {
                    partitionWatermark = ((PulsarTopicStateWithPeriodicWatermarks) state).getCurrentWatermarkTimestamp();
                }
                if (partitionWatermark < minAcrossPartitions) {
                    minAcrossPartitions = partitionWatermark;
                }
                sawPartition = true;
            }

            boolean advanced = minAcrossPartitions > this.lastWatermarkTimestamp;
            if (sawPartition && advanced) {
                this.lastWatermarkTimestamp = minAcrossPartitions;
                this.emitter.emitWatermark(new Watermark(minAcrossPartitions));
            }

            scheduleNextInvocation();
        }

        /** Registers this callback to fire one interval after the current processing time. */
        private void scheduleNextInvocation() {
            long now = this.timerService.getCurrentProcessingTime();
            this.timerService.registerTimer(now + this.interval, this);
        }
    }

    /**
     * Convenience constructor that delegates to the full constructor with a
     * commit retry limit of 3.
     *
     * @param sourceContext context used to emit records and watermarks
     * @param map seed topic ranges with their initial offsets
     * @param serializedValue serialized periodic watermark assigner, or null
     * @param serializedValue2 serialized punctuated watermark assigner, or null
     * @param processingTimeService timer service for the periodic watermark emitter
     * @param j interval handed to the periodic watermark emitter
     * @param classLoader class loader used to deserialize the assigners
     * @param streamingRuntimeContext runtime context of the task
     * @param clientConfigurationData Pulsar client configuration
     * @param map2 reader configuration, or null for an empty one
     * @param i poll timeout in milliseconds
     * @param pulsarDeserializationSchema schema used by the reader threads
     * @param pulsarMetadataReader metadata reader used for cursor management
     */
    public PulsarFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<TopicRange, MessageId> map,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2,
            ProcessingTimeService processingTimeService,
            long j,
            ClassLoader classLoader,
            StreamingRuntimeContext streamingRuntimeContext,
            ClientConfigurationData clientConfigurationData,
            Map<String, Object> map2,
            int i,
            PulsarDeserializationSchema<T> pulsarDeserializationSchema,
            PulsarMetadataReader pulsarMetadataReader) throws Exception {
        this(
                sourceContext,
                map,
                serializedValue,
                serializedValue2,
                processingTimeService,
                j,
                classLoader,
                streamingRuntimeContext,
                clientConfigurationData,
                map2,
                i,
                3,
                pulsarDeserializationSchema,
                pulsarMetadataReader);
    }

    /**
     * Creates the fetcher, builds one state holder per seed topic range,
     * queues all of them for assignment to reader threads and, in periodic
     * watermark mode, starts the periodic watermark emitter.
     *
     * @param sourceContext context used to emit records and watermarks
     * @param map seed topic ranges with their initial offsets
     * @param serializedValue serialized periodic watermark assigner, or null
     * @param serializedValue2 serialized punctuated watermark assigner, or null
     * @param processingTimeService timer service for the periodic watermark emitter
     * @param j interval handed to the periodic watermark emitter
     * @param classLoader class loader used to deserialize the assigners
     * @param streamingRuntimeContext runtime context of the task
     * @param clientConfigurationData Pulsar client configuration
     * @param map2 reader configuration, or null for an empty one; the
     *     fail-on-data-loss key is read from it and then removed
     * @param i poll timeout in milliseconds
     * @param i2 number of commit retries
     * @param pulsarDeserializationSchema schema used by the reader threads
     * @param pulsarMetadataReader metadata reader used for cursor management
     * @throws IllegalArgumentException if both assigners are supplied, or if a
     *     seed state has no defined offset
     */
    public PulsarFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<TopicRange, MessageId> map,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2,
            ProcessingTimeService processingTimeService,
            long j,
            ClassLoader classLoader,
            StreamingRuntimeContext streamingRuntimeContext,
            ClientConfigurationData clientConfigurationData,
            Map<String, Object> map2,
            int i,
            int i2,
            PulsarDeserializationSchema<T> pulsarDeserializationSchema,
            PulsarMetadataReader pulsarMetadataReader) throws Exception {
        this.maxWatermarkSoFar = Long.MIN_VALUE;
        this.running = true;

        this.sourceContext = sourceContext;
        this.seedTopicsWithInitialOffsets = map;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.userCodeClassLoader = classLoader;
        this.runtimeContext = streamingRuntimeContext;
        this.clientConf = clientConfigurationData;

        if (map2 == null) {
            this.readerConf = new HashMap<>();
        } else {
            this.readerConf = map2;
        }
        // The option is consumed here and stripped so it is not forwarded with the reader configuration.
        Object failOnDataLossSetting = this.readerConf.getOrDefault(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY, "true");
        this.failOnDataLoss = Boolean.parseBoolean(failOnDataLossSetting.toString());
        this.readerConf.remove(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY);

        this.pollTimeoutMs = i;
        this.commitMaxRetries = i2;
        this.deserializer = pulsarDeserializationSchema;
        this.metadataReader = pulsarMetadataReader;
        this.watermarksPeriodic = serializedValue;
        this.watermarksPunctuated = serializedValue2;

        if (serializedValue != null && serializedValue2 != null) {
            throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
        }
        if (serializedValue != null) {
            this.timestampWatermarkMode = PERIODIC_WATERMARKS;
        } else if (serializedValue2 != null) {
            this.timestampWatermarkMode = PUNCTUATED_WATERMARKS;
        } else {
            this.timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
        }

        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
        this.subscribedPartitionStates = createPartitionStateHolders(map, this.timestampWatermarkMode, serializedValue, serializedValue2, classLoader);

        // Validate every seed state before any of them is queued.
        for (PulsarTopicState state : this.subscribedPartitionStates) {
            if (!state.isOffsetDefined()) {
                throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }
        for (PulsarTopicState state : this.subscribedPartitionStates) {
            this.unassignedPartitionsQueue.add(state);
        }

        if (this.timestampWatermarkMode == PERIODIC_WATERMARKS) {
            PeriodicWatermarkEmitter periodicEmitter =
                    new PeriodicWatermarkEmitter(this.subscribedPartitionStates, sourceContext, processingTimeService, j);
            periodicEmitter.start();
        }
    }

    /**
     * Main loop of the fetcher. While the fetcher is running it rethrows any
     * failure recorded in the exception proxy, starts reader threads for
     * newly queued topic states and forgets reader threads that are no
     * longer running. When no reader thread is left and the queue is empty it
     * blocks until a new state (or the poison marker) arrives.
     *
     * <p>However the loop ends (normal completion, poison marker, interrupt
     * or failure), all reader threads are cancelled and joined on a
     * best-effort basis before this method returns or throws.
     *
     * @throws Exception any failure reported through the exception proxy or
     *     raised while assigning partitions; an {@link InterruptedException}
     *     is rethrown only if the proxy holds no other failure
     */
    public void runFetchLoop() throws Exception {
        this.topicToThread = new HashMap<>();
        ExceptionProxy exceptionProxy = new ExceptionProxy(Thread.currentThread());
        try {
            try {
                while (this.running) {
                    // Surface failures recorded by the reader threads.
                    exceptionProxy.checkAndThrowException();

                    // Wait (bounded) for topic states that still need a reader thread.
                    List<PulsarTopicState> topicsToAssign = this.unassignedPartitionsQueue.getBatchBlocking(5000L);
                    // Poison markers are not real partitions; drop them from the batch.
                    topicsToAssign.removeIf(state -> state.equals(PoisonState.INSTANCE));

                    if (topicsToAssign.isEmpty()) {
                        // Nothing to assign: drop reader threads that have stopped running.
                        this.topicToThread.values().removeIf(thread -> !thread.isRunning());
                    } else {
                        if (!this.running) {
                            throw BreakingException.INSTANCE;
                        }
                        this.topicToThread.putAll(createAndStartReaderThread(topicsToAssign, exceptionProxy));
                    }

                    if (this.topicToThread.size() == 0 && this.unassignedPartitionsQueue.isEmpty()) {
                        // No active readers and nothing queued: block until something arrives.
                        PulsarTopicState next = this.unassignedPartitionsQueue.getElementBlocking();
                        if (next.equals(PoisonState.INSTANCE)) {
                            throw BreakingException.INSTANCE;
                        }
                        this.topicToThread.putAll(createAndStartReaderThread(ImmutableList.of(next), exceptionProxy));
                    }
                }
            } finally {
                // Runs before the handlers below, so reader threads are stopped
                // before the exception proxy is consulted again.
                shutdownReaderThreads();
            }
        } catch (BreakingException ignored) {
            // Control-flow marker only: the loop was asked to stop, return normally.
        } catch (InterruptedException e) {
            // The interrupt may have been triggered by a failing reader thread;
            // prefer that root cause over the bare interruption.
            exceptionProxy.checkAndThrowException();
            throw e;
        }
    }

    /**
     * Marks the fetcher as stopped, then repeatedly cancels and joins the
     * reader threads that are still alive until none remain. Stops early if
     * the calling thread is interrupted (the interrupt flag is restored) or if
     * cancelling/joining fails (the failure is logged); it never throws.
     */
    private void shutdownReaderThreads() {
        this.running = false;
        // Clear a pending interrupt so the joins below get a chance to wait.
        Thread.interrupted();
        try {
            int aliveThreads;
            do {
                aliveThreads = 0;
                this.topicToThread.values().removeIf(thread -> !thread.isAlive());
                for (ReaderThread thread : this.topicToThread.values()) {
                    thread.cancel();
                    aliveThreads++;
                }
                if (aliveThreads > 0) {
                    // Spread roughly half a second of waiting over the remaining threads.
                    for (ReaderThread thread : this.topicToThread.values()) {
                        thread.join((500 / aliveThreads) + 1);
                    }
                }
            } while (aliveThreads > 0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            log.error("Exception while shutting down reader threads", t);
        }
    }

    /**
     * Emits one record and advances the offset of its topic state, both
     * under the checkpoint lock. A null record only advances the offset.
     * How the record is emitted depends on the watermark mode fixed in the
     * constructor; an unknown mode emits nothing and leaves the offset as is.
     *
     * <p>Decompiler note: this method was originally {@code protected}.
     *
     * @param t the deserialized record, or null to skip emission
     * @param pulsarTopicState state holder of the topic the record came from
     * @param messageId offset to store in the state holder
     */
    public void emitRecord(T t, PulsarTopicState pulsarTopicState, MessageId messageId) {
        if (t == null) {
            synchronized (this.checkpointLock) {
                pulsarTopicState.setOffset(messageId);
            }
            return;
        }

        final int mode = this.timestampWatermarkMode;
        if (mode == NO_TIMESTAMPS_WATERMARKS) {
            synchronized (this.checkpointLock) {
                this.sourceContext.collect(t);
                pulsarTopicState.setOffset(messageId);
            }
        } else if (mode == PERIODIC_WATERMARKS) {
            emitRecordWithTimestampAndPeriodicWatermark(t, pulsarTopicState, messageId, Long.MIN_VALUE);
        } else if (mode == PUNCTUATED_WATERMARKS) {
            emitRecordWithTimestampAndPunctuatedWatermark(t, pulsarTopicState, messageId, Long.MIN_VALUE);
        }
    }

    /**
     * Extracts the record timestamp through the periodic-watermark state
     * (while holding that state's monitor), then emits the record with that
     * timestamp and stores the new offset under the checkpoint lock.
     *
     * @param t the record to emit
     * @param pulsarTopicState state holder; must be a periodic-watermark state
     * @param messageId offset to store in the state holder
     * @param j timestamp value passed to the timestamp extraction
     */
    private void emitRecordWithTimestampAndPeriodicWatermark(T t, PulsarTopicState pulsarTopicState, MessageId messageId, long j) {
        PulsarTopicStateWithPeriodicWatermarks periodicState = (PulsarTopicStateWithPeriodicWatermarks) pulsarTopicState;

        final long recordTimestamp;
        // The periodic emitter locks the same object when it reads the current watermark.
        synchronized (periodicState) {
            recordTimestamp = periodicState.getTimestampForRecord(t, j);
        }

        synchronized (this.checkpointLock) {
            this.sourceContext.collectWithTimestamp(t, recordTimestamp);
            pulsarTopicState.setOffset(messageId);
        }
    }

    /**
     * Extracts the record timestamp and asks the punctuated-watermark state
     * whether the record yields a new watermark, emits the record and stores
     * the new offset under the checkpoint lock, and finally, if a watermark
     * was produced, updates the minimum watermark across all partitions.
     *
     * @param t the record to emit
     * @param pulsarTopicState state holder; must be a punctuated-watermark state
     * @param messageId offset to store in the state holder
     * @param j timestamp value passed to the timestamp extraction
     */
    private void emitRecordWithTimestampAndPunctuatedWatermark(T t, PulsarTopicState pulsarTopicState, MessageId messageId, long j) {
        PulsarTopicStateWithPunctuatedWatermarks punctuatedState = (PulsarTopicStateWithPunctuatedWatermarks) pulsarTopicState;

        long recordTimestamp = punctuatedState.getTimestampForRecord(t, j);
        Watermark candidate = punctuatedState.checkAndGetNewWatermark(t, recordTimestamp);

        synchronized (this.checkpointLock) {
            this.sourceContext.collectWithTimestamp(t, recordTimestamp);
            pulsarTopicState.setOffset(messageId);
        }

        if (candidate == null) {
            return;
        }
        updateMinPunctuatedWatermark(candidate);
    }

    /**
     * Emits a new watermark if the minimum partition watermark across all
     * subscribed states has advanced past the highest one emitted so far.
     * The comparison is repeated under the checkpoint lock before emitting.
     *
     * @param watermark the watermark just produced by one partition; nothing
     *     happens unless it exceeds the highest watermark emitted so far
     */
    private void updateMinPunctuatedWatermark(Watermark watermark) {
        if (watermark.getTimestamp() <= this.maxWatermarkSoFar) {
            return;
        }

        long minAcrossPartitions = Long.MAX_VALUE;
        for (PulsarTopicState state : this.subscribedPartitionStates) {
            long partitionWatermark = ((PulsarTopicStateWithPunctuatedWatermarks) state).getCurrentPartitionWatermark();
            minAcrossPartitions = Math.min(minAcrossPartitions, partitionWatermark);
        }

        if (minAcrossPartitions <= this.maxWatermarkSoFar) {
            return;
        }
        synchronized (this.checkpointLock) {
            // Re-check: the maximum may have moved before the lock was acquired.
            if (minAcrossPartitions > this.maxWatermarkSoFar) {
                this.maxWatermarkSoFar = minAcrossPartitions;
                this.sourceContext.emitWatermark(new Watermark(minAcrossPartitions));
            }
        }
    }

    /**
     * Stops the fetcher: clears the running flag, removes the cursors of all
     * subscribed topic ranges and enqueues the poison marker (if the queue is
     * still open) so that a blocked {@code runFetchLoop} wakes up.
     *
     * <p>The poison marker is enqueued even when cursor removal throws;
     * otherwise the fetch loop could stay blocked waiting on the queue after
     * a failed cancel.
     *
     * @throws Exception if removing the cursors fails
     */
    public void cancel() throws Exception {
        this.running = false;
        try {
            Set<TopicRange> subscribedRanges = this.subscribedPartitionStates.stream()
                    .map(PulsarTopicState::getTopicRange)
                    .collect(Collectors.toSet());
            this.metadataReader.removeCursor(subscribedRanges);
        } finally {
            this.unassignedPartitionsQueue.addIfOpen(PoisonState.INSTANCE);
        }
    }

    /**
     * Commits the given offsets after dropping every entry whose offset is
     * the earliest or latest marker.
     *
     * @param map offsets to commit, keyed by topic range
     * @param pulsarCommitCallback callback notified of success or failure
     */
    public void commitOffsetToPulsar(Map<TopicRange, MessageId> map, PulsarCommitCallback pulsarCommitCallback) throws InterruptedException {
        Map<TopicRange, MessageId> concreteOffsets = removeEarliestAndLatest(map);
        doCommitOffsetToPulsar(concreteOffsets, pulsarCommitCallback);
    }

    /**
     * Commits the given offsets through the metadata reader, retrying a
     * failed commit up to {@code commitMaxRetries} times with a one-second
     * pause between attempts.
     *
     * <p>On success the callback's {@code onSuccess} is invoked and the
     * committed offset of every subscribed state present in {@code map} is
     * updated. If the retries are exhausted (or the pause is interrupted)
     * while the fetcher is still running, the callback's {@code onException}
     * is invoked exactly once and the method returns. If the fetcher has been
     * stopped in the meantime the method returns without notifying the
     * callback.
     *
     * @param map offsets to commit, keyed by topic range
     * @param pulsarCommitCallback callback notified of success or failure
     */
    public void doCommitOffsetToPulsar(Map<TopicRange, MessageId> map, PulsarCommitCallback pulsarCommitCallback) throws InterruptedException {
        int retries = 0;
        boolean committed = false;
        try {
            while (this.running) {
                try {
                    this.metadataReader.commitCursorToOffset(map);
                    committed = true;
                    break;
                } catch (Exception e) {
                    log.warn("Failed to commit cursor to Pulsar.", e);
                    if (retries >= this.commitMaxRetries) {
                        log.error("Failed to commit cursor to Pulsar after {} attempts", retries);
                        throw e;
                    }
                    retries++;
                    Thread.sleep(1000L);
                }
            }
        } catch (Exception e) {
            // The failure handling sits outside the retry loop on purpose:
            // once the retries are used up the commit must be abandoned rather
            // than re-attempted in a tight loop.
            if (this.running) {
                pulsarCommitCallback.onException(e);
            }
            return;
        }

        if (!committed) {
            // The fetcher was stopped before a commit went through.
            return;
        }

        pulsarCommitCallback.onSuccess();
        for (PulsarTopicState state : this.subscribedPartitionStates) {
            MessageId committedOffset = map.get(state.getTopicRange());
            if (committedOffset != null) {
                state.setCommittedOffset(committedOffset);
            }
        }
    }

    /**
     * Returns a new map containing the entries of {@code map} whose offset is
     * neither {@code MessageId.earliest} nor {@code MessageId.latest}. The
     * input map is not modified.
     *
     * @param map offsets keyed by topic range
     * @return a new mutable map without the earliest/latest entries
     */
    public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> map) {
        Map<TopicRange, MessageId> concreteOffsets = new HashMap<>();
        for (Map.Entry<TopicRange, MessageId> entry : map.entrySet()) {
            MessageId offset = entry.getValue();
            if (offset.equals(MessageId.earliest) || offset.equals(MessageId.latest)) {
                continue;
            }
            concreteOffsets.put(entry.getKey(), offset);
        }
        return concreteOffsets;
    }

    /**
     * Registers newly discovered topic ranges. Each range gets a state holder
     * starting at {@code MessageId.earliest}; the holder is added to the
     * subscribed states and queued for assignment to a reader thread.
     *
     * @param set the newly discovered topic ranges
     * @throws IOException if a watermark assigner cannot be deserialized
     * @throws ClassNotFoundException if a watermark assigner class cannot be loaded
     */
    public void addDiscoveredTopics(Set<TopicRange> set) throws IOException, ClassNotFoundException {
        Map<TopicRange, MessageId> initialOffsets = set.stream()
                .collect(Collectors.toMap(range -> range, range -> MessageId.earliest));

        List<PulsarTopicState> discoveredStates = createPartitionStateHolders(
                initialOffsets,
                this.timestampWatermarkMode,
                this.watermarksPeriodic,
                this.watermarksPunctuated,
                this.userCodeClassLoader);

        for (PulsarTopicState state : discoveredStates) {
            this.subscribedPartitionStates.add(state);
            this.unassignedPartitionsQueue.add(state);
        }
    }

    /**
     * Returns the current offset of every subscribed topic range. When
     * assertions are enabled, the calling thread must hold the checkpoint
     * lock.
     *
     * @return a new map from topic range to its current offset
     */
    public Map<TopicRange, MessageId> snapshotCurrentState() {
        boolean assertionsEnabled = !$assertionsDisabled;
        if (assertionsEnabled && !Thread.holdsLock(this.checkpointLock)) {
            throw new AssertionError();
        }

        Map<TopicRange, MessageId> snapshot = new HashMap<>(this.subscribedPartitionStates.size());
        for (PulsarTopicState state : this.subscribedPartitionStates) {
            snapshot.put(state.getTopicRange(), state.getOffset());
        }
        return snapshot;
    }

    /**
     * Sets up cursors for the given topic states at their current offsets,
     * then creates, names and starts one daemon reader thread per state.
     *
     * @param list topic states that need a reader thread
     * @param exceptionProxy proxy handed to each reader thread
     * @return the started threads keyed by topic range
     */
    public Map<TopicRange, ReaderThread> createAndStartReaderThread(List<PulsarTopicState> list, ExceptionProxy exceptionProxy) {
        Map<TopicRange, MessageId> startOffsets = list.stream()
                .collect(Collectors.toMap(PulsarTopicState::getTopicRange, PulsarTopicState::getOffset));
        this.metadataReader.setupCursor(startOffsets, this.failOnDataLoss);

        Map<TopicRange, ReaderThread> startedThreads = new HashMap<>();
        for (PulsarTopicState state : list) {
            ReaderThread reader = createReaderThread(exceptionProxy, state);
            String threadName = String.format("Pulsar Reader for %s in task %s", state.getTopicRange(), this.runtimeContext.getTaskName());
            reader.setName(threadName);
            reader.setDaemon(true);
            reader.start();
            log.info("Starting Thread {}", reader.getName());
            startedThreads.put(state.getTopicRange(), reader);
        }
        return startedThreads;
    }

    /**
     * Returns the live list of subscribed topic states (not a copy).
     *
     * @return the list held by this fetcher
     */
    protected List<PulsarTopicState> getSubscribedTopicStates() {
        return subscribedPartitionStates;
    }

    /**
     * Factory for the reader thread of one topic state; subclasses may
     * override it. The thread is returned unstarted.
     *
     * @param exceptionProxy proxy handed to the reader thread
     * @param pulsarTopicState the state the thread will read for
     * @return a new reader thread configured from this fetcher's settings
     */
    protected ReaderThread createReaderThread(ExceptionProxy exceptionProxy, PulsarTopicState pulsarTopicState) {
        return new ReaderThread(
                this,
                pulsarTopicState,
                this.clientConf,
                this.readerConf,
                this.deserializer,
                this.pollTimeoutMs,
                exceptionProxy,
                this.failOnDataLoss);
    }

    /**
     * Builds one state holder per entry of {@code map}, of the type matching
     * the watermark mode, and initialises it with the entry's offset. In the
     * periodic and punctuated modes the assigner is deserialized afresh for
     * every holder.
     *
     * @param map topic ranges with their initial offsets
     * @param i watermark mode (one of the three mode constants)
     * @param serializedValue serialized periodic assigner; used in periodic mode
     * @param serializedValue2 serialized punctuated assigner; used in punctuated mode
     * @param classLoader class loader used for deserialization
     * @return a new {@link CopyOnWriteArrayList} holding the state holders
     * @throws RuntimeException if {@code i} is not a known mode
     */
    private List<PulsarTopicState> createPartitionStateHolders(Map<TopicRange, MessageId> map, int i, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        boolean knownMode = i == NO_TIMESTAMPS_WATERMARKS || i == PERIODIC_WATERMARKS || i == PUNCTUATED_WATERMARKS;
        if (!knownMode) {
            throw new RuntimeException();
        }

        List<PulsarTopicState> holders = new CopyOnWriteArrayList<>();
        for (Map.Entry<TopicRange, MessageId> entry : map.entrySet()) {
            PulsarTopicState holder;
            if (i == PERIODIC_WATERMARKS) {
                holder = new PulsarTopicStateWithPeriodicWatermarks(entry.getKey(), serializedValue.deserializeValue(classLoader));
            } else if (i == PUNCTUATED_WATERMARKS) {
                holder = new PulsarTopicStateWithPunctuatedWatermarks(entry.getKey(), serializedValue2.deserializeValue(classLoader));
            } else {
                holder = new PulsarTopicState(entry.getKey());
            }
            holder.setOffset(entry.getValue());
            holders.add(holder);
        }
        return holders;
    }

    /**
     * Returns the metadata reader supplied to the constructor.
     *
     * @return the metadata reader used for cursor management
     */
    public PulsarMetadataReader getMetaDataReader() {
        return metadataReader;
    }

    // Static initializer as reconstructed by the decompiler: it assigns the two
    // blank static finals declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; read by snapshotCurrentState().
        $assertionsDisabled = !PulsarFetcher.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(PulsarFetcher.class);
    }
}
