package io.zeebe.broker.workflow;

import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.test.MsgPackConstants;
import io.zeebe.broker.workflow.gateway.ParallelGatewayStreamProcessorTest;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.value.WorkflowInstanceRecordValue;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.msgpack.spec.MsgPackHelper;
import io.zeebe.protocol.BpmnElementType;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient;
import io.zeebe.test.util.MsgPackUtil;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/workflow/UpdatePayloadTest.class */
/**
 * Tests for the {@code UPDATE_PAYLOAD} workflow instance command, sent through the client API
 * rule to an embedded broker.
 *
 * <p>Most tests deploy {@link #WORKFLOW}, a process consisting of a start event, two service
 * tasks ({@code task-1} and {@code task-2}) and an end event, and send the update command using
 * the key of the first activated service task.
 */
public class UpdatePayloadTest {
    /** Process id shared by the deployed workflow and the instances created from it. */
    private static final String PROCESS_ID = ParallelGatewayStreamProcessorTest.PROCESS_ID;

    /**
     * Two sequential service tasks; {@code task-1} declares an output mapping from
     * {@link MsgPackConstants#NODE_JSON_OBJECT_PATH} to {@code $.obj}.
     */
    private static final BpmnModelInstance WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .serviceTask(
                "task-1",
                task ->
                    task.zeebeTaskType("task-1")
                        .zeebeTaskRetries(5)
                        .zeebeOutput(MsgPackConstants.NODE_JSON_OBJECT_PATH, "$.obj"))
            .serviceTask("task-2", task -> task.zeebeTaskType("task-2").zeebeTaskRetries(5))
            .endEvent()
            .done();

    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule();
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    /** The broker rule is the outer rule, so it wraps the client API rule. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    private PartitionTestClient testClient;

    @Before
    public void init() {
        testClient = apiRule.partitionClient();
    }

    /**
     * Updating the payload of an activated service task yields a {@code PAYLOAD_UPDATED} event
     * that points back to the command and carries the new payload.
     */
    @Test
    public void shouldUpdatePayload() {
        // given
        testClient.deploy(WORKFLOW);
        final long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID);
        final Record<WorkflowInstanceRecordValue> activatedEvent = waitForActivityActivatedEvent();

        // when
        final ExecuteCommandResponse response =
            updatePayload(activatedEvent.getKey(), MsgPackUtil.asMsgPackReturnArray("{'foo':'bar'}"));

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(WorkflowInstanceIntent.PAYLOAD_UPDATED);

        // Only the position of the command is needed, so a wildcard type is sufficient here.
        final Record<?> updateCommand =
            testClient.receiveFirstWorkflowInstanceCommand(WorkflowInstanceIntent.UPDATE_PAYLOAD);
        final Record<WorkflowInstanceRecordValue> updatedEvent =
            testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.PAYLOAD_UPDATED);

        Assertions.assertThat(updatedEvent.getSourceRecordPosition())
            .isEqualTo(updateCommand.getPosition());
        Assertions.assertThat(updatedEvent.getKey()).isEqualTo(activatedEvent.getKey());
        Assertions.assertThat(updatedEvent.getValue().getWorkflowInstanceKey())
            .isEqualTo(workflowInstanceKey);
        WorkflowAssert.assertWorkflowInstancePayload(updatedEvent, "{'foo':'bar'}");
    }

    /** A {@code NIL} payload is accepted and the resulting event carries an empty document. */
    @Test
    public void shouldUpdateWithNilPayload() {
        // given
        testClient.deploy(WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID);
        final Record<WorkflowInstanceRecordValue> activatedEvent = waitForActivityActivatedEvent();

        // when
        final ExecuteCommandResponse response =
            updatePayload(activatedEvent.getKey(), MsgPackHelper.NIL);

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(WorkflowInstanceIntent.PAYLOAD_UPDATED);
        final Record<WorkflowInstanceRecordValue> updatedEvent =
            testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.PAYLOAD_UPDATED);
        WorkflowAssert.assertWorkflowInstancePayload(updatedEvent, "{}");
    }

    /** A zero-length payload is accepted and the resulting event carries an empty document. */
    @Test
    public void shouldUpdateWithZeroLengthPayload() {
        // given
        testClient.deploy(WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID);
        final Record<WorkflowInstanceRecordValue> activatedEvent = waitForActivityActivatedEvent();

        // when
        final ExecuteCommandResponse response = updatePayload(activatedEvent.getKey(), new byte[0]);

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(WorkflowInstanceIntent.PAYLOAD_UPDATED);
        final Record<WorkflowInstanceRecordValue> updatedEvent =
            testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.PAYLOAD_UPDATED);
        WorkflowAssert.assertWorkflowInstancePayload(updatedEvent, "{}");
    }

    /**
     * A payload update sent while {@code task-1} is activated is still visible, next to the
     * job's mapped output, once the task has completed.
     */
    @Test
    public void shouldUpdatePayloadWhenActivityActivated() {
        // given
        testClient.deploy(WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID);
        final Record<WorkflowInstanceRecordValue> activatedEvent = waitForActivityActivatedEvent();

        // when
        updatePayload(activatedEvent.getKey(), MsgPackUtil.asMsgPackReturnArray("{'b':'wf'}"));
        // The event itself is not inspected; presumably this call makes sure the update has
        // been processed before the job is completed.
        testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.PAYLOAD_UPDATED);
        testClient.completeJobOfType("task-1", MsgPackConstants.MSGPACK_PAYLOAD);

        // then
        WorkflowAssert.assertWorkflowInstancePayload(
            waitForActivityCompletedEvent(), "{'obj':{'testAttr':'test'}, 'b':'wf'}");
    }

    /**
     * A payload update sent while a message catch event is activated is still visible, next to
     * the message payload, once the catch event has completed.
     */
    @Test
    public void shouldUpdatePayloadWhenCatchEventIsEntered() {
        // given
        final BpmnModelInstance catchEventWorkflow =
            Bpmn.createExecutableProcess("wf")
                .startEvent()
                .intermediateCatchEvent("catch-event")
                .message(message -> message.name("msg").zeebeCorrelationKey("$.id"))
                .done();
        testClient.deploy(catchEventWorkflow);
        testClient.createWorkflowInstance("wf", MsgPackUtil.asMsgPack("id", "123"));

        final long catchEventKey =
            testClient
                .receiveFirstWorkflowInstanceEvent(
                    WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                    BpmnElementType.INTERMEDIATE_CATCH_EVENT)
                .getKey();

        // when
        updatePayload(catchEventKey, MsgPackUtil.asMsgPackReturnArray("{'id':'123', 'x': 1}"));
        // The event itself is not inspected; presumably this call makes sure the update has
        // been processed before the message is published.
        testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.PAYLOAD_UPDATED);
        testClient.publishMessage("msg", "123", MsgPackUtil.asMsgPack("y", 2));

        // then
        final Record<WorkflowInstanceRecordValue> catchEventCompleted =
            testClient.receiveFirstWorkflowInstanceEvent(
                WorkflowInstanceIntent.ELEMENT_COMPLETED, BpmnElementType.INTERMEDIATE_CATCH_EVENT);
        WorkflowAssert.assertWorkflowInstancePayload(
            catchEventCompleted, "{'id':'123', 'x': 1, 'y': 2}");
    }

    /** A payload whose root is a string rather than an object makes the request fail. */
    @Test
    public void shouldThrowExceptionForInvalidPayload() {
        // given
        testClient.deploy(WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID);
        final Record<WorkflowInstanceRecordValue> activatedEvent = waitForActivityActivatedEvent();

        // when
        final Throwable thrown =
            Assertions.catchThrowable(
                () ->
                    updatePayload(
                        activatedEvent.getKey(), MsgPackUtil.asMsgPackReturnArray("'foo'")));

        // then
        Assertions.assertThat(thrown)
            .isInstanceOf(RuntimeException.class)
            .hasMessageContaining("Could not read property 'payload'")
            .hasMessageContaining("Expected document to be a root level object, but was 'STRING'");
    }

    /** An update addressed to key {@code -1} is rejected as {@code NOT_FOUND}. */
    @Test
    public void shouldRejectUpdateForNonExistingWorkflowInstance() {
        // when
        final ExecuteCommandResponse response = updatePayload(-1L, MsgPackConstants.MSGPACK_PAYLOAD);

        // then
        Assertions.assertThat(response.getRecordType()).isEqualTo(RecordType.COMMAND_REJECTION);
        Assertions.assertThat(response.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);

        final Record<?> rejection =
            testClient
                .receiveWorkflowInstances()
                .onlyCommandRejections()
                .withIntent(WorkflowInstanceIntent.UPDATE_PAYLOAD)
                .getFirst();
        Assertions.assertThat(rejection).isNotNull();
    }

    /**
     * Once both jobs are completed and the process element has reached {@code ELEMENT_COMPLETED},
     * an update using the key of the first service task is rejected as {@code NOT_FOUND}.
     */
    @Test
    public void shouldRejectUpdateForCompletedWorkflowInstance() {
        // given
        testClient.deploy(WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID);
        final Record<WorkflowInstanceRecordValue> activatedEvent = waitForActivityActivatedEvent();

        testClient.completeJobOfType("task-1", MsgPackConstants.MSGPACK_PAYLOAD);
        waitForActivityCompletedEvent();
        testClient.completeJobOfType("task-2");
        testClient.receiveElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);

        // when
        final ExecuteCommandResponse response =
            updatePayload(activatedEvent.getKey(), MsgPackConstants.MSGPACK_PAYLOAD);

        // then
        Assertions.assertThat(response.getRecordType()).isEqualTo(RecordType.COMMAND_REJECTION);
        Assertions.assertThat(response.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);

        final Record<?> rejection =
            testClient
                .receiveWorkflowInstances()
                .onlyCommandRejections()
                .withIntent(WorkflowInstanceIntent.UPDATE_PAYLOAD)
                .getFirst();
        Assertions.assertThat(rejection).isNotNull();
    }

    /** Returns the first {@code ELEMENT_COMPLETED} event of a service task. */
    private Record<WorkflowInstanceRecordValue> waitForActivityCompletedEvent() {
        return testClient.receiveFirstWorkflowInstanceEvent(
            WorkflowInstanceIntent.ELEMENT_COMPLETED, BpmnElementType.SERVICE_TASK);
    }

    /** Returns the first {@code ELEMENT_ACTIVATED} event of a service task. */
    private Record<WorkflowInstanceRecordValue> waitForActivityActivatedEvent() {
        return testClient.receiveFirstWorkflowInstanceEvent(
            WorkflowInstanceIntent.ELEMENT_ACTIVATED, BpmnElementType.SERVICE_TASK);
    }

    /**
     * Sends an {@code UPDATE_PAYLOAD} workflow instance command and awaits the response.
     *
     * @param key the record key the command is addressed to
     * @param payload the raw bytes put under the {@code payload} property of the command
     * @return the response received for the command
     */
    private ExecuteCommandResponse updatePayload(long key, byte[] payload) {
        return apiRule
            .createCmdRequest()
            .type(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.UPDATE_PAYLOAD)
            .key(key)
            .command()
            .put("payload", payload)
            .done()
            .sendAndAwait();
    }
}
