package io.zeebe.broker.incident;

import io.zeebe.broker.incident.data.IncidentRecord;
import io.zeebe.broker.incident.processor.IncidentStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.topic.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.IncidentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/zeebe/broker/incident/IncidentStreamProcessorTest.class */
public class IncidentStreamProcessorTest {

    @Rule
    public StreamProcessorRule rule = new StreamProcessorRule();

    private TypedStreamProcessor buildStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        return new IncidentStreamProcessor().createStreamProcessor(typedStreamEnvironment);
    }

    @Test
    public void shouldNotCreateIncidentIfRetriesAreUpdatedIntermittently() {
        UnpackedObject job = job(0);
        long writeEvent = this.rule.writeEvent(JobIntent.FAILED, job);
        job.setRetries(1);
        this.rule.writeEvent(writeEvent, JobIntent.RETRIES_UPDATED, job);
        this.rule.runStreamProcessor(this::buildStreamProcessor);
        waitForRejectionWithIntent(IncidentIntent.CREATE);
        Assertions.assertThat((List) this.rule.events().onlyIncidentRecords().collect(Collectors.toList())).extracting(typedRecord -> {
            return typedRecord.getMetadata();
        }).extracting(new Function[]{recordMetadata -> {
            return recordMetadata.getRecordType();
        }, recordMetadata2 -> {
            return recordMetadata2.getIntent();
        }}).containsExactly(new Tuple[]{Assertions.tuple(new Object[]{RecordType.COMMAND, IncidentIntent.CREATE}), Assertions.tuple(new Object[]{RecordType.COMMAND_REJECTION, IncidentIntent.CREATE})});
    }

    @Test
    public void shouldNotResolveIncidentIfActivityTerminated() {
        StreamProcessorControl runStreamProcessor = this.rule.runStreamProcessor(this::buildStreamProcessor);
        runStreamProcessor.blockAfterIncidentEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == IncidentIntent.CREATED;
        });
        UnpackedObject workflowInstanceRecord = new WorkflowInstanceRecord();
        workflowInstanceRecord.setWorkflowInstanceKey(1L);
        long writeEvent = this.rule.writeEvent(2L, WorkflowInstanceIntent.ELEMENT_READY, workflowInstanceRecord);
        UnpackedObject incidentRecord = new IncidentRecord();
        incidentRecord.setWorkflowInstanceKey(1L);
        incidentRecord.setActivityInstanceKey(2L);
        incidentRecord.setFailureEventPosition(writeEvent);
        this.rule.writeCommand(IncidentIntent.CREATE, incidentRecord);
        waitForEventWithIntent(IncidentIntent.CREATED);
        this.rule.writeEvent(2L, WorkflowInstanceIntent.PAYLOAD_UPDATED, workflowInstanceRecord);
        this.rule.writeEvent(2L, WorkflowInstanceIntent.ELEMENT_TERMINATED, workflowInstanceRecord);
        runStreamProcessor.unblock();
        waitForEventWithIntent(IncidentIntent.DELETED);
        Assertions.assertThat((List) this.rule.events().onlyIncidentRecords().collect(Collectors.toList())).extracting(typedRecord2 -> {
            return typedRecord2.getMetadata();
        }).extracting(new Function[]{recordMetadata -> {
            return recordMetadata.getRecordType();
        }, recordMetadata2 -> {
            return recordMetadata2.getIntent();
        }}).containsExactly(new Tuple[]{Assertions.tuple(new Object[]{RecordType.COMMAND, IncidentIntent.CREATE}), Assertions.tuple(new Object[]{RecordType.EVENT, IncidentIntent.CREATED}), Assertions.tuple(new Object[]{RecordType.COMMAND, IncidentIntent.RESOLVE}), Assertions.tuple(new Object[]{RecordType.COMMAND, IncidentIntent.DELETE}), Assertions.tuple(new Object[]{RecordType.COMMAND_REJECTION, IncidentIntent.RESOLVE}), Assertions.tuple(new Object[]{RecordType.EVENT, IncidentIntent.DELETED})});
    }

    private JobRecord job(int i) {
        JobRecord jobRecord = new JobRecord();
        jobRecord.setRetries(i);
        jobRecord.setType(BufferUtil.wrapString(TypedStreamProcessorTest.STREAM_NAME));
        return jobRecord;
    }

    private void waitForEventWithIntent(Intent intent) {
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyIncidentRecords().onlyEvents().withIntent(intent).findFirst().isPresent();
        });
    }

    private void waitForRejectionWithIntent(Intent intent) {
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyIncidentRecords().onlyRejections().withIntent(intent).findFirst().isPresent();
        });
    }
}
