package org.apache.flink.streaming.connectors.pulsar;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.class */
public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T> implements ResultTypeQueryable<T>, CheckpointListener, CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSource.class);
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private static final String OFFSETS_STATE_NAME_V3 = "topic-offset-states";
    protected String adminUrl;
    protected ClientConfigurationData clientConfigurationData;
    protected final Map<String, String> caseInsensitiveParams;
    protected final Map<String, Object> readerConf;
    protected volatile PulsarDeserializationSchema<T> deserializer;
    private Map<TopicRange, MessageId> ownedTopicStarts;
    private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
    private final long discoveryIntervalMillis;
    protected final int pollTimeoutMs;
    protected final int commitMaxRetries;
    private StartupMode startupMode;
    private transient Map<TopicRange, MessageId> specificStartupOffsets;
    private String externalSubscriptionName;
    private MessageId subscriptionPosition;
    private Map<TopicRange, byte[]> specificStartupOffsetsAsBytes;
    protected final Properties properties;
    protected final UUID uuid;
    private final LinkedHashMap<Long, Map<TopicRange, MessageId>> pendingOffsetsToCommit;
    private volatile transient PulsarFetcher<T> pulsarFetcher;
    protected volatile transient PulsarMetadataReader metadataReader;
    private volatile transient TreeMap<TopicRange, MessageId> restoredState;
    private volatile transient Set<TopicRange> excludeStartMessageIds;
    private transient ListState<Tuple2<TopicSubscription, MessageId>> unionOffsetStates;
    private int oldStateVersion;
    private volatile boolean stateSubEqualexternalSub;
    private volatile transient Thread discoveryLoopThread;
    private volatile boolean running;
    private final boolean useMetrics;
    private transient Counter successfulCommits;
    private transient Counter failedCommits;
    private transient PulsarCommitCallback offsetCommitCallback;
    private transient int taskIndex;
    private transient int numParallelTasks;

    public FlinkPulsarSource(String str, ClientConfigurationData clientConfigurationData, PulsarDeserializationSchema<T> pulsarDeserializationSchema, Properties properties) {
        this.startupMode = StartupMode.LATEST;
        this.subscriptionPosition = MessageId.latest;
        this.uuid = UUID.randomUUID();
        this.pendingOffsetsToCommit = new LinkedHashMap<>();
        this.oldStateVersion = 2;
        this.stateSubEqualexternalSub = false;
        this.running = true;
        this.adminUrl = (String) Preconditions.checkNotNull(str);
        this.clientConfigurationData = (ClientConfigurationData) Preconditions.checkNotNull(clientConfigurationData);
        this.deserializer = pulsarDeserializationSchema;
        this.properties = properties;
        this.caseInsensitiveParams = SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties));
        this.readerConf = SourceSinkUtils.getReaderParams(Maps.fromProperties(properties));
        this.discoveryIntervalMillis = SourceSinkUtils.getPartitionDiscoveryIntervalInMillis(this.caseInsensitiveParams);
        this.pollTimeoutMs = SourceSinkUtils.getPollTimeoutMs(this.caseInsensitiveParams);
        this.commitMaxRetries = SourceSinkUtils.getCommitMaxRetries(this.caseInsensitiveParams);
        this.useMetrics = SourceSinkUtils.getUseMetrics(this.caseInsensitiveParams);
        CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(this.caseInsensitiveParams));
        if (this.clientConfigurationData.getServiceUrl() == null) {
            throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
        }
        this.oldStateVersion = SourceSinkUtils.getOldStateVersion(this.caseInsensitiveParams, this.oldStateVersion);
    }

    /**
     * Creates a source by building the client configuration from {@code str} and
     * {@code properties} via {@link PulsarClientUtils#newClientConf}.
     *
     * @param str value handed to {@code newClientConf} (presumably the service URL — confirm);
     *     must not be null
     * @param str2 admin URL, forwarded to the main constructor
     * @param pulsarDeserializationSchema schema used to decode messages
     * @param properties connector options
     */
    public FlinkPulsarSource(String str, String str2, PulsarDeserializationSchema<T> pulsarDeserializationSchema, Properties properties) {
        this(
                str2,
                PulsarClientUtils.newClientConf(Preconditions.checkNotNull(str), properties),
                pulsarDeserializationSchema,
                properties);
    }

    /**
     * Creates a source from a plain Flink {@link DeserializationSchema}, wrapped with
     * {@link PulsarDeserializationSchema#valueOnly}.
     *
     * @param str value handed to {@code newClientConf} (presumably the service URL — confirm);
     *     must not be null
     * @param str2 admin URL, forwarded to the main constructor
     * @param deserializationSchema schema to wrap
     * @param properties connector options
     */
    public FlinkPulsarSource(String str, String str2, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(
                str2,
                PulsarClientUtils.newClientConf(Preconditions.checkNotNull(str), properties),
                PulsarDeserializationSchema.valueOnly(deserializationSchema),
                properties);
    }

    /**
     * Installs a punctuated watermark assigner by adapting it to a {@link WatermarkStrategy}.
     *
     * @param assignerWithPunctuatedWatermarks assigner to adapt; must not be null
     * @return this source, for chaining
     * @throws IllegalStateException if a watermark strategy has already been set
     * @throws IllegalArgumentException if cleaning or installing the assigner fails
     * @deprecated use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead
     */
    @Deprecated
    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assignerWithPunctuatedWatermarks) {
        Preconditions.checkNotNull(assignerWithPunctuatedWatermarks);
        if (this.watermarkStrategy != null) {
            throw new IllegalStateException("Some watermark strategy has already been set.");
        }
        try {
            ClosureCleaner.clean(assignerWithPunctuatedWatermarks, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            // A typed local pins overload resolution to the WatermarkStrategy variant.
            final WatermarkStrategy<T> adapted =
                    new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assignerWithPunctuatedWatermarks);
            return assignTimestampsAndWatermarks(adapted);
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    /**
     * Installs a periodic watermark assigner by adapting it to a {@link WatermarkStrategy}.
     *
     * @param assignerWithPeriodicWatermarks assigner to adapt; must not be null
     * @return this source, for chaining
     * @throws IllegalStateException if a watermark strategy has already been set
     * @throws IllegalArgumentException if cleaning or installing the assigner fails
     * @deprecated use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead
     */
    @Deprecated
    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks) {
        Preconditions.checkNotNull(assignerWithPeriodicWatermarks);
        if (this.watermarkStrategy != null) {
            throw new IllegalStateException("Some watermark strategy has already been set.");
        }
        try {
            ClosureCleaner.clean(assignerWithPeriodicWatermarks, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            // A typed local pins overload resolution to the WatermarkStrategy variant.
            final WatermarkStrategy<T> adapted =
                    new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assignerWithPeriodicWatermarks);
            return assignTimestampsAndWatermarks(adapted);
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    /**
     * Sets the watermark strategy used by the fetcher. The strategy is closure-cleaned and
     * stored in serialized form; a previously set strategy is replaced.
     *
     * @param watermarkStrategy strategy to install; must not be null
     * @return this source, for chaining
     * @throws IllegalArgumentException if cleaning or serializing the strategy fails
     */
    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy) {
        Preconditions.checkNotNull(watermarkStrategy);
        final SerializedValue<WatermarkStrategy<T>> serialized;
        try {
            ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            serialized = new SerializedValue<>(watermarkStrategy);
        } catch (Exception e) {
            throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
        }
        this.watermarkStrategy = serialized;
        return this;
    }

    /**
     * Configures the source to start every topic from {@code MessageId.earliest}.
     *
     * <p>Any specific start offsets configured earlier are discarded, including their
     * serialized copy, so stale offsets are neither shipped with the job nor decoded again
     * in {@code open()}.
     *
     * @return this source, for chaining
     */
    public FlinkPulsarSource<T> setStartFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.specificStartupOffsets = null;
        this.specificStartupOffsetsAsBytes = null;
        return this;
    }

    /**
     * Configures the source to start every topic from {@code MessageId.latest}.
     *
     * <p>Any specific start offsets configured earlier are discarded, including their
     * serialized copy, so stale offsets are neither shipped with the job nor decoded again
     * in {@code open()}.
     *
     * @return this source, for chaining
     */
    public FlinkPulsarSource<T> setStartFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.specificStartupOffsets = null;
        this.specificStartupOffsetsAsBytes = null;
        return this;
    }

    /**
     * Configures the source to start from explicit per-topic message ids.
     *
     * <p>Each topic name is wrapped in a {@link TopicRange}. Because the offsets field is
     * transient, a byte-array copy of every message id is kept as well so that
     * {@code open()} can rebuild the map.
     *
     * @param map topic name to start message id; must not be null
     * @return this source, for chaining
     */
    public FlinkPulsarSource<T> setStartFromSpecificOffsets(Map<String, MessageId> map) {
        Preconditions.checkNotNull(map);

        final Map<TopicRange, MessageId> offsetsByRange = map.entrySet().stream()
                .collect(Collectors.toMap(e -> new TopicRange(e.getKey()), Map.Entry::getValue));
        this.specificStartupOffsets = offsetsByRange;
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;

        // Serializable mirror of the transient offsets map.
        final Map<TopicRange, byte[]> encoded = new HashMap<>();
        this.specificStartupOffsetsAsBytes = encoded;
        offsetsByRange.forEach((range, messageId) -> encoded.put(range, messageId.toByteArray()));
        return this;
    }

    /**
     * Configures the source to start from the position of an external subscription.
     *
     * <p>The argument is validated before any field is touched, so a rejected call leaves
     * the previously configured startup mode intact.
     *
     * @param str name of the external subscription; must not be null
     * @return this source, for chaining
     * @throws NullPointerException if {@code str} is null
     */
    public FlinkPulsarSource<T> setStartFromSubscription(String str) {
        final String subscriptionName = Preconditions.checkNotNull(str);
        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
        this.externalSubscriptionName = subscriptionName;
        return this;
    }

    /**
     * Configures the source to start from an external subscription, passing
     * {@code messageId} to the metadata reader as the subscription position when start
     * offsets are resolved.
     *
     * <p>Both arguments are validated before any field is touched, so a rejected call
     * leaves the previous configuration intact.
     *
     * @param str name of the external subscription; must not be null
     * @param messageId subscription position; must not be null
     * @return this source, for chaining
     * @throws NullPointerException if either argument is null
     */
    public FlinkPulsarSource<T> setStartFromSubscription(String str, MessageId messageId) {
        final String subscriptionName = Preconditions.checkNotNull(str);
        final MessageId position = Preconditions.checkNotNull(messageId);
        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
        this.externalSubscriptionName = subscriptionName;
        this.subscriptionPosition = position;
        return this;
    }

    /**
     * Prepares the source for running: opens the deserializer, creates the metadata reader,
     * discovers topics and decides which topic ranges this subtask reads and from where.
     *
     * <p>Start positions come from the restored state when there is one and either the
     * startup mode is not {@code EXTERNAL_SUBSCRIPTION} or the restored subscription name
     * matched the external one. Otherwise they come from
     * {@link #offsetForEachTopic(Set, StartupMode, Map)}.
     *
     * @param configuration unused by this implementation
     * @throws Exception if the deserializer, metadata reader or offset decoding fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        if (this.deserializer != null) {
            this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(
                    getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        }
        this.taskIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.numParallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
        this.metadataReader = createMetadataReader();
        this.ownedTopicStarts = new HashMap<>();
        this.excludeStartMessageIds = new HashSet<>();
        final Set<TopicRange> discoveredTopics = this.metadataReader.discoverTopicChanges();

        // The offsets map is transient; rebuild it from its byte-array mirror if needed.
        if (this.specificStartupOffsets == null && this.specificStartupOffsetsAsBytes != null) {
            this.specificStartupOffsets = new HashMap<>();
            for (Map.Entry<TopicRange, byte[]> encoded : this.specificStartupOffsetsAsBytes.entrySet()) {
                this.specificStartupOffsets.put(encoded.getKey(), MessageId.fromByteArray(encoded.getValue()));
            }
        }

        final Map<TopicRange, MessageId> initialOffsets =
                offsetForEachTopic(discoveredTopics, this.startupMode, this.specificStartupOffsets);
        final boolean restoredStateApplies =
                this.startupMode != StartupMode.EXTERNAL_SUBSCRIPTION || this.stateSubEqualexternalSub;

        if (this.restoredState == null || !restoredStateApplies) {
            // Fresh start: take this subtask's share of the discovered topics.
            this.ownedTopicStarts.putAll(initialOffsets.entrySet().stream()
                    .filter(e -> SourceSinkUtils.belongsTo(e.getKey(), this.numParallelTasks, this.taskIndex))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            if (this.ownedTopicStarts.isEmpty()) {
                log.info("Source {} initially has no topics to read from.", this.taskIndex);
            } else {
                log.info("Source {} will start reading {} topics from initialized positions: {}",
                        this.taskIndex, this.ownedTopicStarts.size(), this.ownedTopicStarts);
            }
            return;
        }

        // NOTE(review): discovered topics that are missing from the restored state are not
        // assigned to any subtask here — confirm this is intended.
        for (Map.Entry<TopicRange, MessageId> restored : this.restoredState.entrySet()) {
            final TopicRange range = restored.getKey();
            if (!SourceSinkUtils.belongsTo(range, this.numParallelTasks, this.taskIndex)) {
                continue;
            }
            this.excludeStartMessageIds.add(range);
            if (discoveredTopics.contains(range)) {
                this.ownedTopicStarts.put(range, restored.getValue());
            } else {
                // Restored range no longer matches the configured topics: do not read it.
                log.warn("{} is removed from subscription since it no longer matches with topics settings.", range);
            }
        }
        log.info("Source {} will start reading {} topics in restored state {}",
                this.taskIndex, this.ownedTopicStarts.size(), StringUtils.join(this.ownedTopicStarts.entrySet()));
    }

    /**
     * Returns the subscription name this source uses.
     *
     * @return the external subscription name in {@code EXTERNAL_SUBSCRIPTION} mode, otherwise
     *     {@code "flink-pulsar-"} followed by this source's UUID
     * @throws NullPointerException if the mode is {@code EXTERNAL_SUBSCRIPTION} but no name
     *     was set
     */
    protected String getSubscriptionName() {
        if (this.startupMode == StartupMode.EXTERNAL_SUBSCRIPTION) {
            return Preconditions.checkNotNull(this.externalSubscriptionName);
        }
        return "flink-pulsar-" + this.uuid.toString();
    }

    /**
     * Builds the metadata reader from this source's admin URL, client configuration,
     * subscription name, options and subtask coordinates.
     *
     * @return a new metadata reader
     * @throws PulsarClientException if the reader cannot be created
     */
    protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
        final boolean usesExternalSubscription = this.startupMode == StartupMode.EXTERNAL_SUBSCRIPTION;
        final String subscriptionName = getSubscriptionName();
        return new PulsarMetadataReader(
                this.adminUrl,
                this.clientConfigurationData,
                subscriptionName,
                this.caseInsensitiveParams,
                this.taskIndex,
                this.numParallelTasks,
                usesExternalSubscription);
    }

    /**
     * Main work method: registers the commit counters, creates the fetcher for the topic
     * ranges chosen in {@code open()} and runs the fetch loop, with or without the topic
     * discovery thread depending on the configured discovery interval.
     *
     * @param sourceContext context used to emit records; marked temporarily idle when this
     *     subtask owns no topics
     * @throws Exception if the start positions were never initialized or the fetcher fails
     */
    @Override
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.ownedTopicStarts == null) {
            throw new Exception("The partitions were not set for the source");
        }

        this.successfulCommits = getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
        this.failedCommits = getRuntimeContext().getMetricGroup().counter("commitsFailed");
        // Counts the outcome of every offset commit triggered by a completed checkpoint.
        this.offsetCommitCallback = new PulsarCommitCallback() {
            @Override
            public void onSuccess() {
                FlinkPulsarSource.this.successfulCommits.inc();
            }

            @Override
            public void onException(Throwable th) {
                FlinkPulsarSource.log.warn("source {} failed commit by {}", FlinkPulsarSource.this.taskIndex, th.toString());
                FlinkPulsarSource.this.failedCommits.inc();
            }
        };

        if (this.ownedTopicStarts.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }
        log.info("Source {} creating fetcher with offsets {}", this.taskIndex, StringUtils.join(this.ownedTopicStarts.entrySet()));

        final StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) getRuntimeContext();
        this.pulsarFetcher = createFetcher(
                sourceContext,
                this.ownedTopicStarts,
                this.watermarkStrategy,
                streamingContext.getProcessingTimeService(),
                streamingContext.getExecutionConfig().getAutoWatermarkInterval(),
                getRuntimeContext().getUserCodeClassLoader(),
                streamingContext,
                this.useMetrics,
                this.excludeStartMessageIds);

        if (!this.running) {
            return;
        }
        // A negative interval disables topic discovery.
        if (this.discoveryIntervalMillis < 0) {
            this.pulsarFetcher.runFetchLoop();
            return;
        }
        runWithTopicsDiscovery();
    }

    /**
     * Creates the fetcher that reads the given topic ranges. Besides the arguments, the
     * fetcher receives this source's client configuration, reader configuration, poll
     * timeout, commit retry limit, deserializer, metadata reader and a metric group named
     * {@link PulsarSourceMetrics#PULSAR_SOURCE_METRICS_GROUP}.
     *
     * @param sourceContext context used to emit records
     * @param map start message id per topic range
     * @param serializedValue serialized watermark strategy, may be null
     * @param processingTimeService timer service passed to the fetcher
     * @param j auto-watermark interval
     * @param classLoader user-code class loader
     * @param streamingRuntimeContext runtime context of this subtask
     * @param z whether metrics are enabled
     * @param set topic ranges whose start message id is excluded
     * @return a new fetcher
     * @throws Exception if the fetcher cannot be created
     */
    protected PulsarFetcher<T> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<TopicRange, MessageId> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, StreamingRuntimeContext streamingRuntimeContext, boolean z, Set<TopicRange> set) throws Exception {
        return new PulsarFetcher<>(
                sourceContext,
                map,
                set,
                serializedValue,
                processingTimeService,
                j,
                classLoader,
                streamingRuntimeContext,
                this.clientConfigurationData,
                this.readerConf,
                this.pollTimeoutMs,
                this.commitMaxRetries,
                this.deserializer,
                this.metadataReader,
                streamingRuntimeContext.getMetricGroup().addGroup(PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP),
                z);
    }

    /**
     * Waits for the topic discovery thread to finish; does nothing if it was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void joinDiscoveryLoopThread() throws InterruptedException {
        if (this.discoveryLoopThread == null) {
            return;
        }
        this.discoveryLoopThread.join();
    }

    /**
     * Starts the discovery thread, runs the fetch loop on the calling thread, then waits for
     * the discovery thread and surfaces any error it recorded.
     *
     * @throws RuntimeException wrapping the exception recorded by the discovery thread
     * @throws Exception if the fetch loop fails or the wait is interrupted
     */
    public void runWithTopicsDiscovery() throws Exception {
        final AtomicReference<Exception> discoveryError = new AtomicReference<>();
        createAndStartDiscoveryLoop(discoveryError);
        this.pulsarFetcher.runFetchLoop();
        joinDiscoveryLoopThread();

        final Exception failure = discoveryError.get();
        if (failure == null) {
            return;
        }
        throw new RuntimeException(failure);
    }

    /**
     * Creates and starts the thread that periodically asks the metadata reader for topic
     * changes and hands newly discovered topics to the fetcher.
     *
     * <p>The loop ends when the source stops running, when the thread is interrupted, or
     * when the metadata reader reports it is closed. Any other exception is stored in
     * {@code atomicReference}. In every case, if the source is still marked running when
     * the loop exits, {@link #cancel()} is called.
     *
     * @param atomicReference receives an unexpected exception thrown by the loop
     */
    private void createAndStartDiscoveryLoop(AtomicReference<Exception> atomicReference) {
        final Runnable discoveryLoop = () -> {
            try {
                while (this.running) {
                    Set<TopicRange> newlyDiscovered = this.metadataReader.discoverTopicChanges();
                    if (this.running && !newlyDiscovered.isEmpty()) {
                        this.pulsarFetcher.addDiscoveredTopics(newlyDiscovered);
                    }
                    if (this.running && this.discoveryIntervalMillis != -1) {
                        Thread.sleep(this.discoveryIntervalMillis);
                    }
                }
            } catch (InterruptedException ignored) {
                // Interrupted: leave the loop without recording an error.
            } catch (PulsarMetadataReader.ClosedException ignored) {
                // Metadata reader closed: leave the loop without recording an error.
            } catch (Exception e) {
                atomicReference.set(e);
            } finally {
                // If still running, nobody has cancelled yet; do it on the way out.
                if (this.running) {
                    cancel();
                }
            }
        };
        this.discoveryLoopThread = new Thread(discoveryLoop, "Pulsar topic discovery for source " + this.taskIndex);
        this.discoveryLoopThread.start();
    }

    /**
     * Shuts the source down: cancels it, waits for the discovery thread, closes the
     * metadata reader and finally calls {@code super.close()}.
     *
     * <p>Every step runs even if an earlier one fails, so a failing {@code cancel()} or an
     * interrupted wait can no longer leave the metadata reader open. The first failure is
     * thrown once all steps have run; later failures are attached to it as suppressed
     * exceptions.
     *
     * @throws Exception the first failure encountered while closing
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        boolean interrupted = false;

        try {
            cancel();
        } catch (Exception e) {
            failure = e;
        }

        try {
            joinDiscoveryLoopThread();
        } catch (InterruptedException e) {
            interrupted = true;
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        if (this.metadataReader != null) {
            try {
                this.metadataReader.close();
            } catch (Exception e) {
                failure = ExceptionUtils.firstOrSuppressed(e, failure);
            }
        }

        try {
            super.close();
        } catch (Exception e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        // If the interruption is only a suppressed exception, keep it visible via the flag.
        if (interrupted && !(failure instanceof InterruptedException)) {
            Thread.currentThread().interrupt();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Stops the source: clears the running flag, interrupts the discovery thread if one
     * exists and cancels the fetcher if one exists.
     *
     * @throws RuntimeException wrapping any exception thrown while cancelling the fetcher
     */
    @Override
    public void cancel() {
        this.running = false;

        final Thread discoveryThread = this.discoveryLoopThread;
        if (discoveryThread != null) {
            discoveryThread.interrupt();
        }

        final PulsarFetcher<T> fetcher = this.pulsarFetcher;
        if (fetcher == null) {
            return;
        }
        try {
            fetcher.cancel();
        } catch (Exception e) {
            log.error("Failed to cancel the Pulsar Fetcher {}", ExceptionUtils.stringifyException(e));
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the type of the records this source emits, as reported by the deserializer.
     *
     * @return the produced type information
     */
    @Override
    public TypeInformation<T> getProducedType() {
        final PulsarDeserializationSchema<T> schema = this.deserializer;
        return schema.getProducedType();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Registers the union list state for topic offsets and, when restoring, loads it into
     * {@code restoredState}.
     *
     * <p>If the current-format state is empty on restore, the legacy state is migrated via
     * {@link #tryMigrateState(OperatorStateStore)}. A state entry without a range is treated
     * as covering the full range. While loading, the method records whether any restored
     * entry carries the configured external subscription name.
     *
     * <p>NOTE(review): {@code taskIndex} is only assigned in {@code open()}; if this method
     * runs before {@code open()}, the subtask index in the log lines below is the field's
     * default value — confirm.
     *
     * @param functionInitializationContext gives access to the operator state store and the
     *     restore flag
     * @throws Exception if state access or migration fails
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        final OperatorStateStore stateStore = functionInitializationContext.getOperatorStateStore();
        this.unionOffsetStates = stateStore.getUnionListState(
                new ListStateDescriptor<>(OFFSETS_STATE_NAME_V3, createStateSerializer()));

        if (!functionInitializationContext.isRestored()) {
            log.info("Source subtask {} has no restore state", this.taskIndex);
            return;
        }

        final TreeMap<TopicRange, MessageId> restored = new TreeMap<>();
        this.restoredState = restored;

        Iterator<Tuple2<TopicSubscription, MessageId>> entries = this.unionOffsetStates.get().iterator();
        if (!entries.hasNext()) {
            // Nothing in the current format: fall back to the legacy state layout.
            entries = tryMigrateState(stateStore);
        }

        while (entries.hasNext()) {
            final Tuple2<TopicSubscription, MessageId> entry = entries.next();
            final TopicSubscription subscription = entry.f0;
            final SerializableRange range =
                    subscription.getRange() != null ? subscription.getRange() : SerializableRange.ofFullRange();
            restored.put(new TopicRange(subscription.getTopic(), range.getPulsarRange()), entry.f1);

            final String restoredSubscriptionName = subscription.getSubscriptionName();
            if (!this.stateSubEqualexternalSub && StringUtils.equals(restoredSubscriptionName, this.externalSubscriptionName)) {
                this.stateSubEqualexternalSub = true;
                log.info("Source restored state with subscriptionName {}", restoredSubscriptionName);
            }
        }
        log.info("Source subtask {} restored state {}", this.taskIndex, StringUtils.join(restored.entrySet()));
    }

    /**
     * Creates the serializer for the offset state: a tuple serializer combining
     * {@link TopicSubscriptionSerializer#INSTANCE} and {@link MessageIdSerializer#INSTANCE}.
     *
     * @return serializer for {@code Tuple2<TopicSubscription, MessageId>}
     */
    @VisibleForTesting
    static TupleSerializer<Tuple2<TopicSubscription, MessageId>> createStateSerializer() {
        final TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[] {
            TopicSubscriptionSerializer.INSTANCE, MessageIdSerializer.INSTANCE
        };
        // Tuple2.class is a raw Class; the double cast gives it the parameterized type.
        @SuppressWarnings("unchecked")
        final Class<Tuple2<TopicSubscription, MessageId>> tupleClass =
                (Class<Tuple2<TopicSubscription, MessageId>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    /**
     * Reads offsets stored in the legacy state layout and converts them to the current
     * {@code Tuple2<TopicSubscription, MessageId>} form.
     *
     * <p>Two legacy union states are read: the offsets themselves, decoded with the
     * serializer for {@code oldStateVersion}, and a list of subscription names. The two
     * lists are consumed in parallel; while names remain, the next name replaces the one
     * decoded from the offset entry. Both legacy states are cleared afterwards.
     *
     * @param operatorStateStore store holding the legacy states
     * @return iterator over the migrated entries, empty if there was no legacy state
     * @throws Exception if state access or deserialization fails
     */
    private Iterator<Tuple2<TopicSubscription, MessageId>> tryMigrateState(OperatorStateStore operatorStateStore) throws Exception {
        log.info("restore old state version {}", this.oldStateVersion);
        final PulsarSourceStateSerializer legacySerializer =
                new PulsarSourceStateSerializer(getRuntimeContext().getExecutionConfig());

        // Raw types on purpose: the element type of the legacy offsets depends on the version.
        final ListState legacyOffsets = operatorStateStore.getUnionListState(
                new ListStateDescriptor(OFFSETS_STATE_NAME, legacySerializer.getSerializer(this.oldStateVersion)));
        final ListState<String> legacySubscriptionNames = operatorStateStore.getUnionListState(
                new ListStateDescriptor<>("topic-partition-offset-states_subName", TypeInformation.of(new TypeHint<String>() {
                })));

        final Iterator<String> names = legacySubscriptionNames.get().iterator();
        final Iterator<?> offsets = ((Iterable<?>) legacyOffsets.get()).iterator();
        log.info("restore old state has data {}", offsets.hasNext());

        final ArrayList<Tuple2<TopicSubscription, MessageId>> migrated = new ArrayList<>();
        while (offsets.hasNext()) {
            final Tuple2<TopicSubscription, MessageId> legacyEntry =
                    legacySerializer.deserialize(this.oldStateVersion, offsets.next());
            final TopicSubscription legacySubscription = legacyEntry.f0;

            String subscriptionName = legacySubscription.getSubscriptionName();
            if (names.hasNext()) {
                subscriptionName = names.next();
            }

            final TopicSubscription rebuilt = TopicSubscription.builder()
                    .topic(legacySubscription.getTopic())
                    .range(legacySubscription.getRange())
                    .subscriptionName(subscriptionName)
                    .build();
            final Tuple2<TopicSubscription, MessageId> migratedEntry = Tuple2.of(rebuilt, legacyEntry.f1);
            log.info("migrationState {}", migratedEntry);
            migrated.add(migratedEntry);
        }

        legacyOffsets.clear();
        legacySubscriptionNames.clear();
        return migrated.listIterator();
    }

    /**
     * Writes the current read positions into the union offset state and remembers them as
     * pending until the checkpoint is confirmed.
     *
     * <p>Without a fetcher, the start positions chosen in {@code open()} are written and the
     * restored state (possibly null) is recorded as pending. With a fetcher, its current
     * snapshot is used for both.
     *
     * <p>At most {@link #MAX_NUM_PENDING_CHECKPOINTS} pending checkpoints are kept. When the
     * limit is exceeded, only the excess entries are dropped, oldest first (the pending map
     * is a {@code LinkedHashMap}, so it iterates in insertion order).
     *
     * @param functionSnapshotContext supplies the checkpoint id
     * @throws Exception if writing to the state fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.running) {
            log.debug("snapshotState() called on closed source");
            return;
        }
        this.unionOffsetStates.clear();
        final long checkpointId = functionSnapshotContext.getCheckpointId();

        final PulsarFetcher<T> fetcher = this.pulsarFetcher;
        if (fetcher == null) {
            // No fetcher yet: persist the positions this subtask was going to start from.
            addOffsetsToUnionState(this.ownedTopicStarts);
            this.pendingOffsetsToCommit.put(checkpointId, this.restoredState);
            return;
        }

        final Map<TopicRange, MessageId> currentOffsets = fetcher.snapshotCurrentState();
        this.pendingOffsetsToCommit.put(checkpointId, currentOffsets);
        addOffsetsToUnionState(currentOffsets);

        // Drop only as many of the oldest entries as exceed the limit.
        int excess = this.pendingOffsetsToCommit.size() - MAX_NUM_PENDING_CHECKPOINTS;
        final Iterator<Long> oldestFirst = this.pendingOffsetsToCommit.keySet().iterator();
        while (excess > 0 && oldestFirst.hasNext()) {
            oldestFirst.next();
            oldestFirst.remove();
            excess--;
        }
    }

    /** Appends one state entry per topic range, tagged with the current subscription name. */
    private void addOffsetsToUnionState(Map<TopicRange, MessageId> offsets) throws Exception {
        for (Map.Entry<TopicRange, MessageId> offset : offsets.entrySet()) {
            final TopicRange range = offset.getKey();
            final TopicSubscription subscription = TopicSubscription.builder()
                    .topic(range.getTopic())
                    .range(range.getRange())
                    .subscriptionName(getSubscriptionName())
                    .build();
            this.unionOffsetStates.add(Tuple2.of(subscription, offset.getValue()));
        }
    }

    /**
     * Commits to Pulsar the offsets that were recorded for checkpoint {@code j}.
     *
     * <p>The confirmed checkpoint and every older pending checkpoint are removed from the
     * pending map. Nothing is committed when the source is closed, when no fetcher exists,
     * when the checkpoint id is unknown, or when the recorded offsets are null or empty.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if committing fails while the source is still running; failures
     *     after the source stopped are swallowed
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        if (!this.running) {
            log.info("notifyCheckpointComplete() called on closed source");
            return;
        }
        final PulsarFetcher<T> fetcher = this.pulsarFetcher;
        if (fetcher == null) {
            log.info("notifyCheckpointComplete() called on uninitialized source");
            return;
        }
        log.debug("Source {} received confirmation for checkpoint id {}", this.taskIndex, j);
        try {
            if (!this.pendingOffsetsToCommit.containsKey(j)) {
                log.warn("Source {} received confirmation for unknown checkpoint id {}", this.taskIndex, j);
                return;
            }
            final Map<TopicRange, MessageId> offsets = this.pendingOffsetsToCommit.get(j);

            // Remove this checkpoint and everything recorded before it.
            final Iterator<Long> pendingIds = this.pendingOffsetsToCommit.keySet().iterator();
            while (pendingIds.hasNext()) {
                final Long pendingId = pendingIds.next();
                pendingIds.remove();
                if (Objects.equals(pendingId, j)) {
                    break;
                }
            }

            if (offsets == null || offsets.isEmpty()) {
                log.debug("Source {} has empty checkpoint state", this.taskIndex);
            } else {
                fetcher.commitOffsetToPulsar(offsets, this.offsetCommitCallback);
            }
        } catch (Exception e) {
            // A failure after shutdown began is not worth failing the task for.
            if (this.running) {
                throw e;
            }
        }
    }

    /**
     * Logs that a checkpoint was aborted; no state is changed.
     *
     * @param j id of the aborted checkpoint
     */
    @Override
    public void notifyCheckpointAborted(long j) throws Exception {
        final Long abortedCheckpointId = j;
        log.error("checkpoint aborted, checkpointId: {}", abortedCheckpointId);
    }

    /**
     * Computes the start message id for every given topic range according to the startup
     * mode.
     *
     * <ul>
     *   <li>{@code LATEST} / {@code EARLIEST}: the corresponding {@link MessageId} constant.
     *   <li>{@code SPECIFIC_OFFSETS}: the id from {@code map} when present, otherwise
     *       {@code MessageId.latest}; every key of {@code map} must be one of the topics.
     *   <li>{@code EXTERNAL_SUBSCRIPTION}: the position the metadata reader reports for the
     *       subscription.
     * </ul>
     *
     * @param set topic ranges to resolve
     * @param startupMode startup mode to apply
     * @param map explicit offsets; only read in {@code SPECIFIC_OFFSETS} mode
     * @return start message id per topic range, or null for an unhandled startup mode
     * @throws IllegalArgumentException if {@code map} names a topic range not in {@code set}
     */
    public Map<TopicRange, MessageId> offsetForEachTopic(Set<TopicRange> set, StartupMode startupMode, Map<TopicRange, MessageId> map) {
        final Map<TopicRange, MessageId> offsets = new HashMap<>();
        switch (startupMode) {
            case LATEST:
                for (TopicRange range : set) {
                    offsets.put(range, MessageId.latest);
                }
                return offsets;
            case EARLIEST:
                for (TopicRange range : set) {
                    offsets.put(range, MessageId.earliest);
                }
                return offsets;
            case SPECIFIC_OFFSETS:
                Preconditions.checkArgument(
                        set.containsAll(map.keySet()),
                        String.format(
                                "Topics designated in startingOffsets should appear in %s, topics:%s, topics in offsets: %s",
                                StringUtils.join(PulsarOptions.TOPIC_OPTION_KEYS),
                                StringUtils.join(set.toArray()),
                                StringUtils.join(map.entrySet().toArray())));
                for (TopicRange range : set) {
                    // Topics without an explicit offset start from the latest message.
                    offsets.put(range, map.containsKey(range) ? map.get(range) : MessageId.latest);
                }
                return offsets;
            case EXTERNAL_SUBSCRIPTION:
                for (TopicRange range : set) {
                    offsets.put(range, this.metadataReader.getPositionFromSubscription(range, this.subscriptionPosition));
                }
                return offsets;
            default:
                return null;
        }
    }

    /**
     * Returns the live map of offsets awaiting checkpoint confirmation, keyed by
     * checkpoint id. The map is not copied.
     *
     * @return the pending offsets map
     */
    public Map<Long, Map<TopicRange, MessageId>> getPendingOffsetsToCommit() {
        final Map<Long, Map<TopicRange, MessageId>> pending = this.pendingOffsetsToCommit;
        return pending;
    }

    /**
     * Returns the live map of start message ids for the topic ranges owned by this
     * subtask; null until {@code open()} has assigned it. The map is not copied.
     *
     * @return the owned topic start positions
     */
    public Map<TopicRange, MessageId> getOwnedTopicStarts() {
        final Map<TopicRange, MessageId> starts = this.ownedTopicStarts;
        return starts;
    }
}
