package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.processing.bpmn.multiinstance.MultiInstanceSubProcessTest;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/job/JobTimeOutTest.class */
public final class JobTimeOutTest {

    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID = "process";
    private static String jobType;

    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    @Before
    public void setup() {
        jobType = Strings.newRandomValidBpmnId();
    }

    @Test
    public void shouldTimeOutJob() {
        long key = ENGINE.createJob(jobType, "process").getKey();
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withType(jobType).getFirst();
        ENGINE.jobs().withType(jobType).activate();
        List list = (List) RecordingExporter.jobRecords().withType(jobType).limit(3L).collect(Collectors.toList());
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.getKey();
        }).contains(new Long[]{Long.valueOf(key)});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.getIntent();
        }).containsExactly(new Intent[]{JobIntent.CREATED, JobIntent.TIME_OUT, JobIntent.TIMED_OUT});
    }

    @Test
    public void shouldTimeOutAfterReprocessing() {
        long key = ENGINE.createJob(jobType, "process").getKey();
        Duration ofSeconds = Duration.ofSeconds(10L);
        ENGINE.jobs().withType(jobType).withTimeout(ofSeconds.toMillis()).activate();
        ENGINE.increaseTime(ofSeconds.plus(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL));
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withRecordKey(key).getFirst();
        long key2 = ENGINE.createJob(jobType, "process").getKey();
        ENGINE.jobs().withTimeout(ofSeconds.toMillis()).withType(jobType).activate();
        ENGINE.job().withKey(key).complete();
        ENGINE.reprocess();
        ENGINE.increaseTime(ofSeconds.plus(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL));
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withRecordKey(key2).getFirst();
    }

    @Test
    public void shouldTimeOutAfterResumed() {
        long key = ENGINE.createJob(jobType, "process").getKey();
        ENGINE.jobs().withType(jobType).withTimeout(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL.toMillis() * 2).activate();
        ENGINE.pauseProcessing(1);
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        ENGINE.resumeProcessing(1);
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withRecordKey(key).getFirst();
    }

    @Test
    public void shouldActivateAndTimeOutAfterResumed() {
        long key = ENGINE.createJob(jobType, "process").getKey();
        long millis = EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL.toMillis();
        ENGINE.pauseProcessing(1);
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        ENGINE.resumeProcessing(1);
        ENGINE.jobs().withType(jobType).withTimeout(millis).activate();
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withRecordKey(key).getFirst();
    }

    @Test
    public void shouldExpireMultipleActivatedJobsAtOnce() {
        long createInstance = createInstance();
        long createInstance2 = createInstance();
        long key = ((Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(jobType).filter(record -> {
            return record.getValue().getProcessInstanceKey() == createInstance;
        }).getFirst()).getKey();
        long key2 = ((Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(jobType).filter(record2 -> {
            return record2.getValue().getProcessInstanceKey() == createInstance2;
        }).getFirst()).getKey();
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(jobType).getFirst();
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withProcessInstanceKey(createInstance).getFirst();
        ENGINE.jobs().withType(jobType).activate();
        Assertions.assertThat((List) ((List) RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(jobType).limit(2L).collect(Collectors.toList())).stream().flatMap(record3 -> {
            return record3.getValue().getJobKeys().stream();
        }).collect(Collectors.toList())).hasSize(4).containsExactlyInAnyOrder(new Long[]{Long.valueOf(key), Long.valueOf(key2), Long.valueOf(key), Long.valueOf(key2)});
        Assertions.assertThat((List) RecordingExporter.jobRecords(JobIntent.TIMED_OUT).filter(record4 -> {
            long processInstanceKey = record4.getValue().getProcessInstanceKey();
            return processInstanceKey == createInstance || processInstanceKey == createInstance2;
        }).limit(2L).collect(Collectors.toList())).extracting((v0) -> {
            return v0.getKey();
        }).containsExactlyInAnyOrder(new Long[]{Long.valueOf(key), Long.valueOf(key2)});
    }

    @Test
    public void shouldHaveNoSourceRecordPositionOnTimeOut() {
        long createInstance = createInstance();
        ((Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(jobType).filter(record -> {
            return record.getValue().getProcessInstanceKey() == createInstance;
        }).getFirst()).getKey();
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(jobType).getFirst();
        ENGINE.increaseTime(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL);
        Assertions.assertThat(((Record) RecordingExporter.jobRecords(JobIntent.TIME_OUT).withProcessInstanceKey(createInstance).getFirst()).getSourceRecordPosition()).isLessThan(0L);
    }

    @Test
    public void shouldRejectIfJobDoesNotExist() {
        Record<JobRecordValue> createJob = ENGINE.createJob(jobType, "process");
        int decodePartitionId = Protocol.decodePartitionId(createJob.getKey());
        ENGINE.pauseProcessing(decodePartitionId);
        ENGINE.writeRecords(RecordToWrite.command().key(createJob.getKey()).job(JobIntent.COMPLETE, (JobRecordValue) createJob.getValue()), RecordToWrite.command().key(createJob.getKey()).job(JobIntent.TIME_OUT, (JobRecordValue) createJob.getValue()));
        ENGINE.resumeProcessing(decodePartitionId);
        ConditionFactory await = Awaitility.await("until everything processed");
        EngineRule engineRule = ENGINE;
        Objects.requireNonNull(engineRule);
        await.until(engineRule::hasReachedEnd);
        Assertions.assertThat(RecordingExporter.jobRecords().withRecordKey(createJob.getKey()).withIntent(JobIntent.TIME_OUT).withRecordType(RecordType.COMMAND_REJECTION).limit(1L).toList()).isNotEmpty();
    }

    private long createInstance() {
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess("process").startEvent("start").serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType(jobType).done();
        }).endEvent("end").done()).deploy();
        return ENGINE.processInstance().ofBpmnProcessId("process").create();
    }
}
