/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.processing.job.JobTimeoutTrigger;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.record.JobBatchRecordStream;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public final class JobTimeOutTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID = "process";
    private static String jobType;
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    @Before
    public void setup() {
        jobType = Strings.newRandomValidBpmnId();
    }

    @Test
    public void shouldTimeOutJob() {
        long jobKey = ENGINE.createJob(jobType, PROCESS_ID).getKey();
        long timeout = 10L;
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        ENGINE.increaseTime(JobTimeoutTrigger.TIME_OUT_POLLING_INTERVAL);
        RecordingExporter.jobRecords((JobIntent)JobIntent.TIME_OUT).withType(jobType).getFirst();
        ENGINE.jobs().withType(jobType).activate();
        List jobEvents = (List)((JobRecordStream)RecordingExporter.jobRecords().withType(jobType).limit(3L)).collect(Collectors.toList());
        Assertions.assertThat((List)jobEvents).extracting(Record::getKey).contains((Object[])new Long[]{jobKey});
        Assertions.assertThat((List)jobEvents).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{JobIntent.CREATED, JobIntent.TIME_OUT, JobIntent.TIMED_OUT});
    }

    @Test
    public void shouldTimeOutAfterReprocessing() {
        long jobKey = ENGINE.createJob(jobType, PROCESS_ID).getKey();
        long timeout = 10L;
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        ENGINE.increaseTime(JobTimeoutTrigger.TIME_OUT_POLLING_INTERVAL);
        ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.TIME_OUT).withRecordKey(jobKey)).getFirst();
        long jobKey2 = ENGINE.createJob(jobType, PROCESS_ID).getKey();
        ENGINE.jobs().withType(jobType).activate();
        ENGINE.job().withKey(jobKey).complete();
        ENGINE.reprocess();
        ENGINE.jobs().withType(jobType).activate();
        ENGINE.increaseTime(JobTimeoutTrigger.TIME_OUT_POLLING_INTERVAL);
        ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.TIME_OUT).withRecordKey(jobKey2)).getFirst();
    }

    @Test
    public void shouldExpireMultipleActivatedJobsAtOnce() {
        long instanceKey1 = this.createInstance();
        long instanceKey2 = this.createInstance();
        long jobKey1 = ((Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(jobType).filter(r -> ((JobRecordValue)r.getValue()).getProcessInstanceKey() == instanceKey1)).getFirst()).getKey();
        long jobKey2 = ((Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(jobType).filter(r -> ((JobRecordValue)r.getValue()).getProcessInstanceKey() == instanceKey2)).getFirst()).getKey();
        long timeout = 10L;
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(jobType).getFirst();
        ENGINE.increaseTime(JobTimeoutTrigger.TIME_OUT_POLLING_INTERVAL);
        RecordingExporter.jobRecords((JobIntent)JobIntent.TIMED_OUT).withProcessInstanceKey(instanceKey1).getFirst();
        ENGINE.jobs().withType(jobType).activate();
        List jobActivations = (List)((JobBatchRecordStream)RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(jobType).limit(2L)).collect(Collectors.toList());
        List jobKeys = jobActivations.stream().flatMap(jobBatchRecordValueRecord -> ((JobBatchRecordValue)jobBatchRecordValueRecord.getValue()).getJobKeys().stream()).collect(Collectors.toList());
        ((ListAssert)Assertions.assertThat(jobKeys).hasSize(4)).containsExactlyInAnyOrder((Object[])new Long[]{jobKey1, jobKey2, jobKey1, jobKey2});
        List expiredEvents = (List)((JobRecordStream)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.TIMED_OUT).filter(r -> {
            long processInstanceKey = ((JobRecordValue)r.getValue()).getProcessInstanceKey();
            return processInstanceKey == instanceKey1 || processInstanceKey == instanceKey2;
        })).limit(2L)).collect(Collectors.toList());
        Assertions.assertThat((List)expiredEvents).extracting(Record::getKey).containsExactlyInAnyOrder((Object[])new Long[]{jobKey1, jobKey2});
    }

    @Test
    public void shouldHaveNoSourceRecordPositionOnTimeOut() {
        long processInstanceKey = this.createInstance();
        ((Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(jobType).filter(r -> ((JobRecordValue)r.getValue()).getProcessInstanceKey() == processInstanceKey)).getFirst()).getKey();
        long timeout = 10L;
        ENGINE.jobs().withType(jobType).withTimeout(10L).activate();
        RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(jobType).getFirst();
        ENGINE.increaseTime(JobTimeoutTrigger.TIME_OUT_POLLING_INTERVAL);
        Record timedOutRecord = (Record)RecordingExporter.jobRecords((JobIntent)JobIntent.TIME_OUT).withProcessInstanceKey(processInstanceKey).getFirst();
        Assertions.assertThat((long)timedOutRecord.getSourceRecordPosition()).isLessThan(0L);
    }

    private long createInstance() {
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", b -> ((ServiceTaskBuilder)b.zeebeJobType(jobType)).done()).endEvent("end").done()).deploy();
        return ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
    }
}

