package io.imast.work4j.worker.controller;

import io.imast.core.Coll;
import io.imast.core.Lang;
import io.imast.core.Str;
import io.imast.work4j.channel.SchedulerChannel;
import io.imast.work4j.channel.UpdateOperation;
import io.imast.work4j.channel.WorkerSupervior;
import io.imast.work4j.channel.WorkerUpdateMessage;
import io.imast.work4j.model.JobDefinition;
import io.imast.work4j.model.exchange.JobMetadataRequest;
import io.imast.work4j.model.exchange.JobMetadataResponse;
import io.imast.work4j.model.exchange.JobStatusExchangeResponse;
import io.imast.work4j.worker.WorkerConfiguration;
import io.imast.work4j.worker.WorkerException;
import io.imast.work4j.worker.instance.QuartzInstance;
import io.vavr.control.Try;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/imast/work4j/worker/controller/PollingSupervisor.class */
public class PollingSupervisor implements WorkerSupervior {
    private static final Logger log = LoggerFactory.getLogger(PollingSupervisor.class);
    protected final QuartzInstance instance;
    protected final SchedulerChannel channel;
    protected final WorkerConfiguration config;
    protected final ScheduledExecutorService asyncExecutor = Executors.newScheduledThreadPool(1);
    protected final LinkedList<Consumer<WorkerUpdateMessage>> consumers = new LinkedList<>();

    public PollingSupervisor(QuartzInstance quartzInstance, SchedulerChannel schedulerChannel, WorkerConfiguration workerConfiguration) {
        this.instance = quartzInstance;
        this.channel = schedulerChannel;
        this.config = workerConfiguration;
    }

    public void start() {
        if (this.config.getPollingRate() == null || this.config.getPollingRate().longValue() == 0) {
            return;
        }
        this.asyncExecutor.scheduleAtFixedRate(() -> {
            sync();
        }, 0L, this.config.getPollingRate().longValue(), TimeUnit.MILLISECONDS);
    }

    public void add(Consumer<WorkerUpdateMessage> consumer) {
        this.consumers.add(consumer);
    }

    public void remove(Consumer<WorkerUpdateMessage> consumer) {
        this.consumers.removeLastOccurrence(consumer);
    }

    public void stop() {
        this.asyncExecutor.shutdown();
    }

    protected void sync() {
        try {
            syncImpl();
        } catch (WorkerException e) {
            log.error(String.format("PollingSupervisor: Could not sync jobs, Error: %s", e.getLocalizedMessage()), e);
        }
    }

    protected void syncImpl() throws WorkerException {
        JobMetadataResponse jobMetadataResponse = (JobMetadataResponse) this.channel.metadata(new JobMetadataRequest(this.instance.getCluster())).orElse(null);
        if (jobMetadataResponse == null) {
            throw new WorkerException("PollingSupervisor: Could not pull metadata from controller.");
        }
        HashSet hashSet = new HashSet((Collection) Lang.or(new List[]{jobMetadataResponse.getGroups(), Str.EMPTY_LIST}));
        ((List) Try.of(() -> {
            return this.instance.getGroups();
        }).getOrElse(Str.EMPTY_LIST)).forEach(str -> {
            if (hashSet.contains(str)) {
                return;
            }
            ((Set) Try.of(() -> {
                return this.instance.getJobs(str);
            }).getOrElse(Set.of())).forEach(str -> {
                raise(new WorkerUpdateMessage(UpdateOperation.REMOVE, str, str, (JobDefinition) null));
            });
        });
        Coll.doubleForeach(hashSet, this.instance.getTypes(), (str2, str3) -> {
            try {
                syncGroupImpl(str2, str3);
            } catch (WorkerException e) {
                log.warn("PollingSupervisor: Something went wrong while syncing jobs. " + e.getMessage());
            }
        });
    }

    protected void syncGroupImpl(String str, String str2) throws WorkerException {
        JobStatusExchangeResponse jobStatusExchangeResponse = (JobStatusExchangeResponse) this.channel.statusExchange(this.instance.getStatus(str, str2)).orElse(null);
        if (jobStatusExchangeResponse == null) {
            throw new WorkerException("PollingSupervisor: Did not get proper response from scheduler.");
        }
        log.debug(String.format("PollingSupervisor: Syncing jobs in %s with server. Deleted: %s, Updated: %s, Added: %s", str, Integer.valueOf(jobStatusExchangeResponse.getRemoved().size()), Integer.valueOf(jobStatusExchangeResponse.getUpdated().size()), Integer.valueOf(jobStatusExchangeResponse.getAdded().size())));
        jobStatusExchangeResponse.getRemoved().forEach(str3 -> {
            raise(new WorkerUpdateMessage(UpdateOperation.REMOVE, str3, jobStatusExchangeResponse.getGroup(), (JobDefinition) null));
        });
        jobStatusExchangeResponse.getAdded().values().forEach(jobDefinition -> {
            raise(new WorkerUpdateMessage(UpdateOperation.ADD, jobDefinition.getCode(), jobDefinition.getGroup(), jobDefinition));
        });
        jobStatusExchangeResponse.getUpdated().values().forEach(jobDefinition2 -> {
            raise(new WorkerUpdateMessage(UpdateOperation.UPDATE, jobDefinition2.getCode(), jobDefinition2.getGroup(), jobDefinition2));
        });
    }

    protected void raise(WorkerUpdateMessage workerUpdateMessage) {
        this.consumers.forEach(consumer -> {
            consumer.accept(workerUpdateMessage);
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -111954618:
                if (implMethodName.equals("lambda$syncImpl$23fdf015$1")) {
                    z = false;
                    break;
                }
                break;
            case 687343623:
                if (implMethodName.equals("lambda$syncImpl$9623e698$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/CheckedFunction0") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/imast/work4j/worker/controller/PollingSupervisor") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/List;")) {
                    PollingSupervisor pollingSupervisor = (PollingSupervisor) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return this.instance.getGroups();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/CheckedFunction0") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/imast/work4j/worker/controller/PollingSupervisor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/util/Set;")) {
                    PollingSupervisor pollingSupervisor2 = (PollingSupervisor) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return this.instance.getJobs(str);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
