package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.processing.bpmn.multiinstance.MultiInstanceSubProcessTest;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.protocol.record.Assertions;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.ByteValue;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/job/ActivateJobsTest.class */
public final class ActivateJobsTest {
    private static final String PROCESS_ID = "process";

    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private String taskType;

    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String LONG_CUSTOM_HEADER_VALUE = "foo".repeat(128);
    private static final Function<String, BpmnModelInstance> MODEL_SUPPLIER = str -> {
        return Bpmn.createExecutableProcess("process").startEvent("start").serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType(str).done();
        }).endEvent("end").done();
    };

    @Before
    public void setup() {
        this.taskType = Strings.newRandomValidBpmnId();
    }

    @Test
    public void shouldRejectInvalidAmount() {
        Assertions.assertThat(ENGINE.jobs().withType(this.taskType).withMaxJobsToActivate(0).expectRejection().activate()).hasRejectionType(RejectionType.INVALID_ARGUMENT).hasRejectionReason("Expected to activate job batch with max jobs to activate to be greater than zero, but it was '0'");
    }

    @Test
    public void shouldRejectInvalidTimeout() {
        Assertions.assertThat(ENGINE.jobs().withType(this.taskType).withTimeout(Duration.ofSeconds(0L).toMillis()).expectRejection().activate()).hasRejectionType(RejectionType.INVALID_ARGUMENT).hasRejectionReason("Expected to activate job batch with timeout to be greater than zero, but it was '0'");
    }

    @Test
    public void shouldRejectInvalidType() {
        Assertions.assertThat(ENGINE.jobs().withType("").expectRejection().activate()).hasRejectionType(RejectionType.INVALID_ARGUMENT).hasRejectionReason("Expected to activate job batch with type to be present, but it was blank");
    }

    @Test
    public void shouldAcceptEmptyWorker() {
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        Assertions.assertThat(ENGINE.jobs().withType(this.taskType).withTimeout(Duration.ofMinutes(12L).toMillis()).withMaxJobsToActivate(1).activate().getIntent()).isEqualTo(JobBatchIntent.ACTIVATED);
    }

    @Test
    public void shouldActivateSingleJob() {
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        long longValue = createProcessInstances(3, "{'foo':'bar'}").get(0).longValue();
        long key = ((Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(this.taskType).filter(record -> {
            return record.getValue().getProcessInstanceKey() == longValue;
        }).getFirst()).getKey();
        Record<JobBatchRecordValue> activate = ENGINE.jobs().withType(this.taskType).byWorker("myTestWorker").withTimeout(Duration.ofMinutes(12L).toMillis()).withMaxJobsToActivate(1).activate();
        List jobs = activate.getValue().getJobs();
        List jobKeys = activate.getValue().getJobKeys();
        Assertions.assertThat(activate.getIntent()).isEqualTo(JobBatchIntent.ACTIVATED);
        org.assertj.core.api.Assertions.assertThat(jobKeys).hasSize(1);
        org.assertj.core.api.Assertions.assertThat(jobs).hasSize(1);
        org.assertj.core.api.Assertions.assertThat((Long) jobKeys.get(0)).isEqualTo(key);
        Assertions.assertThat((JobRecordValue) jobs.get(0)).hasRetries(3).hasWorker("myTestWorker").hasType(this.taskType);
        org.assertj.core.api.Assertions.assertThat(((JobRecordValue) jobs.get(0)).getVariables()).containsExactly(new Map.Entry[]{org.assertj.core.api.Assertions.entry("foo", "bar")});
        JobBatchRecordValue activatedJobBatch = getActivatedJobBatch();
        JobRecordValue jobRecordValue = (JobRecordValue) activatedJobBatch.getJobs().get(0);
        org.assertj.core.api.Assertions.assertThat((Long) activatedJobBatch.getJobKeys().get(0)).isEqualTo(key);
        Assertions.assertThat(jobRecordValue).hasRetries(3).hasWorker("myTestWorker");
    }

    @Test
    public void shouldActivateJobBatch() {
        org.assertj.core.api.Assertions.assertThat(activateJobs(3)).containsExactlyInAnyOrderElementsOf(deployAndCreateJobs(this.taskType, 5).subList(0, 3));
    }

    @Test
    public void shouldActivateJobBatches() {
        List<Long> deployAndCreateJobs = deployAndCreateJobs(this.taskType, 12);
        List<Long> subList = deployAndCreateJobs.subList(0, 3);
        List<Long> subList2 = deployAndCreateJobs.subList(3, 7);
        List<Long> subList3 = deployAndCreateJobs.subList(7, 10);
        List<Long> activateJobs = activateJobs(3);
        List<Long> activateJobs2 = activateJobs(4);
        List<Long> activateJobs3 = activateJobs(3);
        org.assertj.core.api.Assertions.assertThat(activateJobs).containsOnlyElementsOf(subList);
        org.assertj.core.api.Assertions.assertThat(activateJobs2).containsOnlyElementsOf(subList2);
        org.assertj.core.api.Assertions.assertThat(activateJobs3).containsOnlyElementsOf(subList3);
    }

    @Test
    public void shouldReturnEmptyBatchIfNoJobsAvailable() {
        org.assertj.core.api.Assertions.assertThat(activateJobs(Strings.newRandomValidBpmnId(), 3)).isEmpty();
    }

    @Test
    public void shouldCompleteActivatedJobs() {
        List<Long> deployAndCreateJobs = deployAndCreateJobs(this.taskType, 5);
        activateJobs(this.taskType, 5).forEach((v1) -> {
            completeJob(v1);
        });
        org.assertj.core.api.Assertions.assertThat((List) RecordingExporter.jobRecords(JobIntent.COMPLETED).limit(5L).collect(Collectors.toList())).extracting((v0) -> {
            return v0.getKey();
        }).containsOnlyElementsOf(deployAndCreateJobs);
    }

    @Test
    public void shouldOnlyReturnJobsOfCorrectType() {
        List<Long> deployAndCreateJobs = deployAndCreateJobs(this.taskType, 3);
        deployAndCreateJobs("different" + this.taskType, 5);
        deployAndCreateJobs.addAll(deployAndCreateJobs(this.taskType, 4));
        org.assertj.core.api.Assertions.assertThat(activateJobs(this.taskType, 7)).containsExactly((Long[]) deployAndCreateJobs.toArray(new Long[0]));
        JobBatchRecordValue activatedJobBatch = getActivatedJobBatch();
        Assertions.assertThat(activatedJobBatch).hasJobKeys(deployAndCreateJobs);
        org.assertj.core.api.Assertions.assertThat(activatedJobBatch.getJobs()).extracting((v0) -> {
            return v0.getType();
        }).containsOnly(new String[]{this.taskType});
    }

    @Test
    public void shouldActivateJobsFromProcess() {
        String str = this.taskType;
        String newRandomValidBpmnId = Strings.newRandomValidBpmnId();
        String newRandomValidBpmnId2 = Strings.newRandomValidBpmnId();
        ServiceTaskBuilder startEvent = Bpmn.createExecutableProcess("process").startEvent("start");
        for (String str2 : Arrays.asList(str, newRandomValidBpmnId, newRandomValidBpmnId2)) {
            startEvent = startEvent.serviceTask(str2, serviceTaskBuilder -> {
                serviceTaskBuilder.zeebeJobType(str2);
            });
        }
        ENGINE.deployment().withXmlResource("process.bpmn", startEvent.done()).deploy();
        List<Long> createProcessInstances = createProcessInstances(10, "{}");
        waitForJobs(str, 10, createProcessInstances);
        activateJobs(str, 10).forEach((v1) -> {
            completeJob(v1);
        });
        waitForJobs(newRandomValidBpmnId, 10, createProcessInstances);
        activateJobs(newRandomValidBpmnId, 10).forEach((v1) -> {
            completeJob(v1);
        });
        waitForJobs(newRandomValidBpmnId2, 10, createProcessInstances);
        activateJobs(newRandomValidBpmnId2, 10).forEach((v1) -> {
            completeJob(v1);
        });
        org.assertj.core.api.Assertions.assertThat(RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED).withBpmnProcessId("process").filter(record -> {
            return createProcessInstances.contains(Long.valueOf(record.getKey()));
        }).limit(10L).count() == ((long) createProcessInstances.size()));
    }

    @Test
    public void shouldActivateJobsWithLongCustomHeaders() {
        ENGINE.deployment().withXmlResource("process.bpmn", Bpmn.createExecutableProcess("process").startEvent().serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType(this.taskType).zeebeTaskHeader("foo", LONG_CUSTOM_HEADER_VALUE);
        }).endEvent().done()).deploy();
        long longValue = createProcessInstances(1, "{}").get(0).longValue();
        RecordingExporter.jobRecords(JobIntent.CREATED).withProcessInstanceKey(longValue).getFirst();
        activateJob(this.taskType);
        ENGINE.job().ofInstance(longValue).withType(this.taskType).complete();
        org.assertj.core.api.Assertions.assertThat(((JobRecordValue) getActivatedJobBatch().getJobs().get(0)).getCustomHeaders()).containsEntry("foo", LONG_CUSTOM_HEADER_VALUE);
    }

    @Test
    public void shouldFetchFullJobRecordFromProcess() {
        ENGINE.getClock().pinCurrentTime();
        Duration ofMinutes = Duration.ofMinutes(4L);
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        createProcessInstances(1, "{'foo':'bar'}");
        Record record = (Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(this.taskType).getFirst();
        JobRecordValue jobRecordValue = (JobRecordValue) ENGINE.jobs().withType(this.taskType).byWorker("testWorker").withTimeout(ofMinutes.toMillis()).withMaxJobsToActivate(1).activate().getValue().getJobs().get(0);
        Assertions.assertThat(jobRecordValue).hasType(this.taskType).hasWorker("testWorker").hasRetries(3).hasDeadline(((Record) RecordingExporter.jobBatchRecords().withType(this.taskType).withIntent(JobBatchIntent.ACTIVATE).getFirst()).getTimestamp() + ofMinutes.toMillis());
        org.assertj.core.api.Assertions.assertThat(jobRecordValue.getVariables()).containsExactly(new Map.Entry[]{org.assertj.core.api.Assertions.entry("foo", "bar")});
        JobRecordValue value = record.getValue();
        org.assertj.core.api.Assertions.assertThat(jobRecordValue.getProcessInstanceKey()).isEqualTo(value.getProcessInstanceKey());
        Assertions.assertThat(jobRecordValue).hasBpmnProcessId(value.getBpmnProcessId()).hasProcessDefinitionVersion(value.getProcessDefinitionVersion()).hasProcessDefinitionKey(value.getProcessDefinitionKey()).hasElementId(value.getElementId()).hasElementInstanceKey(value.getElementInstanceKey());
        org.assertj.core.api.Assertions.assertThat(jobRecordValue.getCustomHeaders()).isEqualTo(value.getCustomHeaders());
    }

    @Test
    public void shouldLimitJobsInBatch() {
        deployAndCreateJobs(this.taskType, 3, "{'key': '" + "x".repeat(((int) (ByteValue.ofMegabytes(4L) - ByteValue.ofKilobytes(2L))) / 2) + "'}");
        org.assertj.core.api.Assertions.assertThat(activateJobs(this.taskType, 3)).hasSize(2);
        org.assertj.core.api.Assertions.assertThat(activateJobs(3)).hasSize(1);
    }

    @Test
    public void shouldActivateJobUpToMaxMessageSize() {
        long ofMegabytes = ByteValue.ofMegabytes(4L) - ByteValue.ofKilobytes(2L);
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        long create = ENGINE.processInstance().ofBpmnProcessId("process").create();
        int i = ((int) ofMegabytes) / 2;
        Awaitility.await("until the job is created").untilAsserted(() -> {
            org.assertj.core.api.Assertions.assertThat(RecordingExporter.jobRecords(JobIntent.CREATED).withType(this.taskType).filter(record -> {
                return create == record.getValue().getProcessInstanceKey();
            }).limit(1L)).hasSize(1);
        });
        ENGINE.variables().withDocument(Map.of("foo", "x".repeat(i))).ofScope(create).update();
        ENGINE.variables().withDocument(Map.of("bar", "x".repeat(i))).ofScope(create).update();
        org.assertj.core.api.Assertions.assertThat(ENGINE.jobs().withType(this.taskType).withMaxJobsToActivate(1).activate().getValue().getJobs()).hasSize(1).first().extracting((v0) -> {
            return v0.getVariables();
        }).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class)).isEqualTo(Map.of("foo", "x".repeat(i), "bar", "x".repeat(i)));
    }

    @Test
    public void shouldNotActivateJobWithNoRemainingRetries() {
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess("process").startEvent().serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType(this.taskType);
        }).endEvent().done()).deploy();
        ENGINE.job().withKey(((Record) RecordingExporter.jobRecords(JobIntent.CREATED).withProcessInstanceKey(ENGINE.processInstance().ofBpmnProcessId("process").create()).withElementId(MultiInstanceSubProcessTest.TASK_ELEMENT_ID).getFirst()).getKey()).withRetries(0).fail();
        org.assertj.core.api.Assertions.assertThat(activateJobs(this.taskType, 10)).describedAs("Failed job without retries should not be activated", new Object[0]).isEmpty();
    }

    private Record<JobRecordValue> completeJob(long j) {
        return ENGINE.job().withKey(j).complete();
    }

    private Long activateJob(String str) {
        return activateJobs(str, 1).get(0);
    }

    private List<Long> activateJobs(String str, int i) {
        return ENGINE.jobs().withType(str).withMaxJobsToActivate(i).activate().getValue().getJobKeys();
    }

    private List<Long> activateJobs(int i) {
        return activateJobs(this.taskType, i);
    }

    private List<Long> createProcessInstances(int i, String str) {
        return (List) IntStream.range(0, i).boxed().map(num -> {
            return Long.valueOf(ENGINE.processInstance().ofBpmnProcessId("process").withVariables(str).create());
        }).collect(Collectors.toList());
    }

    private List<Long> deployAndCreateJobs(String str, int i, String str2) {
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(str)).deploy();
        List<Long> createProcessInstances = createProcessInstances(i, str2);
        return (List) RecordingExporter.jobRecords(JobIntent.CREATED).withType(str).filter(record -> {
            return createProcessInstances.contains(Long.valueOf(record.getValue().getProcessInstanceKey()));
        }).limit(i).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    private List<Long> deployAndCreateJobs(String str, int i) {
        return deployAndCreateJobs(str, i, "{'foo':'bar'}");
    }

    private void waitForJobs(String str, int i, List<Long> list) {
        TestUtil.waitUntil(() -> {
            return RecordingExporter.jobRecords(JobIntent.CREATED).filter(record -> {
                return list.contains(Long.valueOf(record.getValue().getProcessInstanceKey()));
            }).withType(str).limit((long) i).count() == ((long) i);
        });
    }

    private JobBatchRecordValue getActivatedJobBatch() {
        return ((Record) RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).withType(this.taskType).getFirst()).getValue();
    }
}
