package io.datarouter.conveyor;

import io.datarouter.conveyor.ConveyorConfigurationGroup;
import io.datarouter.conveyor.config.DatarouterConveyorSettingRoot;
import io.datarouter.conveyor.config.DatarouterConveyorShouldRunSettings;
import io.datarouter.conveyor.config.DatarouterConveyorThreadCountSettings;
import io.datarouter.inject.InstanceRegistry;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.setting.cached.CachedSetting;
import io.datarouter.util.concurrent.ExecutorServiceTool;
import io.datarouter.util.concurrent.NamedThreadFactory;
import io.datarouter.util.duration.DatarouterDuration;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/ConveyorProcessor.class */
/**
 * Drives the worker tasks of a single conveyor package.
 *
 * <p>The constructor creates a dedicated {@link ThreadPoolExecutor} (registered with the supplied
 * {@link InstanceRegistry}) and starts a driver thread. The driver thread submits {@link Conveyor} tasks,
 * polls the completion service for finished tasks, and then submits more tasks or cancels surplus ones so that
 * the number of tracked futures follows the thread-count setting of the package. While the should-run setting
 * is false the driver only sleeps.
 *
 * <p>{@link #requestShutdown()} interrupts the driver thread and shuts the executor down.
 */
public class ConveyorProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ConveyorProcessor.class);

    /** Maximum time handed to {@link ExecutorServiceTool#shutdown} when shutting the executor down. */
    private static final Duration MAX_WAIT_FOR_SHUTDOWN = Duration.ofSeconds(5);
    /** Pause between checks of the should-run setting while the conveyor is disabled. */
    private static final Duration SLEEP_TIME_WHEN_DISABLED = Duration.ofSeconds(5);

    private final DatarouterConveyorThreadCountSettings threadCountSettings;
    private final DatarouterConveyorShouldRunSettings shouldRunSettings;
    private final DatarouterConveyorSettingRoot conveyorSetting;
    private final ConveyorConfigurationGroup.ConveyorPackage conveyorPackage;
    private final ConveyorService conveyorService;
    private final ConveyorConfiguration configuration;
    private final ThreadPoolExecutor exec;
    private final CompletionService<?> completionService;
    private final Thread driverThread;
    /** Futures of the tasks that were submitted and have not yet been polled as finished. */
    private final Set<Future<?>> conveyorFutures;

    /**
     * Creates the executor and completion service for the given conveyor package and starts the driver thread.
     *
     * @param datarouterConveyorShouldRunSettings source of the per-package should-run setting
     * @param datarouterConveyorThreadCountSettings source of the per-package thread-count setting
     * @param datarouterConveyorSettingRoot provides the poll timeout and the sleep-on-task-completion duration
     * @param conveyorPackage the package this processor drives; its name is used for thread names and logging
     * @param conveyorService passed through to every {@link Conveyor} task
     * @param conveyorConfiguration passed through to every {@link Conveyor} task; also consulted on shutdown
     * @param instanceRegistry registry the executor is registered with
     */
    public ConveyorProcessor(DatarouterConveyorShouldRunSettings datarouterConveyorShouldRunSettings, DatarouterConveyorThreadCountSettings datarouterConveyorThreadCountSettings, DatarouterConveyorSettingRoot datarouterConveyorSettingRoot, ConveyorConfigurationGroup.ConveyorPackage conveyorPackage, ConveyorService conveyorService, ConveyorConfiguration conveyorConfiguration, InstanceRegistry instanceRegistry) {
        this.threadCountSettings = datarouterConveyorThreadCountSettings;
        this.shouldRunSettings = datarouterConveyorShouldRunSettings;
        this.conveyorSetting = datarouterConveyorSettingRoot;
        this.conveyorPackage = conveyorPackage;
        this.conveyorService = conveyorService;
        this.configuration = conveyorConfiguration;
        // NOTE(review): the boolean passed to NamedThreadFactory is presumably the daemon flag - confirm.
        ThreadFactory threadFactory = new NamedThreadFactory("conveyor-" + conveyorPackage.name(), true);
        // No core threads, unbounded maximum, direct hand-off queue: every submitted task gets a thread at once.
        this.exec = (ThreadPoolExecutor) instanceRegistry.register(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1L, TimeUnit.MINUTES, new SynchronousQueue<>(), threadFactory));
        this.completionService = new ExecutorCompletionService<>(this.exec);
        // Must be assigned before the driver thread starts: run() uses this set immediately, and
        // Thread.start() is what makes the writes above visible to the new thread.
        this.conveyorFutures = new HashSet<>();
        this.driverThread = new Thread(null, this::run, conveyorPackage.name() + " ConveyorProcessor worker thread");
        this.driverThread.start();
    }

    /**
     * Body of the driver thread. Submits the initial batch of tasks, then loops until interrupted: polls for a
     * finished task, removes it from the tracked set, and adjusts the number of tasks to the thread-count
     * setting. Any other failure inside one iteration is logged and the loop continues.
     */
    private void run() {
        CachedSetting<Integer> threadCount = this.threadCountSettings.getSettingForConveyorPackage(this.conveyorPackage);
        submitTasks(threadCount.get());
        while (true) {
            try {
                // sleepABit restores the interrupt flag, so an interrupt during a sleep is observed here.
                if (Thread.interrupted()) {
                    logger.warn("conveyor thread shutting down for name={}", this.conveyorPackage.name());
                    return;
                }
                if (!this.shouldRunSettings.getSettingForConveyorPackage(this.conveyorPackage).get()) {
                    sleepABit(SLEEP_TIME_WHEN_DISABLED);
                    continue;
                }
                long pollTimeoutMs = ((DatarouterDuration) this.conveyorSetting.pollTimeout.get()).toMillis();
                Future<?> finished = this.completionService.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
                if (finished != null) {
                    // The size is logged before the finished future is removed from the tracked set.
                    logger.debug("One task finished, numRunningTasks={}, numAllowedThread={}", this.conveyorFutures.size(), threadCount.get());
                    this.conveyorFutures.remove(finished);
                    sleepABit(((DatarouterDuration) this.conveyorSetting.sleepOnTaskCompletion.get()).toJavaDuration());
                }
                submitMoreTasksOrCancelTasks(threadCount);
            } catch (InterruptedException e) {
                // poll() clears the interrupt flag when it throws; treating this like any other failure would
                // lose the shutdown request and leave the driver looping forever.
                logger.warn("conveyor thread shutting down for name={}", this.conveyorPackage.name());
                return;
            } catch (Throwable th) {
                // Best effort: keep the driver alive whatever a single iteration throws.
                logger.error("", th);
            }
        }
    }

    /**
     * Submits {@code i} new {@link Conveyor} tasks and tracks their futures. Does nothing when the should-run
     * setting of the package is false.
     *
     * @param i number of tasks to submit; zero or negative submits nothing
     */
    private void submitTasks(int i) {
        CachedSetting<Boolean> shouldRun = this.shouldRunSettings.getSettingForConveyorPackage(this.conveyorPackage);
        if (!shouldRun.get()) {
            return;
        }
        for (int submitted = 0; submitted < i; submitted++) {
            Conveyor conveyor = new Conveyor(this.conveyorService, this.configuration, this.conveyorPackage.name(), shouldRun);
            this.conveyorFutures.add(this.completionService.submit(conveyor, null));
        }
    }

    /**
     * Compares the number of tracked futures with the allowed thread count and either submits the missing
     * tasks or cancels the surplus ones. Cancelled futures stay in the tracked set until the driver loop polls
     * them from the completion service.
     *
     * @param supplier supplies the currently allowed number of tasks
     */
    private void submitMoreTasksOrCancelTasks(Supplier<Integer> supplier) {
        int allowed = supplier.get();
        int tracked = this.conveyorFutures.size();
        if (tracked == allowed) {
            return;
        }
        if (tracked < allowed) {
            logger.debug("Running tasks smaller than allowed threadCounts, {} < {}", tracked, allowed);
            submitTasks(allowed - tracked);
            return;
        }
        int surplus = tracked - allowed;
        logger.debug("name={} remove {} tasks because numRunningTasks={} but only numThreads={} are allowed.", this.conveyorPackage.name(), surplus, tracked, allowed);
        // Interrupt-cancel as many tasks as exceed the allowance; which ones is whatever the set iterates first.
        Scanner.of(this.conveyorFutures)
                .limit(surplus)
                .forEach(future -> future.cancel(true));
    }

    /**
     * Stops this processor: optionally submits one last task when the configuration asks to run on shutdown,
     * interrupts the driver thread, and shuts the executor down, passing {@link #MAX_WAIT_FOR_SHUTDOWN} to
     * {@link ExecutorServiceTool#shutdown}.
     */
    public void requestShutdown() {
        if (this.configuration.shouldRunOnShutdown()) {
            // NOTE(review): this mutates conveyorFutures (a plain HashSet) from the caller's thread while the
            // driver thread may still be using it - confirm whether that needs synchronization.
            submitTasks(1);
            logger.info("running conveyor={} onShutdown", this.conveyorPackage.name());
        }
        this.driverThread.interrupt();
        ExecutorServiceTool.shutdown(this.exec, MAX_WAIT_FOR_SHUTDOWN);
    }

    /**
     * Sleeps the current thread for the given duration. If the sleep is interrupted, the interrupt flag is
     * restored and the method returns early.
     *
     * @param duration how long to sleep
     */
    public static void sleepABit(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the executor that runs this processor's conveyor tasks
     */
    public ThreadPoolExecutor getExecutorService() {
        return this.exec;
    }

    /**
     * @return the current value of the should-run setting for this conveyor package
     */
    public boolean shouldConveyorRun() {
        return this.shouldRunSettings.getSettingForConveyorPackage(this.conveyorPackage).get();
    }

    /**
     * @return the current value of the thread-count setting for this conveyor package
     */
    public int getMaxAllowedThreadCount() {
        return this.threadCountSettings.getSettingForConveyorPackage(this.conveyorPackage).get();
    }
}
