package io.camunda.zeebe.broker.system.partitions;

import io.atomix.primitive.partition.PartitionId;
import io.atomix.raft.storage.log.RaftLogReader;
import io.camunda.zeebe.broker.system.management.BrokerAdminService;
import io.camunda.zeebe.broker.system.management.PartitionStatus;
import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.netty.util.NetUtil;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.agrona.CloseHelper;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/BrokerSnapshotTest.class */
public class BrokerSnapshotTest {
    private static final int PARTITION_ID = 1;

    @Rule
    public final EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    private RaftLogReader journalReader;
    private BrokerAdminService brokerAdminService;
    private ZeebeClient client;

    @Before
    public void setup() {
        this.journalReader = this.brokerRule.getBroker().getBrokerContext().getPartitionManager().getPartitionGroup().getPartition(PartitionId.from("raft-partition", PARTITION_ID)).getServer().openReader();
        this.brokerAdminService = this.brokerRule.getBroker().getBrokerContext().getBrokerAdminService();
        this.client = ZeebeClient.newClientBuilder().usePlaintext().gatewayAddress(NetUtil.toSocketAddressString(this.brokerRule.getGatewayAddress())).build();
    }

    @After
    public void after() {
        CloseHelper.closeAll(new AutoCloseable[]{this.client, this.journalReader});
    }

    @Test
    public void shouldTakeSnapshotAtCorrectIndex() {
        createSomeEvents();
        this.brokerAdminService.takeSnapshot();
        SnapshotId waitForSnapshotAtBroker = waitForSnapshotAtBroker(this.brokerAdminService, PARTITION_ID);
        Assertions.assertThat(waitForSnapshotAtBroker.getIndex()).isEqualTo(this.journalReader.seekToAsqn(waitForSnapshotAtBroker.getProcessedPosition()) - 1);
    }

    private void createSomeEvents() {
        long asLong = IntStream.range(0, 10).mapToLong(this::publishMaxMessageSizeMessage).max().getAsLong();
        Awaitility.await().untilAsserted(() -> {
            ((AbstractBooleanAssert) Assertions.assertThat(RecordingExporter.messageRecords(MessageIntent.PUBLISHED).withRecordKey(asLong).exists()).describedAs("All records are exported", new Object[0])).isTrue();
        });
    }

    private long publishMaxMessageSizeMessage(int i) {
        return ((PublishMessageResponse) this.client.newPublishMessageCommand().messageName("msg").correlationKey("msg-" + i).send().join()).getMessageKey();
    }

    private SnapshotId waitForSnapshotAtBroker(BrokerAdminService brokerAdminService, int i) {
        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(((PartitionStatus) brokerAdminService.getPartitionStatus().get(Integer.valueOf(i))).getProcessedPositionInSnapshot()).isNotNull();
        });
        return (SnapshotId) FileBasedSnapshotMetadata.ofFileName(((PartitionStatus) this.brokerAdminService.getPartitionStatus().get(Integer.valueOf(PARTITION_ID))).getSnapshotId()).get();
    }
}
