package io.kestra.jdbc.runner;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.runners.Worker;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.tasks.test.Sleep;
import io.kestra.core.tasks.test.SleepTrigger;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Property;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@Property(name = "kestra.server-type", value = "EXECUTOR")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@MicronautTest(transactional = false, environments = {"test", "liveness"})
/* loaded from: input_file:io/kestra/jdbc/runner/JdbcServiceLivenessCoordinatorTest.class */
public abstract class JdbcServiceLivenessCoordinatorTest {

    @Inject
    private StandAloneRunner runner;

    @Inject
    private LocalFlowRepositoryLoader repositoryLoader;

    @Inject
    private ApplicationContext applicationContext;

    @Inject
    JdbcTestUtils jdbcTestUtils;

    @Inject
    RunContextFactory runContextFactory;

    @Inject
    @Named("workerJobQueue")
    QueueInterface<WorkerJob> workerJobQueue;

    @Inject
    @Named("workerTaskResultQueue")
    QueueInterface<WorkerTaskResult> workerTaskResultQueue;

    @Inject
    @Named("workerTriggerResultQueue")
    QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;

    @Inject
    JdbcServiceLivenessCoordinator jdbcServiceLivenessHandler;

    @Inject
    SkipExecutionService skipExecutionService;

    /* JADX INFO: Access modifiers changed from: package-private */
    @BeforeAll
    public void init() throws IOException, URISyntaxException {
        this.jdbcTestUtils.drop();
        this.jdbcTestUtils.migrate();
        TestsUtils.loads(this.repositoryLoader);
        this.jdbcServiceLivenessHandler.setServerInstance(IdUtils.create());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Test
    public void shouldReEmitTasksWhenWorkerIsDetectedAsNonResponding() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Worker worker = (Worker) this.applicationContext.createBean(Worker.class, new Object[]{IdUtils.create(), 1, null});
        worker.run();
        this.runner.setSchedulerEnabled(false);
        this.runner.setWorkerEnabled(false);
        this.runner.run();
        AtomicReference atomicReference = new AtomicReference(null);
        this.workerTaskResultQueue.receive(either -> {
            atomicReference.set((WorkerTaskResult) either.getLeft());
            if (((WorkerTaskResult) either.getLeft()).getTaskRun().getState().getCurrent() == State.Type.SUCCESS) {
                countDownLatch2.countDown();
            }
            if (((WorkerTaskResult) either.getLeft()).getTaskRun().getState().getCurrent() == State.Type.RUNNING) {
                countDownLatch.countDown();
            }
        });
        this.workerJobQueue.emit(workerTask(Duration.ofSeconds(10L)));
        countDownLatch.await(5L, TimeUnit.SECONDS);
        worker.shutdown();
        Worker worker2 = (Worker) this.applicationContext.createBean(Worker.class, new Object[]{IdUtils.create(), 1, null});
        worker2.run();
        countDownLatch2.await(30L, TimeUnit.SECONDS);
        worker2.shutdown();
        MatcherAssert.assertThat(((WorkerTaskResult) atomicReference.get()).getTaskRun().getState().getCurrent(), Matchers.is(State.Type.SUCCESS));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Test
    public void taskResubmitSkipExecution() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Worker worker = (Worker) this.applicationContext.createBean(Worker.class, new Object[]{IdUtils.create(), 8, null});
        worker.run();
        this.runner.setSchedulerEnabled(false);
        this.runner.setWorkerEnabled(false);
        this.runner.run();
        WorkerTask workerTask = workerTask(Duration.ofSeconds(10L));
        this.skipExecutionService.setSkipExecutions(List.of(workerTask.getTaskRun().getExecutionId()));
        AtomicReference atomicReference = new AtomicReference(null);
        Runnable receive = this.workerTaskResultQueue.receive(either -> {
            atomicReference.set((WorkerTaskResult) either.getLeft());
            if (((WorkerTaskResult) either.getLeft()).getTaskRun().getState().getCurrent() == State.Type.SUCCESS) {
                Assertions.fail();
            }
            if (((WorkerTaskResult) either.getLeft()).getTaskRun().getState().getCurrent() == State.Type.RUNNING) {
                countDownLatch.countDown();
            }
        });
        this.workerJobQueue.emit(workerTask);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        worker.shutdown();
        Worker worker2 = (Worker) this.applicationContext.createBean(Worker.class, new Object[]{IdUtils.create(), 1, null});
        worker2.run();
        Thread.sleep(500L);
        receive.run();
        worker2.shutdown();
        MatcherAssert.assertThat(((WorkerTaskResult) atomicReference.get()).getTaskRun().getState().getCurrent(), Matchers.not(State.Type.SUCCESS));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Test
    public void shouldReEmitTriggerWhenWorkerIsDetectedAsNonResponding() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Worker worker = (Worker) this.applicationContext.createBean(Worker.class, new Object[]{IdUtils.create(), 1, null});
        worker.run();
        this.runner.setSchedulerEnabled(false);
        this.runner.setWorkerEnabled(false);
        this.runner.run();
        AtomicReference atomicReference = new AtomicReference(null);
        this.workerTriggerResultQueue.receive(either -> {
            atomicReference.set((WorkerTriggerResult) either.getLeft());
            countDownLatch.countDown();
        });
        WorkerTrigger workerTrigger = workerTrigger(Duration.ofSeconds(10L));
        this.workerJobQueue.emit(workerTrigger);
        Await.until(() -> {
            return worker.getEvaluateTriggerRunningCount().get(workerTrigger.getTriggerContext().uid()) != null;
        }, Duration.ofMillis(100L), Duration.ofSeconds(5L));
        worker.shutdown();
        Worker worker2 = (Worker) this.applicationContext.createBean(Worker.class, new Object[]{IdUtils.create(), 1, null});
        this.applicationContext.registerSingleton(worker2);
        worker2.run();
        boolean await = countDownLatch.await(10L, TimeUnit.SECONDS);
        worker2.shutdown();
        MatcherAssert.assertThat("Last await result was " + await, ((WorkerTriggerResult) atomicReference.get()).getSuccess(), Matchers.is(true));
    }

    private WorkerTask workerTask(Duration duration) {
        Sleep build = Sleep.builder().type(Sleep.class.getName()).id("unit-test").duration(Long.valueOf(duration.toMillis())).build();
        return WorkerTask.builder().runContext(this.runContextFactory.of(ImmutableMap.of("key", "value"))).task(build).taskRun(TaskRun.of(TestsUtils.mockExecution(flowBuilder(duration), ImmutableMap.of()), ResolvedTask.of(build))).build();
    }

    private WorkerTrigger workerTrigger(Duration duration) {
        SleepTrigger build = SleepTrigger.builder().type(SleepTrigger.class.getName()).id("unit-test").duration(Long.valueOf(duration.toMillis())).build();
        Map.Entry mockTrigger = TestsUtils.mockTrigger(this.runContextFactory, build);
        return WorkerTrigger.builder().trigger(build).triggerContext((TriggerContext) mockTrigger.getValue()).conditionContext((ConditionContext) mockTrigger.getKey()).build();
    }

    private Flow flowBuilder(Duration duration) {
        return Flow.builder().id(IdUtils.create()).namespace("io.kestra.unit-test").tasks(Collections.singletonList(Sleep.builder().type(Sleep.class.getName()).id("unit-test").duration(Long.valueOf(duration.toMillis())).build())).triggers(Collections.singletonList(SleepTrigger.builder().type(SleepTrigger.class.getName()).id("unit-test").duration(Long.valueOf(duration.toMillis())).build())).build();
    }
}
