package io.camunda.zeebe.engine.processing.distribution;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.immutable.DistributionState;
import io.camunda.zeebe.engine.state.routing.RoutingInfo;
import io.camunda.zeebe.engine.util.MockTypedRecord;
import io.camunda.zeebe.engine.util.stream.FakeProcessingResultBuilder;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/distribution/CommandDistributionBehaviorTest.class */
class CommandDistributionBehaviorTest {
    private DistributionState mockDistributionState;
    private FakeProcessingResultBuilder<CommandDistributionRecord> fakeProcessingResultBuilder;
    private InterPartitionCommandSender mockInterpartitionCommandSender;
    private Writers writers;
    private long key;
    private ValueType valueType;
    private DeploymentIntent intent;
    private MockTypedRecord<DeploymentRecord> command;

    CommandDistributionBehaviorTest() {
    }

    @BeforeEach
    void setUp() {
        this.mockDistributionState = (DistributionState) Mockito.mock(DistributionState.class);
        this.fakeProcessingResultBuilder = new FakeProcessingResultBuilder<>();
        this.mockInterpartitionCommandSender = (InterPartitionCommandSender) Mockito.mock(InterPartitionCommandSender.class);
        this.writers = new Writers(() -> {
            return this.fakeProcessingResultBuilder;
        }, (EventApplier) Mockito.mock(EventAppliers.class));
        this.key = Protocol.encodePartitionId(1, 100L);
        this.valueType = ValueType.DEPLOYMENT;
        this.intent = DeploymentIntent.CREATE;
        this.command = new MockTypedRecord<>(this.key, new RecordMetadata().valueType(this.valueType).intent(this.intent), new DeploymentRecord());
    }

    @Test
    void shouldNotDistributeCommandToThisPartition() {
        new CommandDistributionBehavior(this.mockDistributionState, this.writers, 1, RoutingInfo.forStaticPartitions(1), this.mockInterpartitionCommandSender).withKey(this.key).unordered().distribute(this.command);
        Assertions.assertThat(this.fakeProcessingResultBuilder.getFollowupRecords()).isEmpty();
        this.fakeProcessingResultBuilder.flushPostCommitTasks();
        Mockito.verifyNoInteractions(new Object[]{this.mockInterpartitionCommandSender});
    }

    @Test
    void shouldDistributeCommandToAllOtherPartitions() {
        new CommandDistributionBehavior(this.mockDistributionState, this.writers, 1, RoutingInfo.forStaticPartitions(3), this.mockInterpartitionCommandSender).withKey(this.key).unordered().distribute(this.command);
        Assertions.assertThat(this.fakeProcessingResultBuilder.getFollowupRecords()).extracting(new Function[]{(v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getIntent();
        }, record -> {
            return Integer.valueOf(record.getValue().getPartitionId());
        }, record2 -> {
            return record2.getValue().getIntent();
        }}).hasSize(3).startsWith(new Tuple[]{Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.STARTED, 1, this.intent})}).contains(new Tuple[]{Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.DISTRIBUTING, 2, this.intent}), Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.DISTRIBUTING, 3, this.intent})});
        this.fakeProcessingResultBuilder.flushPostCommitTasks();
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(2), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(this.key)), (UnifiedRecordValue) ArgumentMatchers.any());
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(3), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(this.key)), (UnifiedRecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.mockInterpartitionCommandSender});
    }

    @Test
    void shouldDistributeCommandToSpecificPartitions() {
        new CommandDistributionBehavior(this.mockDistributionState, this.writers, 2, RoutingInfo.forStaticPartitions(4), this.mockInterpartitionCommandSender).withKey(this.key).unordered().forPartitions(Set.of(1, 3)).distribute(this.command);
        Assertions.assertThat(this.fakeProcessingResultBuilder.getFollowupRecords()).extracting(new Function[]{(v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getIntent();
        }, record -> {
            return Integer.valueOf(record.getValue().getPartitionId());
        }, record2 -> {
            return record2.getValue().getIntent();
        }}).hasSize(3).startsWith(new Tuple[]{Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.STARTED, 2, this.intent})}).contains(new Tuple[]{Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.DISTRIBUTING, 1, this.intent}), Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.DISTRIBUTING, 3, this.intent})});
        this.fakeProcessingResultBuilder.flushPostCommitTasks();
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(1), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(this.key)), (UnifiedRecordValue) ArgumentMatchers.any());
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(3), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(this.key)), (UnifiedRecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.mockInterpartitionCommandSender});
    }

    @Test
    void shouldStartQueueImmediately() {
        new CommandDistributionBehavior(this.mockDistributionState, this.writers, 1, RoutingInfo.forStaticPartitions(3), this.mockInterpartitionCommandSender).withKey(this.key).inQueue("test-queue").distribute(this.command);
        Assertions.assertThat(this.fakeProcessingResultBuilder.getFollowupRecords()).extracting(new Function[]{(v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getIntent();
        }, record -> {
            return Integer.valueOf(record.getValue().getPartitionId());
        }}).containsExactly(new Tuple[]{Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.STARTED, 1}), Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.ENQUEUED, 2}), Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.DISTRIBUTING, 2}), Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.ENQUEUED, 3}), Assertions.tuple(new Object[]{Long.valueOf(this.key), CommandDistributionIntent.DISTRIBUTING, 3})});
        this.fakeProcessingResultBuilder.flushPostCommitTasks();
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(2), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(this.key)), (UnifiedRecordValue) ArgumentMatchers.any());
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(3), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(this.key)), (UnifiedRecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.mockInterpartitionCommandSender});
    }

    @Test
    void shouldWaitInQueue() {
        CommandDistributionBehavior commandDistributionBehavior = new CommandDistributionBehavior(this.mockDistributionState, this.writers, 1, RoutingInfo.forStaticPartitions(3), this.mockInterpartitionCommandSender);
        long encodePartitionId = Protocol.encodePartitionId(1, 100L);
        long encodePartitionId2 = Protocol.encodePartitionId(1, 101L);
        commandDistributionBehavior.withKey(encodePartitionId).inQueue("test-queue").distribute(this.command);
        Mockito.when(this.mockDistributionState.getNextQueuedDistributionKey("test-queue", 2)).thenReturn(Optional.of(Long.valueOf(encodePartitionId)));
        Mockito.when(this.mockDistributionState.getNextQueuedDistributionKey("test-queue", 3)).thenReturn(Optional.of(Long.valueOf(encodePartitionId)));
        commandDistributionBehavior.withKey(encodePartitionId2).inQueue("test-queue").distribute(this.command);
        Assertions.assertThat(this.fakeProcessingResultBuilder.getFollowupRecords()).extracting(new Function[]{(v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getIntent();
        }, record -> {
            return Integer.valueOf(record.getValue().getPartitionId());
        }}).containsExactly(new Tuple[]{Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId), CommandDistributionIntent.STARTED, 1}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId), CommandDistributionIntent.ENQUEUED, 2}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId), CommandDistributionIntent.DISTRIBUTING, 2}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId), CommandDistributionIntent.ENQUEUED, 3}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId), CommandDistributionIntent.DISTRIBUTING, 3}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId2), CommandDistributionIntent.STARTED, 1}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId2), CommandDistributionIntent.ENQUEUED, 2}), Assertions.tuple(new Object[]{Long.valueOf(encodePartitionId2), CommandDistributionIntent.ENQUEUED, 3})});
        this.fakeProcessingResultBuilder.flushPostCommitTasks();
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(2), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(encodePartitionId)), (UnifiedRecordValue) ArgumentMatchers.any());
        ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender)).sendCommand(ArgumentMatchers.eq(3), (ValueType) ArgumentMatchers.eq(this.valueType), (Intent) ArgumentMatchers.eq(this.intent), Long.valueOf(ArgumentMatchers.eq(encodePartitionId)), (UnifiedRecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.mockInterpartitionCommandSender});
    }
}
