/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.processing.job.JobBackoffChecker;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.RecordingJobStreamer;
import io.camunda.zeebe.msgpack.value.StringValue;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.IncidentRecordValue;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.record.RecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class ActivatableJobsPushTest {
    private static final String PROCESS_ID = "process";
    private static final RecordingJobStreamer JOB_STREAMER = new RecordingJobStreamer();
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition().withJobStreamer(JOB_STREAMER);
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private RecordingJobStreamer.RecordingJobStream jobStream;
    private JobActivationProperties jobActivationProperties;
    private String jobType;
    private DirectBuffer jobTypeBuffer;
    private DirectBuffer worker;
    private Long timeout;
    private Map<String, Object> variables;
    private final List<Long> activeProcessInstances = new ArrayList<Long>();

    @Before
    public void setUp() {
        this.jobType = Strings.newRandomValidBpmnId();
        this.jobTypeBuffer = BufferUtil.wrapString((String)this.jobType);
        this.worker = BufferUtil.wrapString((String)"test");
        this.variables = Map.of("a", "valA", "b", "valB", "c", "valC");
        this.timeout = 30000L;
        this.jobActivationProperties = new JobActivationPropertiesImpl().setWorker(this.worker, 0, this.worker.capacity()).setTimeout(this.timeout.longValue()).setTenantIds(List.of("<default>")).setFetchVariables(List.of(new StringValue("a"), new StringValue("b"), new StringValue("c")));
        this.jobStream = JOB_STREAMER.addJobStream(this.jobTypeBuffer, this.jobActivationProperties);
    }

    @After
    public void tearDown() {
        for (Long processInstanceKey : this.activeProcessInstances) {
            ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        }
        this.activeProcessInstances.clear();
    }

    @Test
    public void shouldPushWhenJobCreated() {
        boolean activationCount = true;
        long jobKey = this.createJob(this.jobType, PROCESS_ID, this.variables);
        Record batchRecord = (Record)RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.jobType).getFirst();
        JobBatchRecordValue batch = (JobBatchRecordValue)batchRecord.getValue();
        List jobs = batch.getJobs();
        Assertions.assertThat((List)jobs).hasSize(1);
        Assertions.assertThat((List)batch.getJobKeys()).contains((Object[])new Long[]{jobKey});
        this.assertEventOrder(new Intent[]{JobIntent.CREATED, JobBatchIntent.ACTIVATED});
        this.assertActivatedJob(jobKey, 1);
    }

    @Test
    public void shouldPushForMultipleJobsCreated() {
        int numberOfJobs = 3;
        List<Long> jobKeys = this.createJobs(3);
        RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(this.jobType).await();
        List batches = RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.jobType).asList();
        Assertions.assertThat((List)batches).hasSize(3);
        List batchJobKeys = batches.stream().flatMap(record -> ((JobBatchRecordValue)record.getValue()).getJobKeys().stream()).collect(Collectors.toList());
        Assertions.assertThat(batchJobKeys).containsAnyElementsOf(jobKeys);
        this.assertEventOrder(new Intent[]{JobIntent.CREATED, JobBatchIntent.ACTIVATED});
        this.jobStream.getActivatedJobs().stream().forEach(activatedJob -> {
            JobRecord jobRecord = activatedJob.jobRecord();
            Assertions.assertThat((Comparable)jobRecord.getWorkerBuffer()).isEqualTo((Object)this.worker);
            Assertions.assertThat((Map)jobRecord.getVariables()).isEqualTo(this.variables);
            Assertions.assertThat((long)activatedJob.jobKey()).isIn((Iterable)batchJobKeys);
        });
    }

    @Test
    public void shouldPushWhenJobTimesOut() {
        int activationCount = 2;
        long jobKey = this.createJob(this.jobType, PROCESS_ID, this.variables);
        ENGINE.increaseTime(Duration.ofMillis(this.timeout).plus(EngineConfiguration.DEFAULT_JOBS_TIMEOUT_POLLING_INTERVAL));
        RecordingExporter.jobRecords((JobIntent)JobIntent.TIMED_OUT).withType(this.jobType).await();
        this.assertJobActivations(2);
        this.assertEventOrder(new Intent[]{JobIntent.TIME_OUT, JobIntent.TIMED_OUT, JobBatchIntent.ACTIVATED});
        this.assertActivatedJob(jobKey, 2);
    }

    @Test
    public void shouldPushAfterJobFailed() {
        int activationCount = 2;
        long jobKey = this.createJob(this.jobType, PROCESS_ID, this.variables);
        ENGINE.job().withKey(jobKey).withRetries(5).fail();
        RecordingExporter.jobRecords((JobIntent)JobIntent.FAILED).withType(this.jobType).await();
        this.assertJobActivations(2);
        this.assertEventOrder(new Intent[]{JobIntent.FAIL, JobIntent.FAILED, JobBatchIntent.ACTIVATED});
        this.assertActivatedJob(jobKey, 2);
    }

    @Test
    public void shouldPushAfterJobBackoff() {
        int activationCount = 2;
        long jobKey = this.createJob(this.jobType, PROCESS_ID, this.variables);
        ENGINE.job().withKey(jobKey).withRetries(5).withBackOff(Duration.ofMillis(10L)).fail();
        ENGINE.increaseTime(Duration.ofMillis(JobBackoffChecker.BACKOFF_RESOLUTION));
        RecordingExporter.jobRecords((JobIntent)JobIntent.RECURRED_AFTER_BACKOFF).withType(this.jobType).await();
        this.assertJobActivations(2);
        this.assertEventOrder(new Intent[]{JobIntent.RECUR_AFTER_BACKOFF, JobIntent.RECURRED_AFTER_BACKOFF, JobBatchIntent.ACTIVATED});
        this.assertActivatedJob(jobKey, 2);
    }

    @Test
    public void shouldPushWhenJobIncidentResolves() {
        int activationCount = 2;
        long jobKey = this.createJob(this.jobType, PROCESS_ID, this.variables);
        ENGINE.job().withKey(jobKey).withRetries(0).withErrorMessage("raise incident").fail();
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).getFirst();
        ENGINE.incident().ofInstance(((IncidentRecordValue)incident.getValue()).getProcessInstanceKey()).resolve();
        RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.RESOLVED).withJobKey(jobKey).await();
        this.assertJobActivations(2);
        this.assertEventOrder(new Intent[]{IncidentIntent.RESOLVE, IncidentIntent.RESOLVED, JobBatchIntent.ACTIVATED});
        this.assertActivatedJob(jobKey, 2);
    }

    private List<Long> createJobs(int amount) {
        return IntStream.range(0, amount).mapToObj(i -> this.createJob(this.jobType, PROCESS_ID, this.variables)).collect(Collectors.toList());
    }

    private Long createJob(String jobType, String processId, Map<String, Object> variables) {
        Record<JobRecordValue> jobRecord = ENGINE.createJob(jobType, processId, variables, "<default>");
        this.activeProcessInstances.add(((JobRecordValue)jobRecord.getValue()).getProcessInstanceKey());
        return jobRecord.getKey();
    }

    private void assertEventOrder(Intent ... eventOrder) {
        for (long piKey : this.activeProcessInstances) {
            RecordStream processInstanceRecordStream = RecordingExporter.records().betweenProcessInstance(piKey);
            Assertions.assertThat((Stream)processInstanceRecordStream).extracting(Record::getIntent).containsSequence((Object[])eventOrder);
        }
    }

    private void assertJobActivations(int activationCount) {
        List batchRecord = RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.jobType).asList();
        Assertions.assertThat((List)batchRecord).hasSize(activationCount);
    }

    private void assertActivatedJob(Long jobKey, int activationCount) {
        List<ActivatedJob> activatedJobs = this.jobStream.getActivatedJobs();
        Assertions.assertThat(activatedJobs).hasSize(activationCount);
        activatedJobs.stream().forEach(activatedJob -> {
            Assertions.assertThat((long)activatedJob.jobKey()).isEqualTo((Object)jobKey);
            JobRecord jobRecord = activatedJob.jobRecord();
            Assertions.assertThat((Comparable)jobRecord.getWorkerBuffer()).isEqualTo((Object)this.worker);
            Assertions.assertThat((Map)jobRecord.getVariables()).isEqualTo(this.variables);
            Assertions.assertThat((String)jobRecord.getTenantId()).isEqualTo("<default>");
        });
    }
}

