package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.RecordingJobStreamer;
import io.camunda.zeebe.msgpack.value.StringValue;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/**
 * Engine tests around pushing activatable jobs to a registered job stream.
 *
 * <p>Every test registers a fresh stream for a random job type on the shared
 * {@link RecordingJobStreamer}, creates one or more jobs of that type and then checks both the
 * exported {@link JobBatchIntent#ACTIVATED} records and the jobs recorded by the stream.
 */
public class ActivatableJobsPushTest {
    private static final String PROCESS_ID = "process";
    private static final RecordingJobStreamer JOB_STREAMER = new RecordingJobStreamer();

    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition().withJobStreamer(JOB_STREAMER);

    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    /** Process instances started by the current test; cancelled again in {@link #tearDown()}. */
    private final List<Long> activeProcessInstances = new ArrayList<>();

    private RecordingJobStreamer.RecordingJobStream jobStream;
    private JobActivationProperties jobActivationProperties;
    private String jobType;
    private DirectBuffer jobTypeBuffer;
    private DirectBuffer worker;
    private long timeout;
    private Map<String, Object> variables;

    /** Registers a job stream for a new random job type with worker "test" and a 30s timeout. */
    @Before
    public void setUp() {
        jobType = Strings.newRandomValidBpmnId();
        jobTypeBuffer = BufferUtil.wrapString(jobType);
        worker = BufferUtil.wrapString("test");
        variables = Map.of("a", "valA", "b", "valB", "c", "valC");
        timeout = 30000L;

        jobActivationProperties =
            new JobActivationPropertiesImpl()
                .setWorker(worker, 0, worker.capacity())
                .setTimeout(timeout)
                .setFetchVariables(List.of(new StringValue("a"), new StringValue("b"), new StringValue("c")));
        jobStream = JOB_STREAMER.addJobStream(jobTypeBuffer, jobActivationProperties);
    }

    /** Cancels every process instance the test created and forgets about them. */
    @After
    public void tearDown() {
        for (final long processInstanceKey : activeProcessInstances) {
            ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        }
        activeProcessInstances.clear();
    }

    @Test
    public void shouldPushWhenJobCreated() {
        // when
        final long jobKey = createJob(jobType, PROCESS_ID, variables);

        // then
        final JobBatchRecordValue batch =
            RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(jobType).getFirst().getValue();
        Assertions.assertThat(batch.getJobs()).hasSize(1);
        Assertions.assertThat(batch.getJobKeys()).contains(jobKey);
        assertEventOrder(JobIntent.CREATED, JobBatchIntent.ACTIVATED);
        assertActivatedJob(jobKey, 1);
    }

    @Test
    public void shouldPushForMultipleJobsCreated() {
        // given
        final int numberOfJobs = 3;

        // when
        final List<Long> createdJobKeys = createJobs(numberOfJobs);

        // then
        RecordingExporter.jobRecords(JobIntent.CREATED).withType(jobType).await();
        final List<Record<JobBatchRecordValue>> batches =
            RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(jobType).asList();
        Assertions.assertThat(batches).hasSize(numberOfJobs);

        final List<Long> activatedJobKeys =
            batches.stream()
                .flatMap(batch -> batch.getValue().getJobKeys().stream())
                .collect(Collectors.toList());
        Assertions.assertThat(activatedJobKeys).containsAnyElementsOf(createdJobKeys);
        assertEventOrder(JobIntent.CREATED, JobBatchIntent.ACTIVATED);

        jobStream.getActivatedJobs().forEach(activatedJob -> {
            assertWorkerAndVariables(activatedJob);
            Assertions.assertThat(activatedJob.jobKey()).isIn(activatedJobKeys);
        });
    }

    @Test
    public void shouldPushWhenJobTimesOut() {
        // given
        final long jobKey = createJob(jobType, PROCESS_ID, variables);

        // when
        ENGINE.increaseTime(JobTimeoutTrigger.TIME_OUT_POLLING_INTERVAL);

        // then - the same job is expected on the stream twice
        RecordingExporter.jobRecords(JobIntent.TIMED_OUT).withType(jobType).await();
        assertJobActivations(2);
        assertEventOrder(JobIntent.TIME_OUT, JobIntent.TIMED_OUT, JobBatchIntent.ACTIVATED);
        assertActivatedJob(jobKey, 2);
    }

    @Test
    public void shouldPushAfterJobFailed() {
        // given
        final long jobKey = createJob(jobType, PROCESS_ID, variables);

        // when
        ENGINE.job().withKey(jobKey).withRetries(5).fail();

        // then - the same job is expected on the stream twice
        RecordingExporter.jobRecords(JobIntent.FAILED).withType(jobType).await();
        assertJobActivations(2);
        assertEventOrder(JobIntent.FAIL, JobIntent.FAILED, JobBatchIntent.ACTIVATED);
        assertActivatedJob(jobKey, 2);
    }

    @Test
    public void shouldPushAfterJobBackoff() {
        // given
        final long jobKey = createJob(jobType, PROCESS_ID, variables);

        // when
        ENGINE.job().withKey(jobKey).withRetries(5).withBackOff(Duration.ofMillis(10L)).fail();
        ENGINE.increaseTime(Duration.ofMillis(JobBackoffChecker.BACKOFF_RESOLUTION));

        // then - the same job is expected on the stream twice
        RecordingExporter.jobRecords(JobIntent.RECURRED_AFTER_BACKOFF).withType(jobType).await();
        assertJobActivations(2);
        assertEventOrder(JobIntent.RECUR_AFTER_BACKOFF, JobIntent.RECURRED_AFTER_BACKOFF, JobBatchIntent.ACTIVATED);
        assertActivatedJob(jobKey, 2);
    }

    @Test
    public void shouldPushWhenJobIncidentResolves() {
        // given - failing with zero retries left, using the error message "raise incident"
        final long jobKey = createJob(jobType, PROCESS_ID, variables);
        ENGINE.job().withKey(jobKey).withRetries(0).withErrorMessage("raise incident").fail();
        final long processInstanceKey =
            RecordingExporter.incidentRecords(IncidentIntent.CREATED).getFirst().getValue().getProcessInstanceKey();

        // when
        ENGINE.incident().ofInstance(processInstanceKey).resolve();

        // then - the same job is expected on the stream twice
        RecordingExporter.incidentRecords(IncidentIntent.RESOLVED).withJobKey(jobKey).await();
        assertJobActivations(2);
        assertEventOrder(IncidentIntent.RESOLVE, IncidentIntent.RESOLVED, JobBatchIntent.ACTIVATED);
        assertActivatedJob(jobKey, 2);
    }

    /**
     * Creates {@code i} jobs of the current job type, one after the other.
     *
     * @param i number of jobs to create
     * @return the keys of the created jobs, in creation order
     */
    private List<Long> createJobs(int i) {
        return IntStream.range(0, i)
            .mapToObj(index -> createJob(jobType, PROCESS_ID, variables))
            .collect(Collectors.toList());
    }

    /**
     * Creates a single job through the engine and remembers its process instance for clean-up.
     *
     * @param str job type
     * @param str2 process id
     * @param map variables handed to {@code ENGINE.createJob}
     * @return the key of the created job record
     */
    private Long createJob(String str, String str2, Map<String, Object> map) {
        final Record<JobRecordValue> job = ENGINE.createJob(str, str2, map);
        activeProcessInstances.add(job.getValue().getProcessInstanceKey());
        return job.getKey();
    }

    /**
     * Asserts, for every process instance created by the test, that the exported records contain
     * the given intents as one contiguous sequence.
     *
     * @param intentArr intents expected directly after one another
     */
    private void assertEventOrder(Intent... intentArr) {
        for (final long processInstanceKey : activeProcessInstances) {
            Assertions.assertThat(RecordingExporter.records().betweenProcessInstance(processInstanceKey))
                .extracting(Record::getIntent)
                .containsSequence(intentArr);
        }
    }

    /**
     * Asserts how many ACTIVATED job batch records were exported for the current job type.
     *
     * @param i expected number of batch records
     */
    private void assertJobActivations(int i) {
        Assertions.assertThat(
                RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(jobType).asList())
            .hasSize(i);
    }

    /**
     * Asserts that the job stream recorded exactly {@code i} activated jobs, all of them for the
     * given key and carrying the expected worker and variables.
     *
     * @param l expected job key of every recorded job
     * @param i expected number of recorded jobs
     */
    private void assertActivatedJob(Long l, int i) {
        final List<ActivatedJob> activatedJobs = jobStream.getActivatedJobs();
        Assertions.assertThat(activatedJobs).hasSize(i);
        activatedJobs.forEach(activatedJob -> {
            Assertions.assertThat(activatedJob.jobKey()).isEqualTo(l);
            assertWorkerAndVariables(activatedJob);
        });
    }

    /** Asserts that a pushed job carries the worker and variables configured in {@link #setUp()}. */
    private void assertWorkerAndVariables(ActivatedJob activatedJob) {
        final JobRecord jobRecord = activatedJob.jobRecord();
        Assertions.assertThat(jobRecord.getWorkerBuffer()).isEqualTo(worker);
        Assertions.assertThat(jobRecord.getVariables()).isEqualTo(variables);
    }
}
