package io.kestra.jdbc.runner;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.server.AbstractServiceLivenessCoordinator;
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@JdbcRunnerEnabled
@Requires(property = "kestra.server-type", pattern = "(EXECUTOR|STANDALONE)")
/* loaded from: input_file:io/kestra/jdbc/runner/JdbcServiceLivenessCoordinator.class */
public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenessCoordinator {
    private static final Logger log = LoggerFactory.getLogger(JdbcServiceLivenessCoordinator.class);
    private final AtomicReference<JdbcExecutor> executor;
    private final AbstractJdbcServiceInstanceRepository serviceInstanceRepository;

    @Inject
    public JdbcServiceLivenessCoordinator(AbstractJdbcServiceInstanceRepository abstractJdbcServiceInstanceRepository, ServerConfig serverConfig) {
        super(abstractJdbcServiceInstanceRepository, serverConfig);
        this.executor = new AtomicReference<>();
        this.serviceInstanceRepository = abstractJdbcServiceInstanceRepository;
    }

    protected void onSchedule(Instant instant) {
        if (this.executor.get() == null) {
            return;
        }
        this.serviceInstanceRepository.transaction(configuration -> {
            List filterAllNonRespondingServices = filterAllNonRespondingServices(this.serviceInstanceRepository.findAllInstancesInStates(configuration, Service.ServiceState.allRunningStates(), true), instant);
            filterAllNonRespondingServices.forEach(serviceInstance -> {
                this.serviceInstanceRepository.mayTransitServiceTo(configuration, serviceInstance, Service.ServiceState.DISCONNECTED, DEFAULT_REASON_FOR_DISCONNECTED);
            });
            List<String> list = filterAllNonRespondingServices.stream().filter(serviceInstance2 -> {
                return serviceInstance2.is(Service.ServiceType.WORKER);
            }).filter(serviceInstance3 -> {
                return serviceInstance3.config().workerTaskRestartStrategy().equals(WorkerTaskRestartStrategy.IMMEDIATELY);
            }).map((v0) -> {
                return v0.id();
            }).toList();
            if (list.isEmpty()) {
                return;
            }
            log.info("Trigger task restart for non-responding workers after timeout: {}.", list);
            this.executor.get().reEmitWorkerJobsForWorkers(configuration, list);
        });
        this.serviceInstanceRepository.transaction(configuration2 -> {
            List<ServiceInstance> list = this.serviceInstanceRepository.findAllNonRunningInstances(configuration2, true).stream().filter(serviceInstance -> {
                return serviceInstance.is(Service.ServiceType.WORKER);
            }).toList();
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(list.stream().filter(serviceInstance2 -> {
                return serviceInstance2.state().isDisconnectedOrTerminating();
            }).filter(serviceInstance3 -> {
                return serviceInstance3.isTerminationGracePeriodElapsed(instant);
            }).peek(serviceInstance4 -> {
                mayLogNonRespondingAfterTerminationGracePeriod(serviceInstance4, instant);
            }).toList());
            arrayList.addAll(list.stream().filter(serviceInstance5 -> {
                return serviceInstance5.is(Service.ServiceState.TERMINATED_FORCED);
            }).toList());
            if (!arrayList.isEmpty()) {
                List<String> list2 = arrayList.stream().filter(serviceInstance6 -> {
                    return serviceInstance6.config().workerTaskRestartStrategy().isRestartable();
                }).map((v0) -> {
                    return v0.id();
                }).toList();
                if (!list2.isEmpty()) {
                    log.info("Trigger task restart for non-responding workers after termination grace period: {}.", list2);
                    this.executor.get().reEmitWorkerJobsForWorkers(configuration2, list2);
                }
            }
            Stream.concat(list.stream().filter(serviceInstance7 -> {
                return serviceInstance7.is(Service.ServiceState.TERMINATED_GRACEFULLY);
            }), arrayList.stream()).forEach(serviceInstance8 -> {
                this.serviceInstanceRepository.mayTransitServiceTo(configuration2, serviceInstance8, Service.ServiceState.NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING);
            });
        });
        this.serviceInstanceRepository.findAllInstancesInStates(Set.of(Service.ServiceState.DISCONNECTED, Service.ServiceState.TERMINATING, Service.ServiceState.TERMINATED_GRACEFULLY, Service.ServiceState.TERMINATED_FORCED)).stream().filter(serviceInstance -> {
            return !serviceInstance.is(Service.ServiceType.WORKER);
        }).filter(serviceInstance2 -> {
            return serviceInstance2.isTerminationGracePeriodElapsed(instant);
        }).peek(serviceInstance3 -> {
            mayLogNonRespondingAfterTerminationGracePeriod(serviceInstance3, instant);
        }).forEach(serviceInstance4 -> {
            safelyTransitionServiceTo(serviceInstance4, Service.ServiceState.NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING);
        });
        this.serviceInstanceRepository.findAllInstancesInState(Service.ServiceState.NOT_RUNNING).forEach(serviceInstance5 -> {
            safelyTransitionServiceTo(serviceInstance5, Service.ServiceState.EMPTY, null);
        });
        mayDetectAndLogNewConnectedServices();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void mayLogNonRespondingAfterTerminationGracePeriod(ServiceInstance serviceInstance, Instant instant) {
        if (serviceInstance.state().isDisconnectedOrTerminating()) {
            log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).", new Object[]{serviceInstance.id(), serviceInstance.type(), serviceInstance.server().hostname(), Long.valueOf(instant.toEpochMilli() - serviceInstance.updatedAt().toEpochMilli())});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setExecutor(JdbcExecutor jdbcExecutor) {
        this.executor.set(jdbcExecutor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void setServerInstance(String str) {
        this.serverId = str;
    }
}
