package io.zeebe.broker.transport.commandapi;

import com.netflix.concurrency.limits.limit.SettableLimit;
import io.zeebe.broker.system.ConfigurationTest;
import io.zeebe.broker.transport.backpressure.CommandRateLimiter;
import io.zeebe.broker.transport.backpressure.NoopRequestLimiter;
import io.zeebe.broker.transport.backpressure.RequestLimiter;
import io.zeebe.distributedlog.DistributedLogstreamService;
import io.zeebe.distributedlog.impl.DefaultDistributedLogstreamService;
import io.zeebe.distributedlog.impl.DistributedLogstreamPartition;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.impl.service.LogStreamServiceNames;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.record.ErrorCode;
import io.zeebe.protocol.record.ExecuteCommandRequestEncoder;
import io.zeebe.protocol.record.MessageHeaderEncoder;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.servicecontainer.testing.ServiceContainerRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.SocketAddress;
import io.zeebe.transport.impl.RemoteAddressImpl;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.concurrent.CompletableFuture;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.internal.util.reflection.FieldSetter;

/* loaded from: input_file:io/zeebe/broker/transport/commandapi/CommandApiMessageHandlerTest.class */
/**
 * Tests for {@link CommandApiMessageHandler}.
 *
 * <p>Each test encodes a request into {@link #buffer}, passes it to the handler's {@code onRequest}
 * method and then inspects either the log stream or the responses captured by
 * {@link #serverOutput}.
 *
 * <p>The log stream is a file-system log stream created under a temporary folder. Its distributed
 * log partition is a Mockito mock whose {@code asyncAppend} call is forwarded to a
 * {@link DefaultDistributedLogstreamService} that has the log stream injected reflectively.
 */
public class CommandApiMessageHandlerTest {
    protected static final RemoteAddress DEFAULT_ADDRESS = new RemoteAddressImpl(21, new SocketAddress("foo", 4242));
    protected static final int LOG_STREAM_PARTITION_ID = 1;
    /** Encoded {@link JobRecord} used as the value of every command request written by these tests. */
    protected static final byte[] JOB_EVENT = encodeJobEvent();
    private static final int REQUEST_ID = 5;
    protected BufferingServerOutput serverOutput;
    private LogStream logStream;
    private CommandApiMessageHandler messageHandler;
    private DistributedLogstreamService distributedLogImpl;
    protected final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1024 * 1024]);
    protected final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
    protected final ExecuteCommandRequestEncoder commandRequestEncoder = new ExecuteCommandRequestEncoder();
    public TemporaryFolder tempFolder = new TemporaryFolder();
    public ActorSchedulerRule agentRunnerService = new ActorSchedulerRule();
    public ServiceContainerRule serviceContainerRule = new ServiceContainerRule(this.agentRunnerService);

    // The chain is built from the fields above, so it must be declared after them.
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.agentRunnerService).around(this.serviceContainerRule);
    private final RequestLimiter noneLimiter = new NoopRequestLimiter();

    /**
     * Creates the log stream, wires the mocked distributed log partition into the service
     * container, opens the appender and registers the log stream with a fresh message handler
     * that uses a {@link NoopRequestLimiter}.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        this.serverOutput = new BufferingServerOutput();
        this.logStream = (LogStream) LogStreams.createFsLogStream(LOG_STREAM_PARTITION_ID)
                .logRootPath(this.tempFolder.getRoot().getAbsolutePath())
                .serviceContainer(this.serviceContainerRule.get())
                .logName(ConfigurationTest.BROKER_BASE)
                .build()
                .join();

        this.distributedLogImpl = new DefaultDistributedLogstreamService();
        injectIntoDistributedLog("logStream", this.logStream);
        injectIntoDistributedLog("logStorage", this.logStream.getLogStorage());
        injectIntoDistributedLog("currentLeader", "0");

        final DistributedLogstreamPartition partitionMock = mockDistributedLogPartition();
        this.serviceContainerRule.get()
                .createService(LogStreamServiceNames.distributedLogPartitionServiceName(ConfigurationTest.BROKER_BASE), () -> partitionMock)
                .install()
                .join();

        this.logStream.openAppender().join();
        this.messageHandler = new CommandApiMessageHandler();
        this.messageHandler.addPartition(this.logStream, this.noneLimiter);
    }

    @After
    public void cleanUp() {
        this.logStream.close();
    }

    @Test
    public void shouldWriteCommandRequestProtocolVersion() {
        final int length = writeCommandRequestToBuffer(this.buffer, LOG_STREAM_PARTITION_ID, (short) 0, ValueType.JOB, JobIntent.CREATE);

        final boolean handled = this.messageHandler.onRequest(this.serverOutput, DEFAULT_ADDRESS, this.buffer, 0, length, 123L);

        Assertions.assertThat(handled).isTrue();
        final BufferedLogStreamReader reader = new BufferedLogStreamReader(this.logStream);
        waitForAvailableEvent(reader);
        final LoggedEvent loggedEvent = reader.next();
        final RecordMetadata metadata = new RecordMetadata();
        loggedEvent.readMetadata(metadata);
        Assertions.assertThat(metadata.getProtocolVersion()).isEqualTo(0);
    }

    @Test
    public void shouldSendErrorMessageIfPartitionNotFound() {
        // Partition 99 was never registered with the handler.
        Assertions.assertThat(handleCommandRequest(99, null, ValueType.JOB, JobIntent.CREATE)).isTrue();

        assertSingleErrorResponse(ErrorCode.PARTITION_LEADER_MISMATCH);
    }

    @Test
    public void shouldNotHandleUnknownRequest() {
        // Only a header is written, carrying a template id (999) that differs from the command request's.
        this.headerEncoder.wrap(this.buffer, 0)
                .blockLength(this.commandRequestEncoder.sbeBlockLength())
                .schemaId(this.commandRequestEncoder.sbeSchemaId())
                .templateId(999)
                .version(1);

        final boolean handled = this.messageHandler.onRequest(this.serverOutput, DEFAULT_ADDRESS, this.buffer, 0, this.headerEncoder.encodedLength(), REQUEST_ID);

        Assertions.assertThat(handled).isTrue();
        assertSingleErrorResponse(ErrorCode.INVALID_MESSAGE_TEMPLATE);
    }

    @Test
    public void shouldSendErrorMessageOnRequestWithNewerProtocolVersion() {
        Assertions.assertThat(handleCommandRequest(LOG_STREAM_PARTITION_ID, Short.MAX_VALUE, ValueType.JOB, JobIntent.CREATE)).isTrue();

        assertSingleErrorResponse(ErrorCode.INVALID_CLIENT_VERSION);
    }

    @Test
    public void shouldSendErrorMessageOnInvalidRequest() {
        // The value written to the buffer is always JOB_EVENT, here announced as a MESSAGE value.
        Assertions.assertThat(handleCommandRequest(LOG_STREAM_PARTITION_ID, null, ValueType.MESSAGE, MessageIntent.PUBLISH)).isTrue();

        assertSingleErrorResponse(ErrorCode.MALFORMED_REQUEST);
    }

    @Test
    public void shouldSendErrorMessageOnUnsupportedRequest() {
        Assertions.assertThat(handleCommandRequest(LOG_STREAM_PARTITION_ID, null, ValueType.SBE_UNKNOWN, Intent.UNKNOWN)).isTrue();

        assertSingleErrorResponse(ErrorCode.UNSUPPORTED_MESSAGE);
    }

    @Test
    public void shouldSendErrorMessageOnRequestLimitReached() {
        useRateLimiterWithOneAcquiredRequest();

        Assertions.assertThat(handleCommandRequest(LOG_STREAM_PARTITION_ID, null, ValueType.JOB, JobIntent.CREATE)).isTrue();

        assertSingleErrorResponse(ErrorCode.RESOURCE_EXHAUSTED);
    }

    @Test
    public void shouldNotSendErrorMessageOnRequestLimitReachedIfJobComplete() {
        useRateLimiterWithOneAcquiredRequest();

        Assertions.assertThat(handleCommandRequest(LOG_STREAM_PARTITION_ID, null, ValueType.JOB, JobIntent.COMPLETE)).isTrue();

        Assertions.assertThat(this.serverOutput.getSentResponses()).hasSize(0);
    }

    /**
     * Encodes a message header followed by an execute-command request at offset 0 of the given
     * buffer. The request value is always {@link #JOB_EVENT}.
     *
     * @param unsafeBuffer buffer to write into
     * @param partitionId partition id to put into the request
     * @param protocolVersion header version to write; {@code null} selects the encoder's schema version
     * @param valueType value type to put into the request; {@code null} is written as {@link ValueType#NULL_VAL}
     * @param intent intent whose {@code value()} is put into the request; must not be {@code null}
     * @return combined encoded length of header and request
     */
    protected int writeCommandRequestToBuffer(UnsafeBuffer unsafeBuffer, int partitionId, Short protocolVersion, ValueType valueType, Intent intent) {
        final int version = protocolVersion != null ? protocolVersion.shortValue() : this.commandRequestEncoder.sbeSchemaVersion();
        final ValueType effectiveValueType = valueType != null ? valueType : ValueType.NULL_VAL;

        this.headerEncoder.wrap(unsafeBuffer, 0)
                .blockLength(this.commandRequestEncoder.sbeBlockLength())
                .schemaId(this.commandRequestEncoder.sbeSchemaId())
                .templateId(this.commandRequestEncoder.sbeTemplateId())
                .version(version);

        // The request body starts directly after the header.
        this.commandRequestEncoder.wrap(unsafeBuffer, this.headerEncoder.encodedLength());
        this.commandRequestEncoder
                .partitionId(partitionId)
                .valueType(effectiveValueType)
                .intent(intent.value())
                .putValue(JOB_EVENT, 0, JOB_EVENT.length);

        return this.headerEncoder.encodedLength() + this.commandRequestEncoder.encodedLength();
    }

    /** Waits via {@link TestUtil#waitUntil} until the reader reports that it has a next event. */
    protected void waitForAvailableEvent(BufferedLogStreamReader bufferedLogStreamReader) {
        TestUtil.waitUntil(bufferedLogStreamReader::hasNext);
    }

    /**
     * Writes a command request into {@link #buffer} and passes it to the message handler using
     * {@link #REQUEST_ID} as the request id.
     *
     * @return the result of the handler's {@code onRequest} call
     */
    private boolean handleCommandRequest(int partitionId, Short protocolVersion, ValueType valueType, Intent intent) {
        final int length = writeCommandRequestToBuffer(this.buffer, partitionId, protocolVersion, valueType, intent);
        return this.messageHandler.onRequest(this.serverOutput, DEFAULT_ADDRESS, this.buffer, 0, length, REQUEST_ID);
    }

    /** Asserts that exactly one response was sent and that it is an error response with the given code. */
    private void assertSingleErrorResponse(ErrorCode expectedErrorCode) {
        Assertions.assertThat(this.serverOutput.getSentResponses()).hasSize(1);
        Assertions.assertThat(this.serverOutput.getAsErrorResponse(0).errorCode()).isEqualTo(expectedErrorCode);
    }

    /**
     * Replaces the message handler with one whose partition uses a {@link CommandRateLimiter}
     * built with a limit of 1, and acquires from that limiter once up front.
     * NOTE(review): presumably this consumes the only available permit - confirm against
     * CommandRateLimiter.
     */
    private void useRateLimiterWithOneAcquiredRequest() {
        final CommandRateLimiter limiter = CommandRateLimiter.builder()
                .limit(new SettableLimit(1))
                .build(this.logStream.getPartitionId());
        this.messageHandler = new CommandApiMessageHandler();
        this.messageHandler.addPartition(this.logStream, limiter);
        limiter.tryAcquire(0, 1L, (Intent) null);
    }

    /**
     * Creates a {@link DistributedLogstreamPartition} mock whose {@code asyncAppend(bytes, position)}
     * call is forwarded to {@link #distributedLogImpl} and answered with an already completed future.
     * If either argument is missing or {@code null}, the mock answers {@code null}.
     */
    private DistributedLogstreamPartition mockDistributedLogPartition() {
        final DistributedLogstreamPartition partitionMock = Mockito.mock(DistributedLogstreamPartition.class);
        Mockito.doAnswer(invocation -> {
            final Object[] arguments = invocation.getArguments();
            if (arguments == null || arguments.length <= 1 || arguments[0] == null || arguments[1] == null) {
                return null;
            }
            final byte[] bytes = (byte[]) arguments[0];
            final long commitPosition = (Long) arguments[1];
            return CompletableFuture.completedFuture(this.distributedLogImpl.append("0", commitPosition, bytes));
        }).when(partitionMock).asyncAppend(ArgumentMatchers.any(byte[].class), ArgumentMatchers.anyLong());
        return partitionMock;
    }

    /**
     * Reflectively sets a field declared on {@link DefaultDistributedLogstreamService}.
     *
     * @throws IllegalStateException if the class declares no field with that name, so that a
     *     renamed field fails the test setup instead of leaving the service half-initialized
     */
    private void injectIntoDistributedLog(String fieldName, Object value) {
        try {
            FieldSetter.setField(this.distributedLogImpl, DefaultDistributedLogstreamService.class.getDeclaredField(fieldName), value);
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException("DefaultDistributedLogstreamService declares no field named '" + fieldName + "'", e);
        }
    }

    /** Writes a {@link JobRecord} whose type is {@code ConfigurationTest.BROKER_BASE} into a byte array of its encoded length. */
    private static byte[] encodeJobEvent() {
        final JobRecord jobRecord = new JobRecord().setType(BufferUtil.wrapString(ConfigurationTest.BROKER_BASE));
        final UnsafeBuffer encoded = new UnsafeBuffer(new byte[jobRecord.getEncodedLength()]);
        jobRecord.write(encoded, 0);
        return encoded.byteArray();
    }
}
