package io.zeebe.broker.clustering.management;

import io.zeebe.broker.clustering.api.ErrorResponse;
import io.zeebe.broker.clustering.api.FetchSnapshotChunkRequest;
import io.zeebe.broker.clustering.api.FetchSnapshotChunkResponse;
import io.zeebe.broker.clustering.api.ListSnapshotsRequest;
import io.zeebe.broker.clustering.api.ListSnapshotsResponse;
import io.zeebe.broker.clustering.api.ManagementApiRequestHandler;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.raft.RaftPersistentConfigurationManager;
import io.zeebe.broker.clustering.base.topology.PartitionInfo;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.system.configuration.BrokerCfg;
import io.zeebe.broker.transport.clientapi.BufferingServerOutput;
import io.zeebe.clustering.management.ErrorResponseCode;
import io.zeebe.logstreams.impl.snapshot.fs.FsSnapshotStorage;
import io.zeebe.logstreams.impl.snapshot.fs.FsSnapshotStorageConfiguration;
import io.zeebe.logstreams.snapshot.SerializableWrapper;
import io.zeebe.logstreams.spi.ReadableSnapshot;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.logstreams.spi.SnapshotWriter;
import io.zeebe.raft.state.RaftState;
import io.zeebe.servicecontainer.testing.ServiceContainerRule;
import io.zeebe.transport.SocketAddress;
import io.zeebe.transport.impl.RemoteAddressImpl;
import io.zeebe.util.buffer.BufferReader;
import io.zeebe.util.buffer.BufferWriter;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.testing.ControlledActorSchedulerRule;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.agrona.ExpandableDirectByteBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/clustering/management/ManagementApiRequestHandlerTest.class */
/**
 * Tests for {@link ManagementApiRequestHandler}.
 *
 * <p>Each test registers one or more partitions (backed by a file-system snapshot storage rooted
 * in a temporary folder) in the map handed to the handler, sends a list-snapshots or
 * fetch-snapshot-chunk request through a test actor, and then inspects the responses captured by
 * a {@link BufferingServerOutput}.
 */
public class ManagementApiRequestHandlerTest {
    /** Partitions visible to the handler; replaced with a fresh map before every test. */
    private Map<Integer, Partition> trackedSnapshotPartitions;
    private final TestActor actor = new TestActor();
    private BufferingServerOutput output = new BufferingServerOutput();
    // Built once at field-initialisation time (while trackedSnapshotPartitions is still null)
    // and rebuilt in setup() with the per-test map.
    private ManagementApiRequestHandler handler = createHandler();
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final ControlledActorSchedulerRule actorSchedulerRule = new ControlledActorSchedulerRule();
    private final ServiceContainerRule serviceContainerRule = new ServiceContainerRule(this.actorSchedulerRule);

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.actorSchedulerRule).around(this.serviceContainerRule);

    /**
     * Actor whose {@link ActorControl} is wrapped in a Mockito spy so that individual tests can
     * re-route scheduling calls.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally declared
     * {@code private}; the widened modifier is kept as found.
     */
    public class TestActor extends Actor {
        private final ActorControl mocked;

        private TestActor() {
            // "this.actor" is the control object inherited from Actor, not the outer test's field.
            this.mocked = Mockito.spy(this.actor);
        }

        /** Returns the spied actor control. */
        ActorControl getActorControl() {
            return this.mocked;
        }

        /** Runs the given task through the spied actor control. */
        void run(Runnable runnable) {
            this.mocked.run(runnable);
        }
    }

    /** Resets the tracked partitions, submits the test actor and recreates output and handler. */
    @Before
    public void setup() {
        trackedSnapshotPartitions = new ConcurrentHashMap<>();
        actorSchedulerRule.submitActor(actor);
        output = new BufferingServerOutput();
        handler = createHandler();
    }

    /** Listing snapshots of a tracked partition reports the metadata of every snapshot created. */
    @Test
    public void shouldHandleListSnapshotsRequests() {
        // given
        final int partitionId = 1;
        final Partition partition = createAndTrackPartition(partitionId);
        final ReadableSnapshot[] expectedSnapshots = {
            createSnapshot(partition, "first", 1L, new SerializableWrapper(TypedStreamProcessorTest.STREAM_NAME)),
            createSnapshot(partition, "second", 2L, new SerializableWrapper(TestJarExporter.FOO))
        };
        final ListSnapshotsResponse response = new ListSnapshotsResponse();

        // when
        sendRequest(new ListSnapshotsRequest().setPartitionId(partitionId));

        // then
        Assertions.assertThat(output.getSentResponses().size()).isEqualTo(1);
        output.wrapResponse(0, response);
        Assertions.assertThat(response.getSnapshots().size()).isEqualTo(2);
        for (final ReadableSnapshot expected : expectedSnapshots) {
            final Optional<ListSnapshotsResponse.SnapshotMetadata> match = response.getSnapshots().stream()
                .filter(candidate -> candidate.getName().equals(expected.getName()))
                .findFirst();
            Assertions.assertThat(match.isPresent()).isTrue();

            final ListSnapshotsResponse.SnapshotMetadata metadata = match.get();
            Assertions.assertThat(metadata.getName()).isEqualTo(expected.getName());
            Assertions.assertThat(metadata.getLength()).isEqualTo(expected.getSize());
            Assertions.assertThat(metadata.getChecksum()).isEqualTo(expected.getChecksum());
            Assertions.assertThat(metadata.getLogPosition()).isEqualTo(expected.getPosition());
        }
    }

    /** Listing snapshots of a partition that is not tracked yields PARTITION_NOT_FOUND. */
    @Test
    public void shouldListSnapshotsAndReceiveErrorWhenRequestingNotTrackedPartition() {
        // given
        final int requestedPartitionId = 2;
        createThrowawayPartitionWithSnapshot(1);

        // when
        sendRequest(new ListSnapshotsRequest().setPartitionId(requestedPartitionId));

        // then
        assertError(output, ErrorResponseCode.PARTITION_NOT_FOUND, String.format("not currently tracking given partition %d", requestedPartitionId));
    }

    /** Fetching a chunk from a partition that is not tracked yields PARTITION_NOT_FOUND. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorWhenRequestingNotTrackedPartition() {
        // given
        createThrowawayPartitionWithSnapshot(3);

        // when
        sendRequest(new FetchSnapshotChunkRequest().setPartitionId(4));

        // then
        // NOTE(review): unlike the list-snapshots variant, the expected message here is the raw
        // template with an unformatted "%d". Kept as found - confirm against the handler.
        assertError(output, ErrorResponseCode.PARTITION_NOT_FOUND, "not currently tracking given partition %d");
    }

    /** A negative chunk offset is rejected with INVALID_PARAMETERS. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorWhenChunkOffsetIsNegative() {
        // given
        final int partitionId = 5;
        final ReadableSnapshot snapshot = createAndTrackPartitionWithSnapshot(partitionId);
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkOffset(-1L);

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.INVALID_PARAMETERS, "chunkOffset must be >= 0");
    }

    /** A chunk length of zero is rejected with INVALID_PARAMETERS. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorWhenChunkLengthIsNotPositive() {
        // given
        final int partitionId = 2;
        final ReadableSnapshot snapshot = createAndTrackPartitionWithSnapshot(partitionId);
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkLength(0);

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.INVALID_PARAMETERS, "chunkLength must be between 1 and 512kb");
    }

    /** A snapshot stream whose skip() reports -1 results in a READ_ERROR. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorOnSkipError() throws Exception {
        // given
        final int partitionId = 3;
        final ReadableSnapshot snapshot = createAndTrackPartitionWithMockSnapshot(partitionId);
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkOffset(1L);
        final InputStream data = spySnapshotData(snapshot);
        Mockito.doReturn(-1L).when(data).skip(ArgumentMatchers.anyLong());

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.READ_ERROR, "could not seek to given chunkOffset");
    }

    /** A snapshot stream whose read() reports -1 results in a READ_ERROR. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorOnNoBytesRead() throws Exception {
        // given
        final int partitionId = 3;
        final ReadableSnapshot snapshot = createAndTrackPartitionWithMockSnapshot(partitionId);
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkOffset(1L);
        final InputStream data = spySnapshotData(snapshot);
        Mockito.doReturn(-1).when(data).read((byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.READ_ERROR, "could not read requested amount of bytes");
    }

    /** A snapshot stream whose read() throws an IOException results in a READ_ERROR. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorOnReadError() throws Exception {
        // given
        final int partitionId = 3;
        final ReadableSnapshot snapshot = createAndTrackPartitionWithMockSnapshot(partitionId);
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkOffset(1L);
        final InputStream data = spySnapshotData(snapshot);
        Mockito.doThrow(new IOException()).when(data).read((byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.READ_ERROR, "unexpected read error occurred");
    }

    /** A storage that throws while looking up the snapshot results in a READ_ERROR. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorOnOpenSnapshotError() throws Exception {
        // given
        final int partitionId = 3;
        final SnapshotStorage storage = Mockito.spy(createSnapshotStorage());
        final ReadableSnapshot snapshot = createSnapshot(createAndTrackPartition(partitionId, storage));
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot);
        // Stubbed only after the snapshot was created, because createSnapshot itself uses getLastSnapshot.
        Mockito.doThrow(new RuntimeException()).when(storage).getLastSnapshot((String) ArgumentMatchers.any());

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.READ_ERROR, "could not open snapshot");
    }

    /** Requesting a snapshot that was never written is rejected with INVALID_PARAMETERS. */
    @Test
    public void shouldFetchSnapshotChunkAndReceiveErrorOnSnapshotNotFound() throws Exception {
        // given
        final int partitionId = 3;
        final FetchSnapshotChunkRequest request = new FetchSnapshotChunkRequest()
            .setPartitionId(partitionId)
            .setChunkLength(32)
            .setChunkOffset(1L)
            .setName("something")
            .setLogPosition(1L);
        createAndTrackPartition(partitionId);

        // when
        sendRequest(request);

        // then
        assertError(output, ErrorResponseCode.INVALID_PARAMETERS, "no snapshot found for given name and position");
    }

    /** Requesting a one-byte chunk at offset 0 returns exactly the first byte of the snapshot. */
    @Test
    public void shouldFetchSnapshotChunkWithEnoughDataForChunk() throws IOException {
        // given
        final int partitionId = 2;
        final ReadableSnapshot snapshot = createAndTrackPartitionWithSnapshotContents(partitionId, new SerializableWrapper(TypedStreamProcessorTest.STREAM_NAME));
        final byte[] expectedBytes = new byte[(int) snapshot.getSize()];
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkLength(1);
        final FetchSnapshotChunkResponse response = new FetchSnapshotChunkResponse();
        snapshot.getData().read(expectedBytes);

        // when
        sendRequest(request);

        // then
        Assertions.assertThat(output.getSentResponses().size()).isEqualTo(1);
        output.wrapResponse(0, response);
        Assertions.assertThat(response.getData().capacity()).isEqualTo(1);
        Assertions.assertThat(response.getData().getByte(0)).isEqualTo(expectedBytes[0]);
    }

    /**
     * Fetches the snapshot one byte at a time and checks that the reassembled bytes can be
     * recovered into an object equal to the one that was snapshotted.
     */
    @Test
    public void shouldFetchSnapshotChunkRecursively() throws Exception {
        // given
        final int partitionId = 2;
        final SerializableWrapper contents = new SerializableWrapper(TypedStreamProcessorTest.STREAM_NAME);
        final ReadableSnapshot snapshot = createAndTrackPartitionWithSnapshotContents(partitionId, contents);
        final byte[] expectedBytes = new byte[(int) snapshot.getSize()];
        final FetchSnapshotChunkRequest request = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkLength(1);
        final FetchSnapshotChunkResponse response = new FetchSnapshotChunkResponse();
        final byte[] receivedBytes = new byte[(int) snapshot.getSize()];
        snapshot.getData().read(expectedBytes);

        // when / then - every request adds exactly one response holding the byte at that offset
        for (int offset = 0; offset < expectedBytes.length; offset++) {
            sendRequest(request.setChunkOffset(offset));
            Assertions.assertThat(output.getSentResponses().size()).isEqualTo(offset + 1);
            output.wrapResponse(offset, response);
            Assertions.assertThat(response.getData().capacity()).isEqualTo(1);
            Assertions.assertThat(response.getData().getByte(0)).isEqualTo(expectedBytes[offset]);
            receivedBytes[offset] = response.getData().getByte(0);
        }

        // then - the collected bytes form a recoverable snapshot
        final SerializableWrapper recovered = new SerializableWrapper("");
        recovered.recoverFromSnapshot(new ByteArrayInputStream(receivedBytes));
        Assertions.assertThat((String) recovered.getObject()).isEqualTo(contents.getObject());
    }

    /**
     * Queues two chunk requests before letting the scheduler run and checks that both are
     * answered, in order, with the bytes at their respective offsets.
     */
    @Test
    public void shouldHandleConcurrentFetchSnapshotChunkRequests() throws Exception {
        // given
        final int partitionId = 2;
        final FetchSnapshotChunkResponse response = new FetchSnapshotChunkResponse();
        final ReadableSnapshot snapshot = createAndTrackPartitionWithSnapshotContents(partitionId, new SerializableWrapper(TypedStreamProcessorTest.STREAM_NAME));
        final byte[] expectedBytes = new byte[(int) snapshot.getSize()];
        final FetchSnapshotChunkRequest firstRequest = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkOffset(1L).setChunkLength(1);
        final FetchSnapshotChunkRequest secondRequest = getFetchSnapshotChunkRequest(partitionId, snapshot).setChunkOffset(2L).setChunkLength(1);
        final ActorControl control = actor.getActorControl();
        snapshot.getData().read(expectedBytes, 0, (int) snapshot.getSize());

        // Re-route runUntilDone(...) to a plain submit(...) and turn done() into a no-op on the
        // spied control; the stubbing itself is executed as an actor task.
        actor.run(() -> {
            Mockito.doAnswer(invocation -> {
                control.submit((Runnable) invocation.getArgument(0));
                return null;
            }).when(control).runUntilDone((Runnable) ArgumentMatchers.any());
            Mockito.doNothing().when(control).done();
        });
        actorSchedulerRule.workUntilDone();

        // when
        sendRequestAsync(firstRequest);
        sendRequestAsync(secondRequest);

        // then - nothing is answered until the scheduler is driven
        Assertions.assertThat(output.getSentResponses()).isEmpty();
        actorSchedulerRule.workUntilDone();
        Assertions.assertThat(output.getSentResponses().size()).isEqualTo(2);
        output.wrapResponse(0, response);
        Assertions.assertThat(response.getData().getByte(0)).isEqualTo(expectedBytes[1]);
        output.wrapResponse(1, response);
        Assertions.assertThat(response.getData().getByte(0)).isEqualTo(expectedBytes[2]);
    }

    /**
     * Serialises the request into a buffer and schedules its handling on the test actor without
     * driving the scheduler.
     */
    private void sendRequestAsync(BufferWriter request) {
        final RemoteAddressImpl remoteAddress = new RemoteAddressImpl(1, new SocketAddress("0.0.0.0", 8080));
        final ExpandableDirectByteBuffer buffer = new ExpandableDirectByteBuffer();
        request.write(buffer, 0);
        actor.run(() -> handler.onRequest(output, remoteAddress, buffer, 0, request.getLength(), 1L));
    }

    /** Schedules the request and drives the scheduler until no work is left. */
    private void sendRequest(BufferWriter request) {
        sendRequestAsync(request);
        actorSchedulerRule.workUntilDone();
    }

    /** Builds a handler bound to the spied actor control and the current partition map. */
    private ManagementApiRequestHandler createHandler() {
        // NOTE(review): the created manager is never used. It is kept in case its constructor
        // has side effects the tests rely on - confirm and remove if it has none.
        new RaftPersistentConfigurationManager(new BrokerCfg().getData());
        return new ManagementApiRequestHandler(actor.getActorControl(), trackedSnapshotPartitions);
    }

    /** Registers a LEADER partition with the given id that serves the given snapshot storage. */
    private Partition createAndTrackPartition(int partitionId, final SnapshotStorage snapshotStorage) {
        final Partition partition = new Partition(new PartitionInfo(partitionId, 1), RaftState.LEADER) {
            @Override
            public SnapshotStorage getSnapshotStorage() {
                return snapshotStorage;
            }
        };
        trackedSnapshotPartitions.put(partitionId, partition);
        return partition;
    }

    /** Registers a partition backed by a fresh file-system snapshot storage. */
    private Partition createAndTrackPartition(int partitionId) {
        return createAndTrackPartition(partitionId, createSnapshotStorage());
    }

    /**
     * Writes and commits a snapshot in the partition's storage, then reads it back.
     *
     * @param partition partition whose snapshot storage receives the snapshot
     * @param name snapshot name
     * @param position log position passed to the storage when creating the snapshot
     * @param contents state written into the snapshot
     * @return the last snapshot the storage reports for {@code name}
     * @throws RuntimeException wrapping any exception raised while writing or reading
     */
    private ReadableSnapshot createSnapshot(Partition partition, String name, long position, SnapshotSupport contents) {
        final SnapshotStorage storage = partition.getSnapshotStorage();
        try {
            final SnapshotWriter writer = storage.createSnapshot(name, position);
            writer.writeSnapshot(contents);
            writer.commit();
            return storage.getLastSnapshot(name);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Creates a file-system snapshot storage rooted at the temporary folder. */
    private FsSnapshotStorage createSnapshotStorage() {
        final FsSnapshotStorageConfiguration configuration = new FsSnapshotStorageConfiguration();
        configuration.setRootPath(tempFolder.getRoot().getAbsolutePath());
        return new FsSnapshotStorage(configuration);
    }

    /** Registers a partition with a default snapshot when the snapshot itself is not needed. */
    private void createThrowawayPartitionWithSnapshot(int partitionId) {
        createAndTrackPartitionWithSnapshot(partitionId);
    }

    /** Registers a partition and returns a default snapshot written to it. */
    private ReadableSnapshot createAndTrackPartitionWithSnapshot(int partitionId) {
        return createSnapshot(createAndTrackPartition(partitionId));
    }

    /**
     * Registers a partition whose spied storage returns a spied snapshot for the default snapshot
     * name, so tests can stub the snapshot's behaviour.
     *
     * @return the spied snapshot that the storage will hand out
     */
    private ReadableSnapshot createAndTrackPartitionWithMockSnapshot(int partitionId) throws Exception {
        final SnapshotStorage storage = Mockito.spy(createSnapshotStorage());
        final ReadableSnapshot snapshot = createSnapshot(createAndTrackPartition(partitionId, storage));
        final ReadableSnapshot snapshotSpy = Mockito.spy(snapshot);
        Mockito.doReturn(snapshotSpy).when(storage).getLastSnapshot(snapshot.getName());
        return snapshotSpy;
    }

    /**
     * Wraps the data stream of a spied snapshot in a spy and makes the snapshot return that spy
     * from {@code getData()}.
     *
     * @param snapshotSpy a Mockito spy of a snapshot
     * @return the spied stream, ready for further stubbing
     */
    private InputStream spySnapshotData(ReadableSnapshot snapshotSpy) throws Exception {
        final InputStream dataSpy = Mockito.spy(snapshotSpy.getData());
        Mockito.doReturn(dataSpy).when(snapshotSpy).getData();
        return dataSpy;
    }

    /** Writes the default snapshot ("something" at position 10) into the partition. */
    private ReadableSnapshot createSnapshot(Partition partition) {
        return createSnapshot(partition, "something", 10L, new SerializableWrapper("snapshot contents"));
    }

    /** Registers a partition and writes a snapshot "something" at position 30 with the given contents. */
    private ReadableSnapshot createAndTrackPartitionWithSnapshotContents(int partitionId, SnapshotSupport contents) {
        return createSnapshot(createAndTrackPartition(partitionId), "something", 30L, contents);
    }

    /** Builds a request for the whole snapshot: offset 0 and a chunk length equal to its size. */
    private FetchSnapshotChunkRequest getFetchSnapshotChunkRequest(int partitionId, ReadableSnapshot snapshot) {
        return new FetchSnapshotChunkRequest()
            .setPartitionId(partitionId)
            .setName(snapshot.getName())
            .setLogPosition(snapshot.getPosition())
            .setChunkOffset(0L)
            .setChunkLength((int) snapshot.getSize());
    }

    /** Asserts that exactly one response was sent and that it is an error with the given code and message. */
    private void assertError(BufferingServerOutput sentOutput, ErrorResponseCode expectedCode, String expectedMessage) {
        final ErrorResponse error = new ErrorResponse();
        Assertions.assertThat(sentOutput.getSentResponses().size()).isEqualTo(1);
        sentOutput.wrapResponse(0, error);
        Assertions.assertThat(error.getCode()).isEqualTo(expectedCode);
        Assertions.assertThat(error.getMessage()).isEqualTo(expectedMessage);
    }
}
