package io.zeebe.broker.transport.clientapi;

import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.topology.PartitionInfo;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.system.ConfigurationTest;
import io.zeebe.broker.transport.controlmessage.ControlMessageRequestHeaderDescriptor;
import io.zeebe.dispatcher.ClaimedFragment;
import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.impl.log.DataFrameDescriptor;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.clientapi.ControlMessageRequestDecoder;
import io.zeebe.protocol.clientapi.ControlMessageRequestEncoder;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.ErrorResponseDecoder;
import io.zeebe.protocol.clientapi.ExecuteCommandRequestEncoder;
import io.zeebe.protocol.clientapi.MessageHeaderEncoder;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.raft.state.RaftState;
import io.zeebe.servicecontainer.testing.ServiceContainerRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.SocketAddress;
import io.zeebe.transport.impl.RemoteAddressImpl;
import io.zeebe.util.VarDataUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.concurrent.ExecutionException;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:io/zeebe/broker/transport/clientapi/ClientApiMessageHandlerTest.class */
/**
 * Tests for {@link ClientApiMessageHandler}.
 *
 * <p>Each test encodes a request into {@link #buffer}, passes it to
 * {@code ClientApiMessageHandler#onRequest} and then checks one of three outcomes:
 *
 * <ul>
 *   <li>a record was appended to the file-system log stream created in {@link #setup()},
 *   <li>a fragment was claimed on the mocked control message dispatcher, or
 *   <li>an error response was captured by the {@link BufferingServerOutput}.
 * </ul>
 */
public class ClientApiMessageHandlerTest {
    /** Request id passed to the handler and expected back in record metadata / headers. */
    private static final int REQUEST_ID = 5;

    /** Term set on the log stream at the end of {@link #setup()}. */
    private static final int RAFT_TERM = 10;

    // NOTE(review): the host name is taken from a constant of an unrelated test class
    // (likely a decompiler substitution for an inlined literal) - consider a local literal.
    protected static final RemoteAddress DEFAULT_ADDRESS = new RemoteAddressImpl(21, new SocketAddress(TypedStreamProcessorTest.STREAM_NAME, 4242));

    /** Partition id used for the command requests that are expected to be accepted. */
    protected static final int LOG_STREAM_PARTITION_ID = 1;

    /** Serialized {@link JobRecord} used as the payload of every request written by this test. */
    protected static final byte[] JOB_EVENT;

    static {
        // NOTE(review): the job type is taken from a constant of an unrelated test class
        // (likely a decompiler substitution for an inlined literal) - consider a local literal.
        final JobRecord jobRecord = new JobRecord().setType(BufferUtil.wrapString(ConfigurationTest.BROKER_BASE));
        final UnsafeBuffer encoded = new UnsafeBuffer(new byte[jobRecord.getEncodedLength()]);
        jobRecord.write(encoded, 0);
        JOB_EVENT = encoded.byteArray();
    }

    private LogStream logStream;
    private ClientApiMessageHandler messageHandler;

    @Mock
    private Dispatcher mockControlMessageDispatcher;
    protected BufferingServerOutput serverOutput;

    /** Buffer the requests are encoded into before being handed to the handler. */
    protected final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1048576]);

    /** Buffer that backs the fragment handed out by {@link #claimFragment(long)}. */
    protected final UnsafeBuffer sendBuffer = new UnsafeBuffer(new byte[1048576]);
    protected final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
    protected final ExecuteCommandRequestEncoder commandRequestEncoder = new ExecuteCommandRequestEncoder();
    protected final ControlMessageRequestEncoder controlRequestEncoder = new ControlMessageRequestEncoder();
    protected final ControlMessageRequestDecoder controlRequestDecoder = new ControlMessageRequestDecoder();
    protected final ControlMessageRequestHeaderDescriptor controlMessageRequestHeaderDescriptor = new ControlMessageRequestHeaderDescriptor();

    /** Offset reported by the claimed fragment; recorded by {@link #claimFragment(long)}. */
    int fragmentOffset = 0;
    public TemporaryFolder tempFolder = new TemporaryFolder();
    public ActorSchedulerRule agentRunnerService = new ActorSchedulerRule();
    public ServiceContainerRule serviceContainerRule = new ServiceContainerRule(this.agentRunnerService);

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.agentRunnerService).around(this.serviceContainerRule);

    /**
     * Creates a file-system log stream below the temporary folder, opens its appender and
     * registers a leader partition backed by that stream with a fresh message handler.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        serverOutput = new BufferingServerOutput();

        final String logRootPath = tempFolder.getRoot().getAbsolutePath();
        logStream = (LogStream) LogStreams.createFsLogStream(LOG_STREAM_PARTITION_ID)
            .logRootPath(logRootPath)
            .serviceContainer(serviceContainerRule.get())
            .logName("Test")
            .build()
            .join();
        logStream.openAppender().join();

        messageHandler = new ClientApiMessageHandler(mockControlMessageDispatcher);
        // NOTE(review): first PartitionInfo argument is presumed to be the partition id - confirm.
        messageHandler.addPartition(new Partition(new PartitionInfo(LOG_STREAM_PARTITION_ID, 1), RaftState.LEADER) {
            @Override
            public LogStream getLogStream() {
                return logStream;
            }
        });
        logStream.setTerm(RAFT_TERM);
    }

    /** Closes the log stream created in {@link #setup()}, if there is one. */
    @After
    public void cleanUp() {
        // setup() may have failed before the stream was assigned; do not hide that failure
        // behind a NullPointerException raised from the tear-down.
        if (logStream != null) {
            logStream.close();
        }
    }

    /**
     * A JOB CREATE command for the registered partition is appended to the log stream with the
     * request payload as value and the request id / stream id in its metadata.
     */
    @Test
    public void shouldHandleCommandRequest() throws InterruptedException, ExecutionException {
        final int requestLength = writeCommandRequestToBuffer(buffer, LOG_STREAM_PARTITION_ID, null, ValueType.JOB, JobIntent.CREATE);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();

        final LoggedEvent loggedEvent = readFirstLoggedEvent();

        // Compare the length before copying so that a mismatch is reported as an assertion
        // failure and not as an out-of-bounds error from the copy below.
        Assertions.assertThat(loggedEvent.getValueLength()).isEqualTo(JOB_EVENT.length);
        final byte[] loggedValue = new byte[loggedEvent.getValueLength()];
        loggedEvent.getValueBuffer().getBytes(loggedEvent.getValueOffset(), loggedValue, 0, loggedValue.length);
        Assertions.assertThat(loggedValue).isEqualTo(JOB_EVENT);

        final RecordMetadata metadata = readMetadata(loggedEvent);
        Assertions.assertThat(metadata.getRequestId()).isEqualTo((long) REQUEST_ID);
        Assertions.assertThat(metadata.getRequestStreamId()).isEqualTo(DEFAULT_ADDRESS.getStreamId());
    }

    /** The protocol version written in the request header ends up in the record metadata. */
    @Test
    public void shouldWriteCommandRequestProtocolVersion() throws InterruptedException, ExecutionException {
        final int requestLength = writeCommandRequestToBuffer(buffer, LOG_STREAM_PARTITION_ID, (short) 0, ValueType.JOB, JobIntent.CREATE);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, 123L);

        Assertions.assertThat(isHandled).isTrue();

        final RecordMetadata metadata = readMetadata(readFirstLoggedEvent());
        Assertions.assertThat(metadata.getProtocolVersion()).isEqualTo(0);
    }

    /** The value type and intent of the request end up in the record metadata. */
    @Test
    public void shouldWriteCommandRequestEventType() throws InterruptedException, ExecutionException {
        final int requestLength = writeCommandRequestToBuffer(buffer, LOG_STREAM_PARTITION_ID, null, ValueType.JOB, JobIntent.CREATE);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, 123L);

        Assertions.assertThat(isHandled).isTrue();

        final RecordMetadata metadata = readMetadata(readFirstLoggedEvent());
        Assertions.assertThat(metadata.getValueType()).isEqualTo(ValueType.JOB);
        Assertions.assertThat(metadata.getIntent()).isEqualTo(JobIntent.CREATE);
    }

    /**
     * A control message request claims exactly one fragment on the control message dispatcher
     * and writes the request header descriptor followed by the original request into it.
     */
    @Test
    public void shouldHandleControlRequest() {
        final int requestLength = writeControlRequestToBuffer(buffer);
        Mockito.when(mockControlMessageDispatcher.claim(ArgumentMatchers.any(ClaimedFragment.class), ArgumentMatchers.anyInt()))
            .thenAnswer(claimFragment(0L));

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();
        Mockito.verify(mockControlMessageDispatcher).claim(ArgumentMatchers.any(ClaimedFragment.class), ArgumentMatchers.anyInt());

        // Walk through what was written to the send buffer: descriptor, message header, body.
        int offset = fragmentOffset;
        controlMessageRequestHeaderDescriptor.wrap(sendBuffer, offset);
        Assertions.assertThat(controlMessageRequestHeaderDescriptor.streamId()).isEqualTo(DEFAULT_ADDRESS.getStreamId());
        Assertions.assertThat(controlMessageRequestHeaderDescriptor.requestId()).isEqualTo((long) REQUEST_ID);

        offset += ControlMessageRequestHeaderDescriptor.headerLength();
        headerEncoder.wrap(sendBuffer, offset);

        offset += headerEncoder.encodedLength();
        controlRequestDecoder.wrap(sendBuffer, offset, controlRequestDecoder.sbeBlockLength(), controlRequestDecoder.sbeSchemaVersion());

        final byte[] requestData = VarDataUtil.readBytes(controlRequestDecoder::getData, controlRequestDecoder::dataLength);
        Assertions.assertThat(requestData).isEqualTo(JOB_EVENT);
    }

    /** A command for partition 99, which was never registered, yields PARTITION_NOT_FOUND. */
    @Test
    public void shouldSendErrorMessageIfPartitionNotFound() {
        final int requestLength = writeCommandRequestToBuffer(buffer, 99, null, ValueType.JOB, JobIntent.CREATE);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();

        final ErrorResponseDecoder errorResponse = expectSingleErrorResponse(ErrorCode.PARTITION_NOT_FOUND);
        Assertions.assertThat(errorResponse.errorData()).isEqualTo("Cannot execute command. Partition with id '99' not found");
    }

    /** A message header carrying template id 999 yields MESSAGE_NOT_SUPPORTED. */
    @Test
    public void shouldNotHandleUnkownRequest() throws InterruptedException, ExecutionException {
        headerEncoder.wrap(buffer, 0)
            .blockLength(commandRequestEncoder.sbeBlockLength())
            .schemaId(commandRequestEncoder.sbeSchemaId())
            .templateId(999)
            .version(1);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, headerEncoder.encodedLength(), REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();

        final ErrorResponseDecoder errorResponse = expectSingleErrorResponse(ErrorCode.MESSAGE_NOT_SUPPORTED);
        Assertions.assertThat(errorResponse.errorData()).isEqualTo("Cannot handle message. Template id '999' is not supported.");
    }

    /** A request whose header version is {@link Short#MAX_VALUE} yields INVALID_CLIENT_VERSION. */
    @Test
    public void shouldSendErrorMessageOnRequestWithNewerProtocolVersion() {
        final int requestLength = writeCommandRequestToBuffer(buffer, LOG_STREAM_PARTITION_ID, Short.MAX_VALUE, ValueType.JOB, JobIntent.CREATE);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();

        final ErrorResponseDecoder errorResponse = expectSingleErrorResponse(ErrorCode.INVALID_CLIENT_VERSION);
        Assertions.assertThat(errorResponse.errorData())
            .isEqualTo(String.format("Client has newer version than broker (%d > %d)", Short.MAX_VALUE, 1));
    }

    /**
     * The job record payload sent as a MESSAGE PUBLISH command cannot be deserialized and
     * yields INVALID_MESSAGE.
     */
    @Test
    public void shouldSendErrorMessageOnInvalidRequest() throws InterruptedException, ExecutionException {
        final int requestLength = writeCommandRequestToBuffer(buffer, LOG_STREAM_PARTITION_ID, null, ValueType.MESSAGE, MessageIntent.PUBLISH);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();

        final ErrorResponseDecoder errorResponse = expectSingleErrorResponse(ErrorCode.INVALID_MESSAGE);
        Assertions.assertThat(errorResponse.errorData())
            .contains("Cannot deserialize command:")
            .contains("Property 'name' has no valid value");
    }

    /** A command with value type SBE_UNKNOWN yields MESSAGE_NOT_SUPPORTED. */
    @Test
    public void shouldSendErrorMessageOnUnsupportedRequest() throws InterruptedException, ExecutionException {
        final int requestLength = writeCommandRequestToBuffer(buffer, LOG_STREAM_PARTITION_ID, null, ValueType.SBE_UNKNOWN, Intent.UNKNOWN);

        final boolean isHandled = messageHandler.onRequest(serverOutput, DEFAULT_ADDRESS, buffer, 0, requestLength, REQUEST_ID);

        Assertions.assertThat(isHandled).isTrue();

        final ErrorResponseDecoder errorResponse = expectSingleErrorResponse(ErrorCode.MESSAGE_NOT_SUPPORTED);
        Assertions.assertThat(errorResponse.errorData()).isEqualTo("Cannot execute command. Invalid event type 'NULL_VAL'.");
    }

    /**
     * Encodes an execute-command request (message header followed by the command body) at
     * offset 0 of the given buffer, using {@link #JOB_EVENT} as the command value.
     *
     * @param unsafeBuffer buffer to write the request into
     * @param i partition id written into the request
     * @param sh version written into the message header; {@code null} selects the schema version
     *     of the command request encoder
     * @param valueType value type of the command; {@code null} is written as {@code NULL_VAL}
     * @param intent intent of the command; must not be {@code null}
     * @return the total encoded length (header plus body) in bytes
     */
    protected int writeCommandRequestToBuffer(UnsafeBuffer unsafeBuffer, int i, Short sh, ValueType valueType, Intent intent) {
        final int headerVersion = sh != null ? sh.shortValue() : commandRequestEncoder.sbeSchemaVersion();
        final ValueType effectiveValueType = valueType != null ? valueType : ValueType.NULL_VAL;

        headerEncoder.wrap(unsafeBuffer, 0)
            .blockLength(commandRequestEncoder.sbeBlockLength())
            .schemaId(commandRequestEncoder.sbeSchemaId())
            .templateId(commandRequestEncoder.sbeTemplateId())
            .version(headerVersion);

        // The body starts directly behind the message header.
        commandRequestEncoder.wrap(unsafeBuffer, headerEncoder.encodedLength());
        commandRequestEncoder
            .partitionId(i)
            .valueType(effectiveValueType)
            .intent(intent.value())
            .putValue(JOB_EVENT, 0, JOB_EVENT.length);

        return headerEncoder.encodedLength() + commandRequestEncoder.encodedLength();
    }

    /**
     * Encodes a control message request (message header followed by the body) at offset 0 of
     * the given buffer, using {@link #JOB_EVENT} as the request data.
     *
     * @param unsafeBuffer buffer to write the request into
     * @return the total encoded length (header plus body) in bytes
     */
    private int writeControlRequestToBuffer(UnsafeBuffer unsafeBuffer) {
        headerEncoder.wrap(unsafeBuffer, 0)
            .blockLength(controlRequestEncoder.sbeBlockLength())
            .schemaId(controlRequestEncoder.sbeSchemaId())
            .templateId(controlRequestEncoder.sbeTemplateId())
            .version(controlRequestEncoder.sbeSchemaVersion());

        // The body starts directly behind the message header.
        controlRequestEncoder.wrap(unsafeBuffer, headerEncoder.encodedLength());
        controlRequestEncoder.putData(JOB_EVENT, 0, JOB_EVENT.length);

        return headerEncoder.encodedLength() + controlRequestEncoder.encodedLength();
    }

    /**
     * Builds a Mockito answer for the dispatcher's claim call. The answer records the offset
     * reported by the passed fragment in {@link #fragmentOffset}, lets the fragment wrap
     * {@link #sendBuffer} and returns the given position advanced by the aligned framed length
     * of the requested size.
     *
     * @param j position the returned claim position is based on
     * @return the answer to install on the mocked dispatcher
     */
    protected Answer<?> claimFragment(long j) {
        return invocationOnMock -> {
            final ClaimedFragment claimedFragment = (ClaimedFragment) invocationOnMock.getArguments()[0];
            final int requestedLength = ((Integer) invocationOnMock.getArguments()[1]).intValue();

            // Recorded before the fragment is re-wrapped below.
            fragmentOffset = claimedFragment.getOffset();

            final int framedLength = DataFrameDescriptor.alignedFramedLength(requestedLength);
            claimedFragment.wrap(sendBuffer, 0, framedLength, () -> {
            });
            return Long.valueOf(j + framedLength);
        };
    }

    /**
     * Waits, via {@link TestUtil#waitUntil}, until the given reader reports a next event.
     *
     * @param bufferedLogStreamReader reader to poll
     */
    protected void waitForAvailableEvent(BufferedLogStreamReader bufferedLogStreamReader) {
        TestUtil.waitUntil(bufferedLogStreamReader::hasNext);
    }

    /** Opens a new reader on the log stream, waits for an event and returns the first one. */
    private LoggedEvent readFirstLoggedEvent() {
        final BufferedLogStreamReader reader = new BufferedLogStreamReader(logStream, true);
        waitForAvailableEvent(reader);
        return reader.next();
    }

    /** Reads the metadata of the given event into a new {@link RecordMetadata} instance. */
    private static RecordMetadata readMetadata(LoggedEvent loggedEvent) {
        final RecordMetadata metadata = new RecordMetadata();
        loggedEvent.readMetadata(metadata);
        return metadata;
    }

    /**
     * Asserts that exactly one response was sent and that it is an error response carrying the
     * expected error code.
     *
     * @param expectedErrorCode error code the response has to carry
     * @return the decoded error response, for further assertions on its error data
     */
    private ErrorResponseDecoder expectSingleErrorResponse(ErrorCode expectedErrorCode) {
        Assertions.assertThat(serverOutput.getSentResponses()).hasSize(1);
        final ErrorResponseDecoder errorResponse = serverOutput.getAsErrorResponse(0);
        Assertions.assertThat(errorResponse.errorCode()).isEqualTo(expectedErrorCode);
        return errorResponse;
    }
}
