package org.apache.flink.connector.pulsar.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.source.AbstractPartition;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.SplitSchedulingStrategy;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.flink.connector.pulsar.source.StopCondition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.ComponentClosingUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.class */
public class PulsarSourceEnumerator implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
    private final PulsarSubscriber subscriber;
    private final StartOffsetInitializer startOffsetInitializer;
    private final StopCondition stopCondition;
    private final PulsarAdmin pulsarAdmin;
    private final Configuration configuration;
    private final long partitionDiscoveryIntervalMs;
    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
    private final Set<AbstractPartition> discoveredPartitions;
    private final Map<Integer, List<PulsarPartitionSplit>> readerIdToSplitAssignments;
    private final Map<Integer, List<PulsarPartitionSplit>> pendingPartitionSplitAssignment;
    private boolean noMoreNewPartitionSplits = false;
    private SplitSchedulingStrategy splitSchedulingStrategy;

    /* loaded from: input_file:org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator$PartitionSplitChange.class */
    public static class PartitionSplitChange {
        private final List<PulsarPartitionSplit> newPartitionSplits;
        private final Set<AbstractPartition> removedPartitions;

        private PartitionSplitChange(List<PulsarPartitionSplit> list, Set<AbstractPartition> set) {
            this.newPartitionSplits = list;
            this.removedPartitions = set;
        }
    }

    public PulsarSourceEnumerator(PulsarSubscriber pulsarSubscriber, StartOffsetInitializer startOffsetInitializer, StopCondition stopCondition, PulsarAdmin pulsarAdmin, Configuration configuration, SplitEnumeratorContext<PulsarPartitionSplit> splitEnumeratorContext, Map<Integer, List<PulsarPartitionSplit>> map, SplitSchedulingStrategy splitSchedulingStrategy) {
        this.subscriber = pulsarSubscriber;
        this.subscriber.setContext(splitEnumeratorContext);
        this.startOffsetInitializer = startOffsetInitializer;
        this.stopCondition = stopCondition;
        this.pulsarAdmin = pulsarAdmin;
        this.configuration = configuration;
        this.context = splitEnumeratorContext;
        this.splitSchedulingStrategy = splitSchedulingStrategy;
        this.discoveredPartitions = new HashSet();
        this.readerIdToSplitAssignments = new HashMap(map);
        this.readerIdToSplitAssignments.forEach((num, list) -> {
            list.forEach(pulsarPartitionSplit -> {
                this.discoveredPartitions.add(pulsarPartitionSplit.getPartition());
            });
        });
        this.pendingPartitionSplitAssignment = new HashMap();
        this.partitionDiscoveryIntervalMs = ((Long) configuration.get(PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)).longValue();
    }

    public void start() {
        if (this.partitionDiscoveryIntervalMs > 0) {
            this.context.callAsync(this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges, 0L, this.partitionDiscoveryIntervalMs);
        } else {
            this.context.callAsync(this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges);
        }
    }

    public void handleSplitRequest(int i, @Nullable String str) {
    }

    public void addSplitsBack(List<PulsarPartitionSplit> list, int i) {
        this.splitSchedulingStrategy.addSplitsBack(this.pendingPartitionSplitAssignment, list, i, this.context.currentParallelism());
        assignPendingPartitionSplits();
    }

    public void addReader(int i) {
        LOG.debug("Adding reader {}.", Integer.valueOf(i));
        assignPendingPartitionSplits();
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PulsarSourceEnumeratorState m14snapshotState(long j) throws Exception {
        return new PulsarSourceEnumeratorState(this.readerIdToSplitAssignments);
    }

    public void close() {
        close(((Long) this.configuration.get(PulsarSourceOptions.CLOSE_TIMEOUT_MS)).longValue()).ifPresent(th -> {
            LOG.warn("Encountered error when closing PulsarSourceEnumerator", th);
        });
    }

    @VisibleForTesting
    Optional<Throwable> close(long j) {
        return ComponentClosingUtils.closeWithTimeout("PulsarSourceEnumerator", (ThrowingRunnable<Exception>) () -> {
            this.pulsarAdmin.close();
        }, j);
    }

    private PartitionSplitChange discoverAndInitializePartitionSplit() throws IOException, InterruptedException, PulsarAdminException {
        PulsarSubscriber.PartitionChange partitionChanges = this.subscriber.getPartitionChanges(this.pulsarAdmin, Collections.unmodifiableSet(this.discoveredPartitions));
        this.discoveredPartitions.addAll(partitionChanges.getNewPartitions());
        return new PartitionSplitChange((List) partitionChanges.getNewPartitions().stream().map(abstractPartition -> {
            return new PulsarPartitionSplit(abstractPartition, this.startOffsetInitializer, this.stopCondition);
        }).collect(Collectors.toList()), partitionChanges.getRemovedPartitions());
    }

    private void handlePartitionSplitChanges(PartitionSplitChange partitionSplitChange, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to handle partition splits change due to ", th);
        }
        if (this.partitionDiscoveryIntervalMs < 0) {
            this.noMoreNewPartitionSplits = true;
        }
        addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
        assignPendingPartitionSplits();
    }

    private void addPartitionSplitChangeToPendingAssignments(Collection<PulsarPartitionSplit> collection) {
        int currentParallelism = this.context.currentParallelism();
        for (PulsarPartitionSplit pulsarPartitionSplit : collection) {
            this.pendingPartitionSplitAssignment.computeIfAbsent(Integer.valueOf(this.splitSchedulingStrategy.getIndexOfReader(currentParallelism, pulsarPartitionSplit)), num -> {
                return new ArrayList();
            }).add(pulsarPartitionSplit);
        }
        LOG.debug("Assigned {} to {} readers.", collection, Integer.valueOf(currentParallelism));
    }

    private void assignPendingPartitionSplits() {
        HashMap hashMap = new HashMap();
        this.pendingPartitionSplitAssignment.forEach((num, list) -> {
            if (list.isEmpty() || !this.context.registeredReaders().containsKey(num)) {
                return;
            }
            ((List) hashMap.computeIfAbsent(num, num -> {
                return new ArrayList();
            })).addAll(list);
        });
        if (hashMap.isEmpty()) {
            return;
        }
        this.context.assignSplits(new SplitsAssignment(hashMap));
        hashMap.forEach((num2, list2) -> {
            this.readerIdToSplitAssignments.computeIfAbsent(num2, num2 -> {
                return new ArrayList();
            }).addAll(list2);
            this.pendingPartitionSplitAssignment.remove(num2);
            if (this.noMoreNewPartitionSplits) {
                this.context.signalNoMoreSplits(num2.intValue());
            }
        });
    }
}
