/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.IntermediateCatchEventBuilder;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.SubscriptionUtil;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.test.util.record.MessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.ProcessMessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;

public final class MessageCorrelationReliabilityTest {
    @Rule
    public final EngineRule engine = EngineRule.multiplePartition(2);

    @Test
    public void shouldCleanupSubscriptionAfterRejection() {
        boolean processInstancePartitionId = true;
        int messageSubscriptionPartitionId = 2;
        String correlationKey = "test-2";
        Assertions.assertThat((int)SubscriptionUtil.getSubscriptionPartitionId((DirectBuffer)new UnsafeBuffer("test-2".getBytes(StandardCharsets.UTF_8)), (int)this.engine.getPartitionIds().size())).isEqualTo(2);
        String messageName = "message-" + String.valueOf(UUID.randomUUID());
        BpmnModelInstance process = ((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().intermediateCatchEvent("receive-message").message(m -> m.name(messageName).zeebeCorrelationKeyExpression("key"))).endEvent("end").done();
        this.engine.deployment().withXmlResource(process).deploy();
        long processInstanceKey = this.engine.processInstance().ofBpmnProcessId("process").withVariable("key", "test-2").create();
        Assertions.assertThat((int)Protocol.decodePartitionId((long)processInstanceKey)).isEqualTo(1);
        this.engine.deployment().withXmlResource(process).deploy();
        this.engine.interceptInterPartitionCommands((receiverPartitionId, valueType, intent, recordKey, command) -> receiverPartitionId != 2 || intent != MessageSubscriptionIntent.CORRELATE);
        this.engine.message().withName(messageName).withCorrelationKey("test-2").publish();
        this.engine.increaseTime(Duration.ofMinutes(10L));
        RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.REJECT).withProcessInstanceKey(processInstanceKey).await();
        long lastPosition = this.getLastPosition();
        Assertions.assertThat((long)((MessageSubscriptionRecordStream)((MessageSubscriptionRecordStream)RecordingExporter.records().between(0L, lastPosition).messageSubscriptionRecords().withIntent((Intent)MessageSubscriptionIntent.CORRELATE)).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withPartitionId(2)).count()).isZero();
        Assertions.assertThat((long)((ProcessMessageSubscriptionRecordStream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.records().between(0L, lastPosition).processMessageSubscriptionRecords().withIntent((Intent)ProcessMessageSubscriptionIntent.CORRELATED)).withPartitionId(1)).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).count()).isOne();
        Assertions.assertThat((long)((MessageSubscriptionRecordStream)((MessageSubscriptionRecordStream)RecordingExporter.records().between(0L, lastPosition).messageSubscriptionRecords().withIntent((Intent)MessageSubscriptionIntent.CORRELATE)).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withPartitionId(2)).count()).isZero();
        this.engine.increaseTime(Duration.ofMinutes(10L));
        Assertions.assertThat((long)((ProcessMessageSubscriptionRecordStream)((ProcessMessageSubscriptionRecordStream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.records().between(0L, lastPosition).processMessageSubscriptionRecords().withIntent((Intent)ProcessMessageSubscriptionIntent.CORRELATE)).withRecordType(RecordType.COMMAND)).withProcessInstanceKey(processInstanceKey).withPartitionId(1)).withMessageName(messageName).count()).isEqualTo(2L);
        Assertions.assertThat((long)((MessageSubscriptionRecordStream)((MessageSubscriptionRecordStream)RecordingExporter.records().between(0L, lastPosition).messageSubscriptionRecords().withIntent((Intent)MessageSubscriptionIntent.REJECT)).withProcessInstanceKey(processInstanceKey).withPartitionId(2)).withMessageName(messageName).count()).isOne();
    }

    private long getLastPosition() {
        return this.engine.decision().ofDecisionId("noop").expectRejection().evaluate().getPosition();
    }
}

