/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordValueWithVariables;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.JobBatchRecordStream;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.ByteValue;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public final class ActivateJobsTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String LONG_CUSTOM_HEADER_VALUE = "foo".repeat(128);
    private static final String PROCESS_ID = "process";
    private static final Function<String, BpmnModelInstance> MODEL_SUPPLIER = type -> Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", b -> ((ServiceTaskBuilder)b.zeebeJobType(type)).done()).endEvent("end").done();
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private String taskType;

    @Before
    public void setup() {
        this.taskType = Strings.newRandomValidBpmnId();
    }

    @Test
    public void shouldRejectInvalidAmount() {
        Record<JobBatchRecordValue> batchRecord = ENGINE.jobs().withType(this.taskType).withMaxJobsToActivate(0).expectRejection().activate();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(batchRecord).hasRejectionType(RejectionType.INVALID_ARGUMENT).hasRejectionReason("Expected to activate job batch with max jobs to activate to be greater than zero, but it was '0'");
    }

    @Test
    public void shouldRejectInvalidTimeout() {
        Record<JobBatchRecordValue> batchRecord = ENGINE.jobs().withType(this.taskType).withTimeout(Duration.ofSeconds(0L).toMillis()).expectRejection().activate();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(batchRecord).hasRejectionType(RejectionType.INVALID_ARGUMENT).hasRejectionReason("Expected to activate job batch with timeout to be greater than zero, but it was '0'");
    }

    @Test
    public void shouldRejectInvalidType() {
        Record<JobBatchRecordValue> batchRecord = ENGINE.jobs().withType("").expectRejection().activate();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(batchRecord).hasRejectionType(RejectionType.INVALID_ARGUMENT).hasRejectionReason("Expected to activate job batch with type to be present, but it was blank");
    }

    @Test
    public void shouldAcceptEmptyWorker() {
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        Duration timeout = Duration.ofMinutes(12L);
        Record<JobBatchRecordValue> batchRecord = ENGINE.jobs().withType(this.taskType).withTimeout(timeout.toMillis()).withMaxJobsToActivate(1).activate();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((Intent)batchRecord.getIntent()).isEqualTo((Object)JobBatchIntent.ACTIVATED);
    }

    @Test
    public void shouldActivateSingleJob() {
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        long firstInstanceKey = this.createProcessInstances(3, "{'foo':'bar'}").get(0);
        long expectedJobKey = ((Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(this.taskType).filter(r -> ((JobRecordValue)r.getValue()).getProcessInstanceKey() == firstInstanceKey)).getFirst()).getKey();
        String worker = "myTestWorker";
        Duration timeout = Duration.ofMinutes(12L);
        Record<JobBatchRecordValue> batchRecord = ENGINE.jobs().withType(this.taskType).byWorker("myTestWorker").withTimeout(timeout.toMillis()).withMaxJobsToActivate(1).activate();
        List jobs = ((JobBatchRecordValue)batchRecord.getValue()).getJobs();
        List jobKeys = ((JobBatchRecordValue)batchRecord.getValue()).getJobKeys();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((Intent)batchRecord.getIntent()).isEqualTo((Object)JobBatchIntent.ACTIVATED);
        Assertions.assertThat((List)jobKeys).hasSize(1);
        Assertions.assertThat((List)jobs).hasSize(1);
        Assertions.assertThat((Long)((Long)jobKeys.get(0))).isEqualTo(expectedJobKey);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((JobRecordValue)((JobRecordValue)jobs.get(0))).hasRetries(3).hasWorker("myTestWorker").hasType(this.taskType);
        Assertions.assertThat((Map)((JobRecordValue)jobs.get(0)).getVariables()).containsExactly(new Map.Entry[]{Assertions.entry((Object)"foo", (Object)"bar")});
        JobBatchRecordValue activatedJobBatch = this.getActivatedJobBatch();
        JobRecordValue jobRecordValue = (JobRecordValue)activatedJobBatch.getJobs().get(0);
        Long jobKey = (Long)activatedJobBatch.getJobKeys().get(0);
        Assertions.assertThat((Long)jobKey).isEqualTo(expectedJobKey);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((JobRecordValue)jobRecordValue).hasRetries(3).hasWorker("myTestWorker");
    }

    @Test
    public void shouldActivateJobBatch() {
        List<Long> expectedJobKeys = this.deployAndCreateJobs(this.taskType, 5).subList(0, 3);
        List<Long> jobKeys = this.activateJobs(3);
        Assertions.assertThat(jobKeys).containsExactlyInAnyOrderElementsOf(expectedJobKeys);
    }

    @Test
    public void shouldActivateJobBatches() {
        List<Long> jobKeys = this.deployAndCreateJobs(this.taskType, 12);
        List<Long> expectedFirstJobKeys = jobKeys.subList(0, 3);
        List<Long> expectedSecondJobKeys = jobKeys.subList(3, 7);
        List<Long> expectedThirdJobKeys = jobKeys.subList(7, 10);
        List<Long> firstJobs = this.activateJobs(3);
        List<Long> secondJobs = this.activateJobs(4);
        List<Long> thirdJobs = this.activateJobs(3);
        Assertions.assertThat(firstJobs).containsOnlyElementsOf(expectedFirstJobKeys);
        Assertions.assertThat(secondJobs).containsOnlyElementsOf(expectedSecondJobKeys);
        Assertions.assertThat(thirdJobs).containsOnlyElementsOf(expectedThirdJobKeys);
    }

    @Test
    public void shouldReturnEmptyBatchIfNoJobsAvailable() {
        List<Long> jobEvents = this.activateJobs(Strings.newRandomValidBpmnId(), 3);
        Assertions.assertThat(jobEvents).isEmpty();
    }

    @Test
    public void shouldCompleteActivatedJobs() {
        int jobAmount = 5;
        List<Long> jobKeys = this.deployAndCreateJobs(this.taskType, 5);
        List<Long> activateJobKeys = this.activateJobs(this.taskType, 5);
        activateJobKeys.forEach(this::completeJob);
        List records = (List)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.COMPLETED).limit(5L)).collect(Collectors.toList());
        Assertions.assertThat((List)records).extracting(Record::getKey).containsOnlyElementsOf(jobKeys);
    }

    @Test
    public void shouldOnlyReturnJobsOfCorrectType() {
        List<Long> jobKeys = this.deployAndCreateJobs(this.taskType, 3);
        this.deployAndCreateJobs("different" + this.taskType, 5);
        jobKeys.addAll(this.deployAndCreateJobs(this.taskType, 4));
        List<Long> jobs = this.activateJobs(this.taskType, 7);
        Assertions.assertThat(jobs).containsExactly((Object[])jobKeys.toArray(new Long[0]));
        JobBatchRecordValue activatedJobBatch = this.getActivatedJobBatch();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((JobBatchRecordValue)activatedJobBatch).hasJobKeys(jobKeys);
        Assertions.assertThat((List)activatedJobBatch.getJobs()).extracting(JobRecordValue::getType).containsOnly((Object[])new String[]{this.taskType});
    }

    @Test
    public void shouldActivateJobsFromProcess() {
        int jobAmount = 10;
        String jobType = this.taskType;
        String jobType2 = Strings.newRandomValidBpmnId();
        String jobType3 = Strings.newRandomValidBpmnId();
        StartEventBuilder builder = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start");
        for (String type : Arrays.asList(jobType, jobType2, jobType3)) {
            builder = builder.serviceTask(type, b -> b.zeebeJobType(type));
        }
        ENGINE.deployment().withXmlResource("process.bpmn", builder.done()).deploy();
        List<Long> processInstanceKeys = this.createProcessInstances(10, "{}");
        this.waitForJobs(jobType, 10, processInstanceKeys);
        this.activateJobs(jobType, 10).forEach(this::completeJob);
        this.waitForJobs(jobType2, 10, processInstanceKeys);
        this.activateJobs(jobType2, 10).forEach(this::completeJob);
        this.waitForJobs(jobType3, 10, processInstanceKeys);
        this.activateJobs(jobType3, 10).forEach(this::completeJob);
        Assertions.assertThat((((ProcessInstanceRecordStream)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withBpmnProcessId(PROCESS_ID).filter(r -> processInstanceKeys.contains(r.getKey()))).limit(10L)).count() == (long)processInstanceKeys.size() ? 1 : 0) != 0);
    }

    @Test
    public void shouldActivateJobsWithLongCustomHeaders() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", b -> ((ServiceTaskBuilder)b.zeebeJobType(this.taskType)).zeebeTaskHeader("foo", LONG_CUSTOM_HEADER_VALUE)).endEvent().done();
        ENGINE.deployment().withXmlResource("process.bpmn", modelInstance).deploy();
        long processInstanceKey = this.createProcessInstances(1, "{}").get(0);
        RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        this.activateJob(this.taskType);
        ENGINE.job().ofInstance(processInstanceKey).withType(this.taskType).complete();
        JobRecordValue jobRecordValue = (JobRecordValue)this.getActivatedJobBatch().getJobs().get(0);
        Assertions.assertThat((Map)jobRecordValue.getCustomHeaders()).containsEntry((Object)"foo", (Object)LONG_CUSTOM_HEADER_VALUE);
    }

    @Test
    public void shouldFetchFullJobRecordFromProcess() {
        ControlledActorClock clock = ENGINE.getClock();
        clock.pinCurrentTime();
        String worker = "testWorker";
        Duration timeout = Duration.ofMinutes(4L);
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        this.createProcessInstances(1, "{'foo':'bar'}");
        Record jobRecord = (Record)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(this.taskType).getFirst();
        Record<JobBatchRecordValue> jobActivatedRecord = ENGINE.jobs().withType(this.taskType).byWorker("testWorker").withTimeout(timeout.toMillis()).withMaxJobsToActivate(1).activate();
        JobRecordValue jobActivated = (JobRecordValue)((JobBatchRecordValue)jobActivatedRecord.getValue()).getJobs().get(0);
        Record jobActivate = (Record)((JobBatchRecordStream)RecordingExporter.jobBatchRecords().withType(this.taskType).withIntent((Intent)JobBatchIntent.ACTIVATE)).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((JobRecordValue)jobActivated).hasType(this.taskType).hasWorker("testWorker").hasRetries(3).hasDeadline(jobActivate.getTimestamp() + timeout.toMillis());
        Assertions.assertThat((Map)jobActivated.getVariables()).containsExactly(new Map.Entry[]{Assertions.entry((Object)"foo", (Object)"bar")});
        JobRecordValue jobRecordValue = (JobRecordValue)jobRecord.getValue();
        Assertions.assertThat((long)jobActivated.getProcessInstanceKey()).isEqualTo(jobRecordValue.getProcessInstanceKey());
        io.camunda.zeebe.protocol.record.Assertions.assertThat((JobRecordValue)jobActivated).hasBpmnProcessId(jobRecordValue.getBpmnProcessId()).hasProcessDefinitionVersion(jobRecordValue.getProcessDefinitionVersion()).hasProcessDefinitionKey(jobRecordValue.getProcessDefinitionKey()).hasElementId(jobRecordValue.getElementId()).hasElementInstanceKey(jobRecordValue.getElementInstanceKey());
        Assertions.assertThat((Map)jobActivated.getCustomHeaders()).isEqualTo((Object)jobRecordValue.getCustomHeaders());
    }

    @Test
    public void shouldLimitJobsInBatch() {
        int jobCount = 3;
        int expectedJobsInBatch = 2;
        long maxMessageSize = ByteValue.ofMegabytes((long)4L);
        long headerSize = ByteValue.ofKilobytes((long)2L);
        long maxRecordSize = maxMessageSize - headerSize - 8192L;
        int variablesSize = (int)maxRecordSize / 2;
        String variables = "{'key': '" + "x".repeat(variablesSize) + "'}";
        this.deployAndCreateJobs(this.taskType, 3, variables);
        List<Long> jobKeys = this.activateJobs(this.taskType, 3);
        Assertions.assertThat(jobKeys).hasSize(2);
        List<Long> remainingJobKeys = this.activateJobs(3);
        Assertions.assertThat(remainingJobKeys).hasSize(1);
    }

    @Test
    public void shouldActivateJobUpToMaxMessageSize() {
        long maxMessageSize = ByteValue.ofMegabytes((long)4L);
        long headerSize = ByteValue.ofKilobytes((long)2L);
        long maxRecordSize = maxMessageSize - headerSize - 8192L;
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(this.taskType)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        int variablesSize = (int)maxRecordSize / 2;
        Awaitility.await((String)"until the job is created").untilAsserted(() -> Assertions.assertThat((Stream)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(this.taskType).filter(r -> processInstanceKey == ((JobRecordValue)r.getValue()).getProcessInstanceKey())).limit(1L)).hasSize(1));
        ENGINE.variables().withDocument(Map.of("foo", "x".repeat(variablesSize))).ofScope(processInstanceKey).update();
        ENGINE.variables().withDocument(Map.of("bar", "x".repeat(variablesSize))).ofScope(processInstanceKey).update();
        JobBatchRecordValue jobs = (JobBatchRecordValue)ENGINE.jobs().withType(this.taskType).withMaxJobsToActivate(1).activate().getValue();
        ((MapAssert)((ObjectAssert)((ListAssert)Assertions.assertThat((List)jobs.getJobs()).hasSize(1)).first()).extracting(RecordValueWithVariables::getVariables).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class))).isEqualTo(Map.of("foo", "x".repeat(variablesSize), "bar", "x".repeat(variablesSize)));
    }

    @Test
    public void shouldNotActivateJobWithNoRemainingRetries() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", task -> task.zeebeJobType(this.taskType)).endEvent().done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        long jobKey = ((Record)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).withElementId("task").getFirst()).getKey();
        ENGINE.job().withKey(jobKey).withRetries(0).fail();
        List<Long> jobs = this.activateJobs(this.taskType, 10);
        ((ListAssert)Assertions.assertThat(jobs).describedAs("Failed job without retries should not be activated", new Object[0])).isEmpty();
    }

    private Record<JobRecordValue> completeJob(long jobKey) {
        return ENGINE.job().withKey(jobKey).complete();
    }

    private Long activateJob(String type) {
        return this.activateJobs(type, 1).get(0);
    }

    private List<Long> activateJobs(String type, int amount) {
        return ((JobBatchRecordValue)ENGINE.jobs().withType(type).withMaxJobsToActivate(amount).activate().getValue()).getJobKeys();
    }

    private List<Long> activateJobs(int amount) {
        return this.activateJobs(this.taskType, amount);
    }

    private List<Long> createProcessInstances(int amount, String variables) {
        return IntStream.range(0, amount).boxed().map(i -> ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariables(variables).create()).collect(Collectors.toList());
    }

    private List<Long> deployAndCreateJobs(String type, int amount, String variables) {
        ENGINE.deployment().withXmlResource("process.bpmn", MODEL_SUPPLIER.apply(type)).deploy();
        List<Long> instanceKeys = this.createProcessInstances(amount, variables);
        return ((JobRecordStream)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(type).filter(r -> instanceKeys.contains(((JobRecordValue)r.getValue()).getProcessInstanceKey()))).limit((long)amount)).map(Record::getKey).collect(Collectors.toList());
    }

    private List<Long> deployAndCreateJobs(String type, int amount) {
        return this.deployAndCreateJobs(type, amount, "{'foo':'bar'}");
    }

    private void waitForJobs(String jobType, int jobAmount, List<Long> processInstanceKeys) {
        TestUtil.waitUntil(() -> ((JobRecordStream)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).filter(r -> processInstanceKeys.contains(((JobRecordValue)r.getValue()).getProcessInstanceKey()))).withType(jobType).limit((long)jobAmount)).count() == (long)jobAmount);
    }

    private JobBatchRecordValue getActivatedJobBatch() {
        return (JobBatchRecordValue)((Record)RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.taskType).getFirst()).getValue();
    }
}

