/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.server.AbstractServiceLivenessCoordinator;
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.kestra.jdbc.runner.JdbcExecutor;
import io.kestra.jdbc.runner.JdbcRunnerEnabled;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@JdbcRunnerEnabled
@Requires(property="kestra.server-type", pattern="(EXECUTOR|STANDALONE)")
public final class JdbcServiceLivenessCoordinator
extends AbstractServiceLivenessCoordinator {
    private static final Logger log = LoggerFactory.getLogger(JdbcServiceLivenessCoordinator.class);
    private final AtomicReference<JdbcExecutor> executor = new AtomicReference();
    private final AbstractJdbcServiceInstanceRepository serviceInstanceRepository;

    @Inject
    public JdbcServiceLivenessCoordinator(AbstractJdbcServiceInstanceRepository serviceInstanceRepository, ServerConfig serverConfig) {
        super((ServiceInstanceRepositoryInterface)serviceInstanceRepository, serverConfig);
        this.serviceInstanceRepository = serviceInstanceRepository;
    }

    protected void onSchedule(Instant now) {
        if (this.executor.get() == null) {
            return;
        }
        this.serviceInstanceRepository.transaction(configuration -> {
            List<ServiceInstance> instances = this.serviceInstanceRepository.findAllInstancesInStates(configuration, Service.ServiceState.allRunningStates(), true);
            List nonRespondingServices = this.filterAllNonRespondingServices(instances, now);
            nonRespondingServices.forEach(instance -> this.serviceInstanceRepository.mayTransitServiceTo(configuration, (ServiceInstance)instance, Service.ServiceState.DISCONNECTED, DEFAULT_REASON_FOR_DISCONNECTED));
            List<String> workerIdsHavingTasksToRestart = nonRespondingServices.stream().filter(instance -> instance.is(Service.ServiceType.WORKER)).filter(instance -> instance.config().workerTaskRestartStrategy().equals((Object)WorkerTaskRestartStrategy.IMMEDIATELY)).map(ServiceInstance::id).toList();
            if (!workerIdsHavingTasksToRestart.isEmpty()) {
                log.info("Trigger task restart for non-responding workers after timeout: {}.", workerIdsHavingTasksToRestart);
                this.executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart);
            }
        });
        this.serviceInstanceRepository.transaction(configuration -> {
            List<String> ids;
            List<ServiceInstance> nonRunningWorkers = this.serviceInstanceRepository.findAllNonRunningInstances(configuration, true).stream().filter(instance -> instance.is(Service.ServiceType.WORKER)).toList();
            ArrayList<ServiceInstance> uncleanShutdownWorkers = new ArrayList<ServiceInstance>();
            uncleanShutdownWorkers.addAll(nonRunningWorkers.stream().filter(nonRunning -> nonRunning.state().isDisconnectedOrTerminating()).filter(disconnectedOrTerminating -> disconnectedOrTerminating.isTerminationGracePeriodElapsed(now)).peek(instance -> JdbcServiceLivenessCoordinator.mayLogNonRespondingAfterTerminationGracePeriod(instance, now)).toList());
            uncleanShutdownWorkers.addAll(nonRunningWorkers.stream().filter(nonRunning -> nonRunning.is(Service.ServiceState.TERMINATED_FORCED)).toList());
            if (!uncleanShutdownWorkers.isEmpty() && !(ids = uncleanShutdownWorkers.stream().filter(instance -> instance.config().workerTaskRestartStrategy().isRestartable()).map(ServiceInstance::id).toList()).isEmpty()) {
                log.info("Trigger task restart for non-responding workers after termination grace period: {}.", ids);
                this.executor.get().reEmitWorkerJobsForWorkers(configuration, ids);
            }
            Stream<ServiceInstance> cleanShutdownWorkers = nonRunningWorkers.stream().filter(nonRunning -> nonRunning.is(Service.ServiceState.TERMINATED_GRACEFULLY));
            Stream.concat(cleanShutdownWorkers, uncleanShutdownWorkers.stream()).forEach(instance -> this.serviceInstanceRepository.mayTransitServiceTo(configuration, (ServiceInstance)instance, Service.ServiceState.NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING));
        });
        this.serviceInstanceRepository.findAllInstancesInStates(Set.of(Service.ServiceState.DISCONNECTED, Service.ServiceState.TERMINATING, Service.ServiceState.TERMINATED_GRACEFULLY, Service.ServiceState.TERMINATED_FORCED)).stream().filter(instance -> !instance.is(Service.ServiceType.WORKER)).filter(instance -> instance.isTerminationGracePeriodElapsed(now)).peek(instance -> JdbcServiceLivenessCoordinator.mayLogNonRespondingAfterTerminationGracePeriod(instance, now)).forEach(instance -> this.safelyTransitionServiceTo((ServiceInstance)instance, Service.ServiceState.NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING));
        this.serviceInstanceRepository.findAllInstancesInState(Service.ServiceState.NOT_RUNNING).forEach(instance -> this.safelyTransitionServiceTo((ServiceInstance)instance, Service.ServiceState.EMPTY, null));
        this.mayDetectAndLogNewConnectedServices();
    }

    private static void mayLogNonRespondingAfterTerminationGracePeriod(ServiceInstance instance, Instant now) {
        if (instance.state().isDisconnectedOrTerminating()) {
            log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).", new Object[]{instance.id(), instance.type(), instance.server().hostname(), now.toEpochMilli() - instance.updatedAt().toEpochMilli()});
        }
    }

    synchronized void setExecutor(JdbcExecutor executor) {
        this.executor.set(executor);
    }

    @VisibleForTesting
    void setServerInstance(String serverId) {
        this.serverId = serverId;
    }
}

