package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.class */
/**
 * Base class that creates {@link SplitFetcher}s, runs them on a dedicated thread pool and
 * tracks their lifecycle.
 *
 * <p>Every fetcher created by this manager is wired to the same elements queue, the same
 * split-finished hook and the same error handler. The first exception reported through the
 * error handler is retained and later ones are attached to it as suppressed exceptions;
 * {@link #checkErrors()} rethrows the retained exception to the caller.
 *
 * <p>Subclasses decide how splits are distributed across fetchers by implementing
 * {@link #addSplits(Collection)}.
 *
 * @param <E> the element type carried by the {@link RecordsWithSplitIds} batches
 * @param <SplitT> the split type handled by the fetchers
 */
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(SplitFetcherManager.class);

    /** Fetchers currently registered with this manager, keyed by fetcher id. */
    protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;

    /** Queue handed to every fetcher created by this manager. */
    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

    /** Produces a new {@link SplitReader} for each fetcher that is created. */
    private final Supplier<SplitReader<E, SplitT>> splitReaderFactory;

    /** Hook handed to every fetcher created by this manager. */
    private final Consumer<Collection<String>> splitFinishedHook;

    /** Source of fetcher ids; starts at 0. */
    private final AtomicInteger fetcherIdGenerator;

    /** First exception reported by a fetcher, or {@code null} if none was reported so far. */
    private final AtomicReference<Throwable> uncaughtFetcherException;

    /** Error handler handed to every fetcher; see {@link #recordFetcherException(Throwable)}. */
    private final Consumer<Throwable> errorHandler;

    /** Thread pool on which the fetchers run. */
    private final ExecutorService executors;

    /** Set once {@link #close(long)} has been called; no fetcher may be created afterwards. */
    private volatile boolean closed;

    /**
     * Creates a manager whose split-finished hook does nothing.
     *
     * @param elementsQueue the queue passed to every created fetcher
     * @param splitReaderFactory supplies one {@link SplitReader} per created fetcher
     */
    public SplitFetcherManager(
            BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory) {
        this(elementsQueue, splitReaderFactory, ignoredSplitIds -> {
        });
    }

    /**
     * Creates a manager.
     *
     * <p>The worker threads are named after the thread that invokes this constructor.
     *
     * @param elementsQueue the queue passed to every created fetcher
     * @param splitReaderFactory supplies one {@link SplitReader} per created fetcher
     * @param splitFinishedHook the hook passed to every created fetcher
     */
    public SplitFetcherManager(
            BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Consumer<Collection<String>> splitFinishedHook) {
        this.fetchers = new ConcurrentHashMap<>();
        this.elementsQueue = elementsQueue;
        this.splitReaderFactory = splitReaderFactory;
        this.splitFinishedHook = splitFinishedHook;
        this.fetcherIdGenerator = new AtomicInteger(0);
        this.uncaughtFetcherException = new AtomicReference<>(null);
        this.errorHandler = this::recordFetcherException;
        // Capture the name now: the thread factory runs later, on whichever thread submits work.
        final String creatorThreadName = Thread.currentThread().getName();
        this.executors = Executors.newCachedThreadPool(
                task -> new Thread(task, "Source Data Fetcher for " + creatorThreadName));
    }

    /**
     * Hands the given splits to this manager.
     *
     * @param collection the splits to add
     */
    public abstract void addSplits(Collection<SplitT> collection);

    /**
     * Submits the given fetcher to the thread pool.
     *
     * @param splitFetcher the fetcher to run
     */
    public void startFetcher(SplitFetcher<E, SplitT> splitFetcher) {
        executors.submit(splitFetcher);
    }

    /**
     * Creates a new fetcher with a fresh id and registers it in {@link #fetchers}.
     *
     * <p>The fetcher is not started; call {@link #startFetcher(SplitFetcher)} to run it. The
     * callback passed to the fetcher removes it from {@link #fetchers} again.
     *
     * @return the newly created, registered fetcher
     * @throws IllegalStateException if this manager has already been closed
     */
    public synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
        if (closed) {
            throw new IllegalStateException("The split fetcher manager has closed.");
        }
        // Obtain the reader first so that a failing factory does not consume a fetcher id.
        final SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
        final int fetcherId = fetcherIdGenerator.getAndIncrement();
        final SplitFetcher<E, SplitT> fetcher =
                new SplitFetcher<>(
                        fetcherId,
                        elementsQueue,
                        splitReader,
                        errorHandler,
                        () -> {
                            fetchers.remove(fetcherId);
                        },
                        splitFinishedHook);
        fetchers.put(fetcherId, fetcher);
        return fetcher;
    }

    /**
     * Shuts down and unregisters every fetcher that reports itself as idle.
     *
     * @return {@code true} if no fetcher remains registered afterwards
     */
    public synchronized boolean maybeShutdownFinishedFetchers() {
        final Iterator<Map.Entry<Integer, SplitFetcher<E, SplitT>>> entries =
                fetchers.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = entries.next();
            final SplitFetcher<E, SplitT> fetcher = entry.getValue();
            if (!fetcher.isIdle()) {
                continue;
            }
            log.info("Closing splitFetcher {} because it is idle.", entry.getKey());
            fetcher.shutdown();
            entries.remove();
        }
        return fetchers.isEmpty();
    }

    /**
     * Marks this manager as closed, shuts down all registered fetchers and the thread pool,
     * and waits for the pool to terminate.
     *
     * <p>If the pool does not terminate within the timeout, a warning is logged and the
     * method returns normally.
     *
     * @param timeoutMs maximum time to wait for the thread pool to terminate, in milliseconds
     * @throws Exception if waiting for termination fails, e.g. the calling thread is interrupted
     */
    public synchronized void close(long timeoutMs) throws Exception {
        closed = true;
        fetchers.values().forEach(fetcher -> fetcher.shutdown());
        executors.shutdown();
        if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            log.warn(
                    "Failed to close the source reader in {} ms. There are still {} split fetchers running",
                    timeoutMs,
                    fetchers.size());
        }
    }

    /**
     * Rethrows the exception recorded by the error handler, if any.
     *
     * @throws RuntimeException wrapping the first recorded fetcher exception
     */
    public void checkErrors() {
        final Throwable failure = uncaughtFetcherException.get();
        if (failure != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", failure);
        }
    }

    /**
     * Logs the given exception and records it: the first one is retained, later ones are added
     * to it as suppressed exceptions.
     */
    private void recordFetcherException(Throwable error) {
        log.error("Received uncaught exception.", error);
        if (uncaughtFetcherException.compareAndSet(null, error)) {
            return;
        }
        final Throwable first = uncaughtFetcherException.get();
        // Throwable#addSuppressed rejects the throwable itself (IllegalArgumentException) and
        // null (NullPointerException); skip those so the handler never throws while reporting.
        if (error != null && error != first) {
            first.addSuppressed(error);
        }
    }
}
