package io.zeebe.broker.incident;

import io.zeebe.UnstableTest;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.incident.data.ErrorType;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.system.ConfigurationTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.test.MsgPackConstants;
import io.zeebe.broker.workflow.gateway.ParallelGatewayStreamProcessorTest;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.msgpack.spec.MsgPackHelper;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.IncidentIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import io.zeebe.test.broker.protocol.clientapi.TestPartitionClient;
import io.zeebe.test.util.MsgPackUtil;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/incident/IncidentTest.class */
public class IncidentTest {
    private static final String PROP_PAYLOAD = "payload";
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    public ClientApiRule apiRule;

    @Rule
    public RuleChain ruleChain;
    private TestPartitionClient testClient;
    private static final BpmnModelInstance WORKFLOW_INPUT_MAPPING = Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
        serviceTaskBuilder.zeebeTaskType(ConfigurationTest.BROKER_BASE).zeebeInput("$.foo", "$.foo");
    }).done();
    private static final BpmnModelInstance WORKFLOW_OUTPUT_MAPPING = Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
        serviceTaskBuilder.zeebeTaskType(ConfigurationTest.BROKER_BASE).zeebeOutput("$.foo", "$.foo");
    }).done();
    private static final byte[] PAYLOAD;

    public IncidentTest() {
        EmbeddedBrokerRule embeddedBrokerRule = this.brokerRule;
        embeddedBrokerRule.getClass();
        this.apiRule = new ClientApiRule(embeddedBrokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    @Before
    public void init() throws Exception {
        this.testClient = this.apiRule.partition();
        this.apiRule.waitForPartition(1);
    }

    @Test
    public void shouldCreateIncidentForInputMappingFailure() {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.CREATE);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveElementInState.position());
        Assertions.assertThat(receiveFirstIncidentEvent.sourceRecordPosition()).isEqualTo(receiveFirstIncidentCommand.position());
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("failureEventPosition", Long.valueOf(receiveElementInState.position())).containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveElementInState.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldCreateIncidentForOutputMappingFailure() {
        this.testClient.deploy(WORKFLOW_OUTPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        this.testClient.completeJobOfType(ConfigurationTest.BROKER_BASE, MsgPackConstants.MSGPACK_PAYLOAD);
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.CREATE);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveFirstWorkflowInstanceEvent.position());
        Assertions.assertThat(receiveFirstIncidentEvent.sourceRecordPosition()).isEqualTo(receiveFirstIncidentCommand.position());
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("failureEventPosition", Long.valueOf(receiveFirstWorkflowInstanceEvent.position())).containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveFirstWorkflowInstanceEvent.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldResolveIncidentForInputMappingFailure() throws Exception {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveElementInState.key(), PAYLOAD);
        SubscribedRecord receiveElementInState2 = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        Assertions.assertThat(receiveElementInState2.value()).containsEntry(PROP_PAYLOAD, PAYLOAD);
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.sourceRecordPosition()).isEqualTo(receiveElementInState2.position());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveElementInState2.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldResolveIncidentForOutputMappingFailure() throws Exception {
        this.testClient.deploy(WORKFLOW_OUTPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        this.testClient.completeJobOfType(ConfigurationTest.BROKER_BASE, MsgPackConstants.MSGPACK_PAYLOAD);
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent.key(), PAYLOAD);
        SubscribedRecord receiveFirstWorkflowInstanceEvent2 = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
        Assertions.assertThat(receiveFirstWorkflowInstanceEvent2.value()).containsEntry(PROP_PAYLOAD, PAYLOAD);
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.sourceRecordPosition()).isEqualTo(receiveFirstWorkflowInstanceEvent2.position());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveFirstWorkflowInstanceEvent2.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldCreateIncidentForInvalidResultOnInputMapping() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.string", "$");
        }).done());
        this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "Processing failed, since mapping will result in a non map object (json object).");
    }

    @Test
    public void shouldResolveIncidentForInvalidResultOnInputMapping() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("service", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.string", "$");
        }).done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("service", WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveElementInState, "{'string':{'obj':'test'}}");
        SubscribedRecord receiveElementInState2 = this.testClient.receiveElementInState("service", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        MsgPackUtil.assertEquality((byte[]) receiveElementInState2.value().get(PROP_PAYLOAD), "{'obj':'test'}");
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "Processing failed, since mapping will result in a non map object (json object).").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "service").containsEntry("activityInstanceKey", Long.valueOf(receiveElementInState2.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldCreateIncidentForInvalidResultOnOutputMapping() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.jsonObject", "$").zeebeOutput("$.testAttr", "$");
        }).done());
        this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        this.testClient.completeJobOfType("external", MsgPackUtil.asMsgPack("{'testAttr':'test'}"));
        this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "Processing failed, since mapping will result in a non map object (json object).");
    }

    @Test
    public void shouldResolveIncidentForInvalidResultOnOutputMapping() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("service", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.jsonObject", "$").zeebeOutput("$.testAttr", "$");
        }).done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        this.testClient.completeJobOfType("external", MsgPackUtil.asMsgPack("{'testAttr':'test'}"));
        this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent, "{'testAttr':{'obj':'test'}}");
        SubscribedRecord receiveFirstWorkflowInstanceEvent2 = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality((byte[]) receiveFirstWorkflowInstanceEvent2.value().get(PROP_PAYLOAD), "{'obj':'test'}");
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "Processing failed, since mapping will result in a non map object (json object).").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "service").containsEntry("activityInstanceKey", Long.valueOf(receiveFirstWorkflowInstanceEvent2.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldCreateIncidentForInAndOutputMappingAndNoTaskCompletePayload() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.jsonObject", "$").zeebeOutput("$.testAttr", "$");
        }).done());
        this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        this.testClient.completeJobOfType("external");
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.testAttr.");
    }

    @Test
    public void shouldResolveIncidentForInAndOutputMappingAndNoTaskCompletePayload() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("service", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.jsonObject", "$").zeebeOutput("$.testAttr", "$");
        }).done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        this.testClient.completeJobOfType("external");
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent, "{'testAttr':{'obj':'test'}}");
        SubscribedRecord receiveFirstWorkflowInstanceEvent2 = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality((byte[]) receiveFirstWorkflowInstanceEvent2.value().get(PROP_PAYLOAD), "{'obj':'test'}");
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.testAttr.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "service").containsEntry("activityInstanceKey", Long.valueOf(receiveFirstWorkflowInstanceEvent2.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldCreateIncidentForOutputMappingAndNoTaskCompletePayload() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeOutput("$.testAttr", "$");
        }).done());
        this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        this.testClient.completeJobOfType("external");
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.testAttr.");
    }

    @Test
    public void shouldResolveIncidentForOutputMappingAndNoTaskCompletePayload() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("service", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeOutput("$.testAttr", "$");
        }).done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, MsgPackConstants.MSGPACK_PAYLOAD);
        this.testClient.completeJobOfType("external");
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent, "{'testAttr':{'obj':'test'}}");
        SubscribedRecord receiveFirstWorkflowInstanceEvent2 = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality((byte[]) receiveFirstWorkflowInstanceEvent2.value().get(PROP_PAYLOAD), "{'obj':'test'}");
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.testAttr.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "service").containsEntry("activityInstanceKey", Long.valueOf(receiveFirstWorkflowInstanceEvent2.key())).containsEntry("jobKey", -1L);
    }

    @Test
    public void shouldCreateIncidentIfExclusiveGatewayHasNoMatchingCondition() {
        this.testClient.deploy(Bpmn.createExecutableProcess("workflow").startEvent().exclusiveGateway("xor").sequenceFlowId("s1").condition("$.foo < 5").endEvent().moveToLastGateway().sequenceFlowId("s2").condition("$.foo >= 5 && $.foo < 10").endEvent().done());
        this.testClient.createWorkflowInstance("workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 12));
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.GATEWAY_ACTIVATED);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.CREATE);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveFirstWorkflowInstanceEvent.position());
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.CONDITION_ERROR.name()).containsEntry("errorMessage", "All conditions evaluated to false and no default flow is set.").containsEntry("activityId", "xor");
    }

    @Test
    public void shouldCreateIncidentIfConditionFailsToEvaluate() {
        this.testClient.deploy(Bpmn.createExecutableProcess("workflow").startEvent().exclusiveGateway("xor").sequenceFlowId("s1").condition("$.foo < 5").endEvent().moveToLastGateway().sequenceFlowId("s2").condition("$.foo >= 5 && $.foo < 10").endEvent().done());
        this.testClient.createWorkflowInstance("workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.CONDITION_ERROR.name()).containsEntry("errorMessage", "Cannot compare values of different types: STRING and INTEGER").containsEntry("activityId", "xor");
    }

    @Test
    public void shouldResolveIncidentForFailedCondition() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess("workflow").startEvent().exclusiveGateway("xor").sequenceFlowId("s1").condition("$.foo < 5").endEvent().moveToLastGateway().sequenceFlowId("s2").condition("$.foo >= 5 && $.foo < 10").endEvent().done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance("workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));
        this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.GATEWAY_ACTIVATED).key(), MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 7).byteArray());
        List list = (List) this.testClient.receiveRecords().limit(subscribedRecord -> {
            return subscribedRecord.valueType() == ValueType.INCIDENT && subscribedRecord.intent() == IncidentIntent.RESOLVED;
        }).collect(Collectors.toList());
        List list2 = (List) this.testClient.receiveRecords().limit(subscribedRecord2 -> {
            return subscribedRecord2.valueType() == ValueType.WORKFLOW_INSTANCE && subscribedRecord2.intent() == WorkflowInstanceIntent.ELEMENT_COMPLETED && ((Long) subscribedRecord2.value().get("workflowInstanceKey")).longValue() == subscribedRecord2.key();
        }).collect(Collectors.toList());
        Assertions.assertThat(list).extracting(new Function[]{(v0) -> {
            return v0.recordType();
        }, (v0) -> {
            return v0.valueType();
        }, (v0) -> {
            return v0.intent();
        }}).containsSubsequence(new Tuple[]{Assertions.tuple(new Object[]{RecordType.COMMAND, ValueType.INCIDENT, IncidentIntent.RESOLVE}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.INCIDENT, IncidentIntent.RESOLVED})});
        Assertions.assertThat(list2).extracting(new Function[]{(v0) -> {
            return v0.recordType();
        }, (v0) -> {
            return v0.valueType();
        }, (v0) -> {
            return v0.intent();
        }}).containsSubsequence(new Tuple[]{Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.GATEWAY_ACTIVATED}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.END_EVENT_OCCURRED}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldResolveIncidentForFailedConditionAfterUploadingWrongPayload() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess("workflow").startEvent().exclusiveGateway("xor").sequenceFlowId("s1").condition("$.foo < 5").endEvent().moveToLastGateway().sequenceFlowId("s2").condition("$.foo >= 5 && $.foo < 10").endEvent().done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance("workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));
        this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.GATEWAY_ACTIVATED);
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent.key(), MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 10).byteArray());
        List list = (List) this.testClient.receiveRecords().limit(subscribedRecord -> {
            return subscribedRecord.valueType() == ValueType.INCIDENT && subscribedRecord.intent() == IncidentIntent.RESOLVE_FAILED;
        }).collect(Collectors.toList());
        List list2 = (List) this.testClient.receiveRecords().limit(subscribedRecord2 -> {
            return subscribedRecord2.valueType() == ValueType.WORKFLOW_INSTANCE && subscribedRecord2.intent() == WorkflowInstanceIntent.GATEWAY_ACTIVATED;
        }).collect(Collectors.toList());
        Assertions.assertThat(list).extracting(new Function[]{(v0) -> {
            return v0.recordType();
        }, (v0) -> {
            return v0.valueType();
        }, (v0) -> {
            return v0.intent();
        }}).containsSubsequence(new Tuple[]{Assertions.tuple(new Object[]{RecordType.COMMAND, ValueType.INCIDENT, IncidentIntent.RESOLVE}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.INCIDENT, IncidentIntent.RESOLVE_FAILED})});
        Assertions.assertThat(list2).extracting(new Function[]{(v0) -> {
            return v0.recordType();
        }, (v0) -> {
            return v0.valueType();
        }, (v0) -> {
            return v0.intent();
        }}).containsSubsequence(new Tuple[]{Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.GATEWAY_ACTIVATED})});
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent.key(), MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 7).byteArray());
        List list3 = (List) this.testClient.receiveRecords().skipUntil(subscribedRecord3 -> {
            return subscribedRecord3.valueType() == ValueType.INCIDENT && subscribedRecord3.intent() == IncidentIntent.RESOLVE_FAILED;
        }).limit(subscribedRecord4 -> {
            return subscribedRecord4.valueType() == ValueType.INCIDENT && subscribedRecord4.intent() == IncidentIntent.RESOLVED;
        }).collect(Collectors.toList());
        List list4 = (List) this.testClient.receiveRecords().skipUntil(subscribedRecord5 -> {
            return subscribedRecord5.valueType() == ValueType.WORKFLOW_INSTANCE && subscribedRecord5.intent() == WorkflowInstanceIntent.GATEWAY_ACTIVATED;
        }).limit(subscribedRecord6 -> {
            return subscribedRecord6.valueType() == ValueType.WORKFLOW_INSTANCE && subscribedRecord6.intent() == WorkflowInstanceIntent.ELEMENT_COMPLETED && ((Long) subscribedRecord6.value().get("workflowInstanceKey")).longValue() == subscribedRecord6.key();
        }).collect(Collectors.toList());
        Assertions.assertThat(list3).extracting(new Function[]{(v0) -> {
            return v0.recordType();
        }, (v0) -> {
            return v0.valueType();
        }, (v0) -> {
            return v0.intent();
        }}).containsSubsequence(new Tuple[]{Assertions.tuple(new Object[]{RecordType.COMMAND, ValueType.INCIDENT, IncidentIntent.RESOLVE}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.INCIDENT, IncidentIntent.RESOLVED})});
        Assertions.assertThat(list4).extracting(new Function[]{(v0) -> {
            return v0.recordType();
        }, (v0) -> {
            return v0.valueType();
        }, (v0) -> {
            return v0.intent();
        }}).containsSubsequence(new Tuple[]{Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.END_EVENT_OCCURRED}), Assertions.tuple(new Object[]{RecordType.EVENT, ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldResolveIncidentForExclusiveGatewayWithoutMatchingCondition() throws Throwable {
        this.testClient.deploy(Bpmn.createExecutableProcess("workflow").startEvent().exclusiveGateway("xor").sequenceFlowId("s1").condition("$.foo < 5").endEvent().moveToLastGateway().sequenceFlowId("s2").condition("$.foo >= 5 && $.foo < 10").endEvent().done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance("workflow", MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 12));
        this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.GATEWAY_ACTIVATED).key(), MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, 7).byteArray());
        this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        this.testClient.receiveElementInState("workflow", WorkflowInstanceIntent.ELEMENT_COMPLETED);
    }

    @Test
    public void shouldFailToResolveIncident() throws Exception {
        this.testClient.deploy(Bpmn.createExecutableProcess(ParallelGatewayStreamProcessorTest.PROCESS_ID).startEvent().serviceTask("failingTask", serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeTaskType("external").zeebeInput("$.foo", "$.foo").zeebeInput("$.bar", "$.bar");
        }).done());
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorMessage", "No data found for query $.foo.");
        updatePayload(createWorkflowInstance, receiveElementInState.key(), PAYLOAD);
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVE_FAILED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.bar.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask");
    }

    @Test
    public void shouldResolveIncidentAfterPreviousResolvingFailed() throws Exception {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveElementInState.key(), MsgPackHelper.EMTPY_OBJECT);
        this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVE_FAILED);
        updatePayload(createWorkflowInstance, receiveElementInState.key(), PAYLOAD);
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask");
    }

    @Test
    public void shouldResolveMultipleIncidents() throws Exception {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        updatePayload(this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID), this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_READY).key(), PAYLOAD);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(createWorkflowInstance, "failingTask", WorkflowInstanceIntent.ELEMENT_READY);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(createWorkflowInstance, IncidentIntent.CREATED);
        updatePayload(createWorkflowInstance, receiveFirstWorkflowInstanceEvent.key(), PAYLOAD);
        Assertions.assertThat(this.testClient.receiveFirstIncidentEvent(createWorkflowInstance, IncidentIntent.RESOLVED).key()).isEqualTo(receiveFirstIncidentEvent.key());
    }

    @Test
    public void shouldDeleteIncidentIfActivityTerminated() {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        cancelWorkflowInstance(createWorkflowInstance);
        SubscribedRecord receiveFirstWorkflowInstanceEvent = this.testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_TERMINATED);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.DELETE);
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.DELETED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveFirstWorkflowInstanceEvent.position());
        Assertions.assertThat(receiveFirstIncidentEvent2.sourceRecordPosition()).isEqualTo(receiveFirstIncidentCommand.position());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", receiveFirstIncidentEvent2.value().get("activityInstanceKey"));
    }

    @Test
    @Category({UnstableTest.class})
    public void shouldProcessIncidentsAfterMultipleTerminations() {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        cancelWorkflowInstance(this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID));
        cancelWorkflowInstance(this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, PAYLOAD));
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(createWorkflowInstance, IncidentIntent.CREATED);
        cancelWorkflowInstance(createWorkflowInstance);
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(createWorkflowInstance, IncidentIntent.DELETED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.IO_MAPPING_ERROR.name()).containsEntry("errorMessage", "No data found for query $.foo.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", receiveFirstIncidentEvent2.value().get("activityInstanceKey"));
    }

    @Test
    public void shouldCreateIncidentIfJobHasNoRetriesLeft() {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, PAYLOAD);
        failJobWithNoRetriesLeft();
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.FAILED);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.CREATE);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveFirstJobEvent.position());
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("failureEventPosition", Long.valueOf(receiveFirstJobEvent.position())).containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveElementInState.key())).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
    }

    @Test
    public void shouldDeleteIncidentIfJobRetriesIncreased() {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, PAYLOAD);
        failJobWithNoRetriesLeft();
        updateJobRetries();
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.FAILED);
        SubscribedRecord receiveFirstJobEvent2 = this.testClient.receiveFirstJobEvent(JobIntent.RETRIES_UPDATED);
        SubscribedRecord receiveElementInState = this.testClient.receiveElementInState("failingTask", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.DELETE);
        Assertions.assertThat(receiveFirstIncidentCommand.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveFirstJobEvent2.position());
        Assertions.assertThat(receiveFirstIncidentCommand.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveElementInState.key())).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
        long position = receiveFirstIncidentCommand.position();
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.DELETED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.sourceRecordPosition()).isEqualTo(position);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", Long.valueOf(receiveElementInState.key())).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
    }

    @Test
    public void shouldDeleteIncidentIfJobIsCanceled() {
        this.testClient.deploy(WORKFLOW_INPUT_MAPPING);
        long createWorkflowInstance = this.testClient.createWorkflowInstance(ParallelGatewayStreamProcessorTest.PROCESS_ID, PAYLOAD);
        failJobWithNoRetriesLeft();
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        cancelWorkflowInstance(createWorkflowInstance);
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.CANCELED);
        SubscribedRecord receiveFirstIncidentCommand = this.testClient.receiveFirstIncidentCommand(IncidentIntent.DELETE);
        Assertions.assertThat(receiveFirstIncidentCommand.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentCommand.sourceRecordPosition()).isEqualTo(receiveFirstJobEvent.position());
        Assertions.assertThat(receiveFirstIncidentCommand.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", receiveFirstIncidentCommand.value().get("activityInstanceKey")).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
        SubscribedRecord receiveFirstIncidentEvent2 = this.testClient.receiveFirstIncidentEvent(IncidentIntent.DELETED);
        Assertions.assertThat(receiveFirstIncidentEvent2.key()).isEqualTo(receiveFirstIncidentEvent.key());
        Assertions.assertThat(receiveFirstIncidentEvent2.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("bpmnProcessId", ParallelGatewayStreamProcessorTest.PROCESS_ID).containsEntry("workflowInstanceKey", Long.valueOf(createWorkflowInstance)).containsEntry("activityId", "failingTask").containsEntry("activityInstanceKey", receiveFirstIncidentEvent2.value().get("activityInstanceKey")).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
    }

    @Test
    public void shouldCreateIncidentIfStandaloneJobHasNoRetriesLeft() {
        createStandaloneJob();
        failJobWithNoRetriesLeft();
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.FAILED);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("failureEventPosition", Long.valueOf(receiveFirstJobEvent.position())).containsEntry("bpmnProcessId", "").containsEntry("workflowInstanceKey", -1L).containsEntry("activityId", "").containsEntry("activityInstanceKey", -1L).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
    }

    @Test
    public void shouldDeleteStandaloneIncidentIfJobRetriesIncreased() {
        createStandaloneJob();
        failJobWithNoRetriesLeft();
        updateJobRetries();
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.FAILED);
        SubscribedRecord receiveFirstIncidentEvent = this.testClient.receiveFirstIncidentEvent(IncidentIntent.DELETED);
        Assertions.assertThat(receiveFirstIncidentEvent.key()).isGreaterThan(0L);
        Assertions.assertThat(receiveFirstIncidentEvent.value()).containsEntry("errorType", ErrorType.JOB_NO_RETRIES.name()).containsEntry("errorMessage", "No more retries left.").containsEntry("bpmnProcessId", "").containsEntry("workflowInstanceKey", -1L).containsEntry("activityId", "").containsEntry("activityInstanceKey", -1L).containsEntry("jobKey", Long.valueOf(receiveFirstJobEvent.key()));
    }

    private void failJobWithNoRetriesLeft() {
        this.apiRule.openJobSubscription(ConfigurationTest.BROKER_BASE).await();
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.ACTIVATED);
        ExecuteCommandResponse sendAndAwait = ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().key(receiveFirstJobEvent.key()).type(ValueType.JOB, JobIntent.FAIL).command().put("retries", 0).put("type", ConfigurationTest.BROKER_BASE).put("worker", receiveFirstJobEvent.value().get("worker")).put("headers", receiveFirstJobEvent.value().get("headers")).done()).sendAndAwait();
        Assertions.assertThat(sendAndAwait.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(sendAndAwait.intent()).isEqualTo(JobIntent.FAILED);
    }

    private void createStandaloneJob() {
        ExecuteCommandResponse sendAndAwait = ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().type(ValueType.JOB, JobIntent.CREATE).command().put("type", ConfigurationTest.BROKER_BASE).put("retries", 3).done()).sendAndAwait();
        Assertions.assertThat(sendAndAwait.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(sendAndAwait.intent()).isEqualTo(JobIntent.CREATED);
    }

    private void updateJobRetries() {
        SubscribedRecord receiveFirstJobEvent = this.testClient.receiveFirstJobEvent(JobIntent.FAILED);
        ExecuteCommandResponse sendAndAwait = ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().key(receiveFirstJobEvent.key()).type(ValueType.JOB, JobIntent.UPDATE_RETRIES).command().put("retries", 1).put("type", ConfigurationTest.BROKER_BASE).put("worker", receiveFirstJobEvent.value().get("worker")).put("headers", receiveFirstJobEvent.value().get("headers")).done()).sendAndAwait();
        Assertions.assertThat(sendAndAwait.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(sendAndAwait.intent()).isEqualTo(JobIntent.RETRIES_UPDATED);
    }

    private void updatePayload(long j, long j2, byte[] bArr) {
        ExecuteCommandResponse sendAndAwait = ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().type(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.UPDATE_PAYLOAD).key(j2).command().put(PROP_PAYLOAD, bArr).done()).sendAndAwait();
        Assertions.assertThat(sendAndAwait.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(sendAndAwait.intent()).isEqualTo(WorkflowInstanceIntent.PAYLOAD_UPDATED);
    }

    private void updatePayload(long j, SubscribedRecord subscribedRecord, String str) throws IOException {
        updatePayload(j, subscribedRecord.key(), MsgPackUtil.asMsgPack(str));
    }

    private void cancelWorkflowInstance(long j) {
        ExecuteCommandResponse sendAndAwait = ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().key(j).type(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CANCEL).command().done()).sendAndAwait();
        Assertions.assertThat(sendAndAwait.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(sendAndAwait.intent()).isEqualTo(WorkflowInstanceIntent.CANCELING);
    }

    static {
        DirectBuffer encodeMsgPack = MsgPackUtil.encodeMsgPack(messageBufferPacker -> {
            messageBufferPacker.packMapHeader(1);
            messageBufferPacker.packString(TypedStreamProcessorTest.STREAM_NAME);
            messageBufferPacker.packString(TestJarExporter.FOO);
        });
        PAYLOAD = new byte[encodeMsgPack.capacity()];
        encodeMsgPack.getBytes(0, PAYLOAD);
    }
}
