package org.graylog.scheduler;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.bson.types.ObjectId;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobTriggerUpdates;
import org.graylog.scheduler.capabilities.SchedulerCapabilitiesService;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog.scheduler.clock.JobSchedulerSystemClock;
import org.graylog.scheduler.eventbus.JobSchedulerEventBus;
import org.graylog.scheduler.schedule.OnceJobSchedule;
import org.graylog.scheduler.worker.JobWorkerPool;
import org.graylog.testing.mongodb.MongoDBExtension;
import org.graylog.testing.mongodb.MongoDBTestService;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.cluster.lock.MongoLockService;
import org.graylog2.cluster.lock.RefreshingLockService;
import org.graylog2.database.MongoCollections;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.plugin.system.SimpleNodeId;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.graylog2.system.shutdown.GracefulShutdownService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@Timeout(60)
@Extensions({@ExtendWith({MongoDBExtension.class}), @ExtendWith({MockitoExtension.class})})
/* loaded from: input_file:org/graylog/scheduler/JobSchedulerServiceIT.class */
class JobSchedulerServiceIT {

    @Mock
    private ServerStatus serverStatus;

    @Mock
    SchedulerCapabilitiesService schedulerCapabilitiesService;

    @Mock
    private GracefulShutdownService gracefulShutdownService;
    private DBJobTriggerService jobTriggerService;
    private DBCustomJobDefinitionService customJobDefinitionService;
    private JobSchedulerService jobSchedulerService;
    private JobSchedulerClock clock;
    private NodeId nodeId = new SimpleNodeId("dummy-node");
    private final Map<String, Job.Factory> jobFactories = new HashMap();

    @JsonIgnoreProperties(ignoreUnknown = true)
    @JsonTypeName(LimitedJobA.TYPE_NAME)
    /* loaded from: input_file:org/graylog/scheduler/JobSchedulerServiceIT$LimitedJobA.class */
    public static class LimitedJobA implements JobDefinitionConfig {
        public static final String TYPE_NAME = "limited-job-a";

        public String type() {
            return TYPE_NAME;
        }
    }

    /* loaded from: input_file:org/graylog/scheduler/JobSchedulerServiceIT$TestSchedulerConfig.class */
    private static class TestSchedulerConfig implements JobSchedulerConfig {
        public static final int WORKER_THREADS = 10;
        public static final int MAX_CONCURRENCY = 3;

        private TestSchedulerConfig() {
        }

        public boolean canExecute() {
            return true;
        }

        public int numberOfWorkerThreads() {
            return 10;
        }

        public Map<String, Integer> concurrencyLimits() {
            return Map.of(LimitedJobA.TYPE_NAME, 3);
        }
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    @JsonTypeName(UnlimitedJob.TYPE_NAME)
    /* loaded from: input_file:org/graylog/scheduler/JobSchedulerServiceIT$UnlimitedJob.class */
    public static class UnlimitedJob implements JobDefinitionConfig {
        public static final String TYPE_NAME = "unlimited-job";

        public String type() {
            return TYPE_NAME;
        }
    }

    JobSchedulerServiceIT() {
    }

    @BeforeEach
    void setUp(MongoDBTestService mongoDBTestService) throws Exception {
        ObjectMapper objectMapper = new ObjectMapperProvider(getClass().getClassLoader(), Set.of(new NamedType(UnlimitedJob.class, UnlimitedJob.TYPE_NAME), new NamedType(LimitedJobA.class, LimitedJobA.TYPE_NAME), new NamedType(OnceJobSchedule.class, "once"))).get();
        this.clock = new JobSchedulerSystemClock();
        MetricRegistry metricRegistry = new MetricRegistry();
        JobSchedulerEventBus jobSchedulerEventBus = new JobSchedulerEventBus("job-scheduler", metricRegistry);
        TestSchedulerConfig testSchedulerConfig = new TestSchedulerConfig();
        Duration seconds = Duration.seconds(10L);
        this.customJobDefinitionService = new DBCustomJobDefinitionService(new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongoDBTestService.mongoConnection()));
        this.jobTriggerService = new DBJobTriggerService(mongoDBTestService.mongoConnection(), new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongoDBTestService.mongoConnection()), new MongoJackObjectMapperProvider(objectMapper), this.nodeId, this.clock, this.schedulerCapabilitiesService, seconds);
        DBJobDefinitionService dBJobDefinitionService = new DBJobDefinitionService(mongoDBTestService.mongoConnection(), new MongoJackObjectMapperProvider(objectMapper));
        JobScheduleStrategies jobScheduleStrategies = new JobScheduleStrategies(this.clock);
        JobTriggerUpdates.Factory factory = jobTriggerDto -> {
            return new JobTriggerUpdates(this.clock, jobScheduleStrategies, jobTriggerDto);
        };
        java.time.Duration ofSeconds = java.time.Duration.ofSeconds(10L);
        MongoLockService mongoLockService = new MongoLockService(this.nodeId, mongoDBTestService.mongoConnection(), ofSeconds);
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(30, new ThreadFactoryBuilder().setNameFormat("scheduled-daemon-%d").setDaemon(true).build());
        this.jobSchedulerService = new JobSchedulerService(jobWorkerPool -> {
            return new JobExecutionEngine(this.jobTriggerService, dBJobDefinitionService, jobSchedulerEventBus, jobScheduleStrategies, factory, () -> {
                return new RefreshingLockService(mongoLockService, newScheduledThreadPool, ofSeconds);
            }, this.jobFactories, jobWorkerPool, testSchedulerConfig, metricRegistry, 200L);
        }, (str, i, runnable) -> {
            return new JobWorkerPool(str, i, runnable, this.gracefulShutdownService, metricRegistry);
        }, testSchedulerConfig, this.clock, jobSchedulerEventBus, this.serverStatus, Duration.milliseconds(200L));
    }

    @Test
    void testMaxConcurrency() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(60);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        Job job = jobExecutionContext -> {
            String jobDefinitionType = jobExecutionContext.trigger().jobDefinitionType();
            Integer num = (Integer) concurrentHashMap.compute(jobDefinitionType, (str, num2) -> {
                return Integer.valueOf(num2 == null ? 1 : num2.intValue() + 1);
            });
            concurrentHashMap2.compute(jobDefinitionType, (str2, num3) -> {
                return Integer.valueOf(Math.max(num.intValue(), num3 == null ? 0 : num3.intValue()));
            });
            Uninterruptibles.sleepUninterruptibly(50L, TimeUnit.MILLISECONDS);
            concurrentHashMap.compute(jobDefinitionType, (str3, num4) -> {
                return Integer.valueOf(num4 == null ? 0 : num4.intValue() - 1);
            });
            countDownLatch.countDown();
            return JobTriggerUpdate.withoutNextTime();
        };
        this.jobFactories.put(UnlimitedJob.TYPE_NAME, jobDefinitionDto -> {
            return job;
        });
        this.jobFactories.put(LimitedJobA.TYPE_NAME, jobDefinitionDto2 -> {
            return job;
        });
        this.jobSchedulerService.startAsync().awaitRunning();
        try {
            createTriggers(30, this.customJobDefinitionService.findOrCreate(jobDefinitionDto(new LimitedJobA())));
            createTriggers(30, this.customJobDefinitionService.findOrCreate(jobDefinitionDto(new UnlimitedJob())));
            countDownLatch.await();
            this.jobSchedulerService.stopAsync().awaitTerminated();
            Assertions.assertThat((Integer) concurrentHashMap2.get(UnlimitedJob.TYPE_NAME)).isEqualTo(10);
            Assertions.assertThat((Integer) concurrentHashMap2.get(LimitedJobA.TYPE_NAME)).isEqualTo(3);
        } catch (Throwable th) {
            this.jobSchedulerService.stopAsync().awaitTerminated();
            throw th;
        }
    }

    @Test
    void testLimitedJobsAreNotBlockingOtherJobs() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(60);
        CountDownLatch countDownLatch2 = new CountDownLatch(30);
        Job job = jobExecutionContext -> {
            countDownLatch2.countDown();
            countDownLatch.countDown();
            return JobTriggerUpdate.withoutNextTime();
        };
        Job job2 = jobExecutionContext2 -> {
            try {
                countDownLatch2.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            countDownLatch.countDown();
            return JobTriggerUpdate.withoutNextTime();
        };
        this.jobFactories.put(UnlimitedJob.TYPE_NAME, jobDefinitionDto -> {
            return job;
        });
        this.jobFactories.put(LimitedJobA.TYPE_NAME, jobDefinitionDto2 -> {
            return job2;
        });
        this.jobSchedulerService.startAsync().awaitRunning();
        try {
            createTriggers(30, this.customJobDefinitionService.findOrCreate(jobDefinitionDto(new LimitedJobA())));
            createTriggers(30, this.customJobDefinitionService.findOrCreate(jobDefinitionDto(new UnlimitedJob())));
            countDownLatch.await();
            this.jobSchedulerService.stopAsync().awaitTerminated();
        } catch (Throwable th) {
            this.jobSchedulerService.stopAsync().awaitTerminated();
            throw th;
        }
    }

    @Test
    void testInterleavedTriggers() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(60);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        Job job = jobExecutionContext -> {
            String jobDefinitionType = jobExecutionContext.trigger().jobDefinitionType();
            Integer num = (Integer) concurrentHashMap.compute(jobDefinitionType, (str, num2) -> {
                return Integer.valueOf(num2 == null ? 1 : num2.intValue() + 1);
            });
            concurrentHashMap2.compute(jobDefinitionType, (str2, num3) -> {
                return Integer.valueOf(Math.max(num.intValue(), num3 == null ? 0 : num3.intValue()));
            });
            Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.MILLISECONDS);
            concurrentHashMap.compute(jobDefinitionType, (str3, num4) -> {
                return Integer.valueOf(num4 == null ? 0 : num4.intValue() - 1);
            });
            countDownLatch.countDown();
            return JobTriggerUpdate.withoutNextTime();
        };
        this.jobFactories.put(UnlimitedJob.TYPE_NAME, jobDefinitionDto -> {
            return job;
        });
        this.jobFactories.put(LimitedJobA.TYPE_NAME, jobDefinitionDto2 -> {
            return job;
        });
        this.jobSchedulerService.startAsync().awaitRunning();
        for (int i = 0; i < 30; i++) {
            try {
                createTriggers(1, this.customJobDefinitionService.findOrCreate(jobDefinitionDto(new LimitedJobA())));
                createTriggers(1, this.customJobDefinitionService.findOrCreate(jobDefinitionDto(new UnlimitedJob())));
            } catch (Throwable th) {
                this.jobSchedulerService.stopAsync().awaitTerminated();
                throw th;
            }
        }
        countDownLatch.await();
        this.jobSchedulerService.stopAsync().awaitTerminated();
        Assertions.assertThat((Integer) concurrentHashMap2.get(LimitedJobA.TYPE_NAME)).isEqualTo(3);
    }

    private List<JobTriggerDto> createTriggers(int i, JobDefinitionDto jobDefinitionDto) {
        return IntStream.range(0, i).mapToObj(i2 -> {
            return this.jobTriggerService.create(JobTriggerDto.builder().jobDefinitionType(jobDefinitionDto.config().type()).jobDefinitionId(jobDefinitionDto.id()).schedule(OnceJobSchedule.create()).build());
        }).toList();
    }

    private JobDefinitionDto jobDefinitionDto(JobDefinitionConfig jobDefinitionConfig) {
        return JobDefinitionDto.builder().id(new ObjectId().toHexString()).title(JobSchedulerServiceIT.class.getSimpleName() + " job definition").description("").config(jobDefinitionConfig).build();
    }
}
