package io.imast.work4j.worker.controller;

import io.imast.work4j.channel.SchedulerChannel;
import io.imast.work4j.channel.worker.WorkerExecutionCompleted;
import io.imast.work4j.channel.worker.WorkerExecutionCreated;
import io.imast.work4j.channel.worker.WorkerExecutionPaused;
import io.imast.work4j.channel.worker.WorkerExecutionResumed;
import io.imast.work4j.channel.worker.WorkerListener;
import io.imast.work4j.channel.worker.WorkerMessage;
import io.imast.work4j.model.cluster.ClusterWorker;
import io.imast.work4j.model.execution.ExecutionIndexEntry;
import io.imast.work4j.model.execution.ExecutionStatus;
import io.imast.work4j.worker.WorkerConfiguration;
import io.imast.work4j.worker.WorkerException;
import io.imast.work4j.worker.instance.ExecutionKey;
import io.imast.work4j.worker.instance.QuartzInstance;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/imast/work4j/worker/controller/PollingWorkerListener.class */
/**
 * A {@link WorkerListener} that discovers execution changes by polling.
 *
 * <p>At a fixed rate the listener pulls the execution index of the worker's
 * cluster from the {@link SchedulerChannel}, compares it with the executions
 * currently held by the local {@link QuartzInstance}, and notifies every
 * registered consumer with a {@link WorkerMessage} for each difference
 * (created, completed, paused or resumed execution).
 *
 * <p>The consumer list is mutated by callers of {@link #add} / {@link #remove}
 * and read by the polling thread, so all access to it inside this class is
 * guarded by the list's own monitor.
 */
public class PollingWorkerListener implements WorkerListener {
    /** The worker on whose behalf the index is polled; its cluster selects the index. */
    protected final ClusterWorker worker;

    /** Local scheduler instance that reports the executions it currently holds. */
    protected final QuartzInstance instance;

    /** Channel used to pull the execution index and the execution details. */
    protected final SchedulerChannel channel;

    /** Worker configuration; supplies the polling rate. */
    protected final WorkerConfiguration config;

    /** Single-threaded scheduler that runs the periodic {@link #sync()} task. */
    protected final ScheduledExecutorService asyncExecutor = Executors.newScheduledThreadPool(1);

    /** Registered message consumers; guarded by {@code synchronized (consumers)} within this class. */
    protected final LinkedList<Consumer<WorkerMessage>> consumers = new LinkedList<>();

    /** Maximum number of execution ids requested from the channel in one call. */
    protected final int executionChunkSize = 100;

    private static final Logger log = LoggerFactory.getLogger(PollingWorkerListener.class);

    /**
     * Creates a new polling listener.
     *
     * @param clusterWorker the worker whose cluster is polled
     * @param quartzInstance the local instance holding the current executions
     * @param schedulerChannel the channel used to reach the scheduler
     * @param workerConfiguration the worker configuration (polling rate)
     */
    public PollingWorkerListener(ClusterWorker clusterWorker, QuartzInstance quartzInstance, SchedulerChannel schedulerChannel, WorkerConfiguration workerConfiguration) {
        this.worker = clusterWorker;
        this.instance = quartzInstance;
        this.channel = schedulerChannel;
        this.config = workerConfiguration;
    }

    /**
     * Starts periodic polling. Does nothing when the configured polling rate
     * is {@code null} or zero. The first poll runs immediately; the rate is
     * interpreted as milliseconds.
     */
    public void start() {
        final Long pollingRate = this.config.getPollingRate();
        if (pollingRate == null || pollingRate == 0L) {
            return;
        }
        this.asyncExecutor.scheduleAtFixedRate(this::sync, 0L, pollingRate, TimeUnit.MILLISECONDS);
    }

    /**
     * Registers a consumer that will receive every raised message.
     *
     * @param consumer the consumer to register
     */
    public void add(Consumer<WorkerMessage> consumer) {
        synchronized (this.consumers) {
            this.consumers.add(consumer);
        }
    }

    /**
     * Unregisters the most recently added occurrence of the given consumer.
     *
     * @param consumer the consumer to unregister
     */
    public void remove(Consumer<WorkerMessage> consumer) {
        synchronized (this.consumers) {
            this.consumers.removeLastOccurrence(consumer);
        }
    }

    /**
     * Stops polling by shutting down the internal scheduler. Does not wait
     * for a poll that is already running.
     */
    public void stop() {
        this.asyncExecutor.shutdown();
    }

    /**
     * Runs one poll and logs any failure instead of propagating it.
     *
     * <p>This method is the body of the fixed-rate task: an exception that
     * escapes it would make the executor suppress every subsequent run, so
     * unchecked exceptions are caught here as well as {@link WorkerException}.
     */
    protected void sync() {
        try {
            syncImpl();
        } catch (WorkerException e) {
            log.error("PollingListener: Could not sync executions, Error: {}", e.getLocalizedMessage(), e);
        } catch (RuntimeException e) {
            // Must not escape: scheduleAtFixedRate cancels the task on any thrown exception.
            log.error("PollingListener: Unexpected error while syncing executions", e);
        }
    }

    /**
     * Requests the execution index for the worker's cluster and hands the
     * result to {@link #syncIndex(List)}; a failed pull is logged.
     *
     * @throws WorkerException declared for overriding implementations
     */
    protected void syncImpl() throws WorkerException {
        this.channel.executionIndex(this.worker.getCluster()).subscribe(this::syncIndex, th -> {
            log.error("PollingListener: Could not pull execution index.", th);
        });
    }

    /**
     * Delivers a message to every registered consumer.
     *
     * <p>Iterates over a snapshot so that consumers may be added or removed
     * (including from within a consumer) while a message is being delivered.
     *
     * @param workerMessage the message to deliver
     */
    protected void raise(WorkerMessage workerMessage) {
        final List<Consumer<WorkerMessage>> snapshot;
        synchronized (this.consumers) {
            snapshot = new ArrayList<>(this.consumers);
        }
        snapshot.forEach(consumer -> consumer.accept(workerMessage));
    }

    /**
     * Compares the pulled index with the local executions and raises the
     * corresponding messages.
     *
     * <ul>
     *   <li>in the index but not held locally: details are loaded in chunks of
     *       {@link #executionChunkSize} and a created message is raised for each;</li>
     *   <li>held locally and marked {@code COMPLETED} in the index, or held
     *       locally but absent from the index: completed message;</li>
     *   <li>marked {@code PAUSED} in the index but not paused locally: paused message;</li>
     *   <li>marked {@code ACTIVE} in the index but paused locally: resumed message.</li>
     * </ul>
     *
     * @param list the pulled execution index; {@code null} is ignored
     * @throws WorkerException if the local instance cannot report its executions
     */
    protected void syncIndex(List<ExecutionIndexEntry> list) throws WorkerException {
        if (list == null) {
            return;
        }

        final Map<ExecutionKey, ExecutionIndexEntry> index = list.stream().collect(Collectors.toMap(
                entry -> new ExecutionKey(entry.getId(), entry.getJobId()),
                entry -> entry));

        final Set<ExecutionKey> paused = this.instance.getPausedExecutions();
        final Set<ExecutionKey> running = this.instance.getExecutions();

        final Set<ExecutionKey> toCreate = new HashSet<>();
        final Set<ExecutionKey> toComplete = new HashSet<>();
        final Set<ExecutionKey> toPause = new HashSet<>();
        final Set<ExecutionKey> toResume = new HashSet<>();

        index.forEach((key, entry) -> {
            if (!running.contains(key)) {
                toCreate.add(key);
                return;
            }
            final ExecutionStatus status = entry.getStatus();
            if (status == ExecutionStatus.COMPLETED) {
                toComplete.add(key);
            } else if (status == ExecutionStatus.PAUSED && !paused.contains(key)) {
                toPause.add(key);
            } else if (status == ExecutionStatus.ACTIVE && paused.contains(key)) {
                toResume.add(key);
            }
        });

        // Anything held locally that the index no longer lists is treated as completed.
        running.forEach(key -> {
            if (!index.containsKey(key)) {
                toComplete.add(key);
            }
        });

        log.info("PollingWorkerListener: Current: Index({}), All({}), Paused({})", index.size(), running.size(), paused.size());
        if (toCreate.isEmpty() && toComplete.isEmpty() && toPause.isEmpty() && toResume.isEmpty()) {
            return;
        }
        log.info("PollingWorkerListener: Changes: Add({}), Delete({}), Pause({}), Resume({})", toCreate.size(), toComplete.size(), toPause.size(), toResume.size());

        toComplete.forEach(key -> raise(new WorkerExecutionCompleted(key.getExecutionId(), key.getJobId())));
        toPause.forEach(key -> raise(new WorkerExecutionPaused(key.getExecutionId(), key.getJobId())));
        toResume.forEach(key -> raise(new WorkerExecutionResumed(key.getExecutionId(), key.getJobId())));

        // New executions are only known by id here; load their details in bounded batches.
        partition(toCreate.stream().map(ExecutionKey::getExecutionId).collect(Collectors.toList()), this.executionChunkSize)
                .forEach(chunk -> this.channel.executions(chunk).subscribe(loaded -> {
                    if (loaded == null || loaded.isEmpty()) {
                        return;
                    }
                    loaded.forEach(jobExecution -> raise(new WorkerExecutionCreated(jobExecution)));
                }, th -> {
                    log.error("PollingListener: Could not load portion of executions", th);
                }));
    }

    /**
     * Splits a list into consecutive, independent chunks of at most
     * {@code chunkSize} elements. An empty input yields no chunks.
     *
     * @param items the items to split
     * @param chunkSize the maximum chunk size; must be positive
     * @param <T> the element type
     * @return the chunks, in the order of the input
     */
    private static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        final List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            final int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each chunk stays valid independently of the source list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }
}
