package org.apache.pulsar.broker.service;

import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ServerCnxTest.class */
public class ServerCnxTest {
    // Embedded Netty channel the tests write inbound commands into. Not assigned in setup();
    // presumably (re)created by resetChannel() — TODO confirm, that helper is outside this view.
    protected EmbeddedChannel channel;
    // Spied broker configuration handed to the PulsarService built in setup().
    private ServiceConfiguration svcConfig;
    // Connection handler whose state the tests assert on; like `channel`, not assigned in setup().
    private ServerCnx serverCnx;
    // Spy around a real BrokerService so individual getters can be stubbed per test.
    protected BrokerService brokerService;
    // Mocked managed-ledger factory returned by pulsar.getManagedLedgerFactory().
    private ManagedLedgerFactory mlFactoryMock;
    // Created at the end of setup(); its use is not visible in this part of the file.
    private ClientChannelHelper clientChannelHelper;
    // Spy around a real PulsarService; most collaborators are replaced via doReturn(...) in setup().
    private PulsarService pulsar;
    // Metadata store built on the MockZooKeeper; returned as both local and configuration store.
    private MetadataStoreExtended store;
    // Mocked configuration cache whose policiesCache() yields a mocked ZooKeeperDataCache.
    private ConfigurationCacheService configCacheService;
    // Spy around NamespaceResources backed by `store`.
    private NamespaceResources namespaceResources;
    // Mocked namespace service; setup() stubs it to report every topic as owned and active.
    protected NamespaceService namespaceService;
    // Numeric value of the last constant declared in ProtocolVersion; the protocol version the
    // tests expect to see echoed in CONNECTED responses.
    private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1].getValue();
    // Topic on which producer creation is expected to succeed.
    protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    // Topic on which producer creation is expected to be answered with an error.
    private final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    // Topic used when the namespace service reports the service unit as not active.
    private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic";
    // NOTE(review): presumably used by encryption-required tests further down the file — confirm.
    private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
    // Subscription name used by the subscribe tests.
    private final String successSubName = "successSub";
    // Topic under a tenant/cluster/namespace that the authorization tests treat as nonexistent.
    private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
    // Same topic as successTopicName but under cluster "usw", while setup() stubs the local
    // cluster name as "use".
    private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic";
    // Mockito mocks of the managed ledger and cursor; their wiring is not visible in this view
    // (presumably done by setupMLAsyncCallbackMocks() — TODO confirm).
    private final ManagedLedger ledgerMock = (ManagedLedger) Mockito.mock(ManagedLedger.class);
    private final ManagedCursor cursorMock = (ManagedCursor) Mockito.mock(ManagedCursor.class);
    // Single-threaded ordered executor and Netty event loop group, created in setup() and
    // shut down in teardown().
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;

    /**
     * Builds a fresh broker fixture before every test method: a spied {@link PulsarService}
     * and {@link BrokerService} whose ZooKeeper, BookKeeper, metadata-store and namespace
     * collaborators are replaced by mocks or test doubles.
     *
     * <p>The statement order matters: stubs on {@code pulsar} must be in place before the
     * {@link BrokerService} is constructed from it.
     *
     * @throws Exception if any part of the fixture cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        eventLoopGroup = new NioEventLoopGroup();
        executor = OrderedExecutor.newBuilder().numThreads(1).build();

        // Configuration and PulsarService are spies so that single getters can be stubbed.
        svcConfig = Mockito.spy(new ServiceConfiguration());
        svcConfig.setBrokerShutdownTimeoutMs(0L);
        pulsar = Mockito.spy(new PulsarService(svcConfig));
        Mockito.doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
        svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
        svcConfig.setBacklogQuotaCheckEnabled(false);
        Mockito.doReturn(svcConfig).when(pulsar).getConfiguration();
        Mockito.doReturn(Mockito.mock(PulsarResources.class)).when(pulsar).getPulsarResources();
        Mockito.doReturn("use").when(svcConfig).getClusterName();

        // Storage layer: mocked managed-ledger factory, ZooKeeper cache/client, BookKeeper client.
        mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class);
        Mockito.doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
        ZooKeeperCache zkCache = Mockito.mock(ZooKeeperCache.class);
        Mockito.doReturn(30).when(zkCache).getZkOperationTimeoutSeconds();
        Mockito.doReturn(zkCache).when(pulsar).getLocalZkCache();
        MockZooKeeper mockZk = MockedPulsarServiceBaseTest.createMockZooKeeper();
        Mockito.doReturn(mockZk).when(pulsar).getZkClient();
        Mockito.doReturn(MockedPulsarServiceBaseTest.createMockBookKeeper(executor)).when(pulsar).getBookKeeperClient();
        store = new ZKMetadataStore(mockZk);

        // Policies caches: both the sync and async lookups report "no policies".
        configCacheService = Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache policiesCache = Mockito.mock(ZooKeeperDataCache.class);
        Mockito.doReturn(Optional.empty()).when(policiesCache).get((String) ArgumentMatchers.any());
        Mockito.doReturn(policiesCache).when(configCacheService).policiesCache();
        Mockito.doReturn(configCacheService).when(pulsar).getConfigurationCache();
        LocalZooKeeperCacheService localZkCacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(policiesCache).getAsync((String) ArgumentMatchers.any());
        Mockito.doReturn(policiesCache).when(localZkCacheService).policiesCache();
        // Same stubbing as a few lines above, repeated as in the original fixture.
        Mockito.doReturn(configCacheService).when(pulsar).getConfigurationCache();
        Mockito.doReturn(localZkCacheService).when(pulsar).getLocalZkCacheService();
        Mockito.doReturn(store).when(pulsar).getLocalMetadataStore();
        Mockito.doReturn(store).when(pulsar).getConfigurationMetadataStore();

        // Broker service under test, spied so tests can override authentication/authorization.
        brokerService = Mockito.spy(new BrokerService(pulsar, eventLoopGroup));
        Mockito.doReturn(Mockito.mock(BrokerInterceptor.class)).when(brokerService).getInterceptor();
        Mockito.doReturn(brokerService).when(pulsar).getBrokerService();
        Mockito.doReturn(executor).when(pulsar).getOrderedExecutor();

        // Replace the mocked PulsarResources stubbed earlier with a spy backed by the store.
        PulsarResources resources = Mockito.spy(new PulsarResources(store, store));
        namespaceResources = Mockito.spy(new NamespaceResources(store, store, 30));
        Mockito.doReturn(namespaceResources).when(resources).getNamespaceResources();
        Mockito.doReturn(resources).when(pulsar).getPulsarResources();

        // Namespace service: every service unit is reported as owned and active.
        namespaceService = Mockito.mock(NamespaceService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync((TopicName) ArgumentMatchers.any());
        Mockito.doReturn(namespaceService).when(pulsar).getNamespaceService();
        Mockito.doReturn(true).when(namespaceService).isServiceUnitOwned((ServiceUnitId) ArgumentMatchers.any());
        Mockito.doReturn(true).when(namespaceService).isServiceUnitActive((TopicName) ArgumentMatchers.any());
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership((TopicName) ArgumentMatchers.any());

        setupMLAsyncCallbackMocks();
        clientChannelHelper = new ClientChannelHelper();
    }

    /**
     * Converts a duration to whole seconds, truncated by the conversion and narrowed to {@code int}.
     *
     * @param i        the duration, expressed in {@code timeUnit}
     * @param timeUnit the unit {@code i} is expressed in
     * @return the duration in seconds
     */
    private int inSec(int i, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(i);
        return (int) seconds;
    }

    /**
     * Releases the per-test fixture, in the same order as before: connection, channel,
     * Pulsar service, broker service, executor, event loop group, metadata store.
     *
     * <p>This method is declared with {@code alwaysRun = true}, so it can run after a
     * {@link #setup()} that failed part-way or a test that never opened a channel. In those
     * cases some fields are still {@code null}: {@code serverCnx} and {@code channel} are not
     * assigned by {@code setup()} at all, and the others are only set once {@code setup()}
     * has progressed far enough. Each resource is therefore released only if it was actually
     * created, so that a {@link NullPointerException} here cannot mask the original failure.
     *
     * @throws Exception if releasing any of the resources fails
     */
    @AfterMethod(alwaysRun = true)
    public void teardown() throws Exception {
        if (this.serverCnx != null) {
            this.serverCnx.close();
        }
        if (this.channel != null) {
            this.channel.close();
        }
        if (this.pulsar != null) {
            this.pulsar.close();
        }
        if (this.brokerService != null) {
            this.brokerService.close();
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        if (this.eventLoopGroup != null) {
            // Block until the event loop threads have actually terminated.
            this.eventLoopGroup.shutdownGracefully().get();
        }
        if (this.store != null) {
            this.store.close();
        }
    }

    /**
     * A CONNECT command on a fresh channel moves the connection from {@code Start} to
     * {@code Connected} and is answered with a {@link CommandConnected}.
     */
    @Test(timeOut = 30000)
    public void testConnectCommand() throws Exception {
        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connect = Commands.newConnect("none", "", (String) null);
        channel.writeInbound(connect);
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Connected);

        Object response = getResponse();
        Assert.assertTrue(response instanceof CommandConnected);
        channel.finish();
    }

    /**
     * Builds a size-prefixed CONNECT frame that carries the auth method as an enum value,
     * with client version {@code "Pulsar Client"}.
     *
     * @param authMethod the authentication method to put into the command
     * @param str        the auth data, sent as its UTF-8 bytes
     * @param i          the protocol version to advertise
     * @return the serialized command
     */
    private static ByteBuf newConnect(AuthMethod authMethod, String str, int i) {
        BaseCommand command = new BaseCommand().setType(BaseCommand.Type.CONNECT);
        command.setConnect()
                .setClientVersion("Pulsar Client")
                .setAuthMethod(authMethod)
                .setAuthData(str.getBytes(StandardCharsets.UTF_8))
                .setProtocolVersion(i);
        return Commands.serializeWithSize(command);
    }

    /**
     * Same as the plain connect test, but the CONNECT frame is built by the local
     * {@code newConnect} helper, which sets the auth method through the {@link AuthMethod} enum.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithEnum() throws Exception {
        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connect = newConnect(AuthMethod.AuthMethodNone, "", Commands.getCurrentProtocolVersion());
        channel.writeInbound(connect);
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Connected);

        Object response = getResponse();
        Assert.assertTrue(response instanceof CommandConnected);
        channel.finish();
    }

    /**
     * The CONNECTED response to a default CONNECT command reports the newest protocol
     * version known to the test ({@code currentProtocolVersion}).
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithProtocolVersion() throws Exception {
        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        channel.writeInbound(Commands.newConnect("none", "", (String) null));
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Connected);

        CommandConnected connected = (CommandConnected) getResponse();
        Assert.assertEquals(connected.getProtocolVersion(), currentProtocolVersion);
        channel.finish();
    }

    /**
     * After a successful handshake, a client that stays silent is disconnected: the channel
     * is expected to be inactive after three rounds of running pending tasks, one second apart
     * (the fixture configures a one-second keep-alive interval).
     */
    @Test(timeOut = 30000)
    public void testKeepAlive() throws Exception {
        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        channel.writeInbound(Commands.newConnect("none", "", (String) null));
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Connected);
        CommandConnected connected = (CommandConnected) getResponse();
        Assert.assertEquals(connected.getProtocolVersion(), currentProtocolVersion);

        // Let the scheduled keep-alive tasks fire while the client sends nothing.
        int roundsLeft = 3;
        while (roundsLeft > 0) {
            channel.runPendingTasks();
            Thread.sleep(1000L);
            roundsLeft--;
        }

        Assert.assertFalse(channel.isActive());
        channel.finish();
    }

    /**
     * A client that connects with protocol version {@code v0} is not disconnected for being
     * silent: the channel is still active after three one-second rounds of pending tasks.
     */
    @Test(timeOut = 30000)
    public void testKeepAliveNotEnforcedWithOlderClients() throws Exception {
        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        ByteBuf connect = Commands.newConnect("none", "", ProtocolVersion.v0.getValue(), (String) null, (String) null, (String) null, (String) null, (String) null);
        channel.writeInbound(connect);
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Connected);
        CommandConnected connected = (CommandConnected) getResponse();
        Assert.assertEquals(connected.getProtocolVersion(), ProtocolVersion.v0.getValue());

        // Give the keep-alive tasks the same time as in testKeepAlive.
        int roundsLeft = 3;
        while (roundsLeft > 0) {
            channel.runPendingTasks();
            Thread.sleep(1000L);
            roundsLeft--;
        }

        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * A channel that never sends CONNECT is closed as well: it is expected to be inactive
     * after three one-second rounds of pending tasks.
     */
    @Test(timeOut = 30000)
    public void testKeepAliveBeforeHandshake() throws Exception {
        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        // No handshake is performed; only the scheduled tasks are allowed to run.
        int roundsLeft = 3;
        while (roundsLeft > 0) {
            channel.runPendingTasks();
            Thread.sleep(1000L);
            roundsLeft--;
        }

        Assert.assertFalse(channel.isActive());
        channel.finish();
    }

    /**
     * With authentication enabled and a mocked provider whose auth state is complete and
     * reports role {@code "appid1"}, a CONNECT command is accepted and answered with
     * {@link CommandConnected}.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithAuthenticationPositive() throws Exception {
        AuthenticationService authService = Mockito.mock(AuthenticationService.class);
        AuthenticationProvider authProvider = Mockito.mock(AuthenticationProvider.class);
        AuthenticationState authState = Mockito.mock(AuthenticationState.class);
        AuthData authData = AuthData.of((byte[]) null);

        // Wire broker -> authentication service -> provider -> state.
        Mockito.doReturn(authService).when(brokerService).getAuthenticationService();
        Mockito.doReturn(authProvider).when(authService).getAuthenticationProvider(Mockito.anyString());
        Mockito.doReturn(authState).when(authProvider).newAuthState((AuthData) Mockito.any(), (SocketAddress) Mockito.any(), (SSLSession) Mockito.any());
        Mockito.doReturn(authData).when(authState).authenticate(authData);
        Mockito.doReturn(true).when(authState).isComplete();
        Mockito.doReturn("appid1").when(authState).getAuthRole();
        Mockito.doReturn(true).when(brokerService).isAuthenticationEnabled();

        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        channel.writeInbound(Commands.newConnect("none", "", (String) null));
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Connected);
        Object response = getResponse();
        Assert.assertTrue(response instanceof CommandConnected);
        channel.finish();
    }

    /**
     * With authentication enabled, a mocked authentication service and no anonymous role, a
     * CONNECT command is rejected: the connection stays in {@code Start} and the response is
     * a {@link CommandError}.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithAuthenticationNegative() throws Exception {
        AuthenticationService authService = Mockito.mock(AuthenticationService.class);
        Mockito.doReturn(authService).when(brokerService).getAuthenticationService();
        Mockito.doReturn(Optional.empty()).when(authService).getAnonymousUserRole();
        Mockito.doReturn(true).when(brokerService).isAuthenticationEnabled();

        resetChannel();
        Assert.assertTrue(channel.isActive());
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);

        channel.writeInbound(Commands.newConnect("none", "", (String) null));
        Assert.assertEquals(serverCnx.getState(), ServerCnx.State.Start);
        Object response = getResponse();
        Assert.assertTrue(response instanceof CommandError);
        channel.finish();
    }

    /**
     * Creating a producer on the success topic registers it on the topic; creating one on the
     * fail topic yields a {@link CommandError} and no topic reference. Finishing the channel
     * removes the registered producer again.
     */
    @Test(timeOut = 30000)
    public void testProducerCommand() throws Exception {
        final String goodTopic = "persistent://prop/use/ns-abc/successTopic";
        final String badTopic = "persistent://prop/use/ns-abc/failTopic";

        resetChannel();
        setChannelConnected();

        // Producer on a topic that can be opened.
        channel.writeInbound(Commands.newProducer(goodTopic, 1L, 1L, "prod-name", Collections.emptyMap()));
        Object firstResponse = getResponse();
        Assert.assertTrue(firstResponse instanceof CommandProducerSuccess);
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(goodTopic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1);

        // Producer on a topic that cannot be opened.
        channel.writeInbound(Commands.newProducer(badTopic, 2L, 2L, "prod-name-2", Collections.emptyMap()));
        Object secondResponse = getResponse();
        Assert.assertTrue(secondResponse instanceof CommandError);
        Assert.assertFalse(pulsar.getBrokerService().getTopicReference(badTopic).isPresent());

        channel.finish();
        Assert.assertEquals(topicRef.getProducers().size(), 0);
    }

    /**
     * Two identical PRODUCER commands are sent while topic creation is still pending; the
     * response read afterwards must be a {@link CommandError} with
     * {@link ServerError#ServiceNotReady}.
     */
    @Test(timeOut = 5000)
    public void testDuplicateConcurrentProducerCommand() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";

        resetChannel();
        setChannelConnected();

        // A future that is never completed keeps the first producer creation pending.
        // Diamond form instead of the raw CompletableFuture type.
        Mockito.doReturn(new CompletableFuture<>()).when(this.brokerService).getOrCreateTopic(ArgumentMatchers.any(String.class));

        this.channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap()));
        this.channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap()));

        Object response = getResponse();
        Assert.assertTrue(response instanceof CommandError);
        Assert.assertEquals(((CommandError) response).getError(), ServerError.ServiceNotReady);
        this.channel.finish();
    }

    /**
     * When the namespace service reports the service unit as not active, creating a producer
     * fails with {@link ServerError#ServiceNotReady} and no topic reference is created.
     */
    @Test(timeOut = 30000)
    public void testProducerOnNotOwnedTopic() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/success-not-owned-topic";

        resetChannel();
        setChannelConnected();
        Mockito.doReturn(false).when(namespaceService).isServiceUnitActive(ArgumentMatchers.any(TopicName.class));

        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap()));

        Object response = getResponse();
        Assert.assertEquals(response.getClass(), CommandError.class);
        CommandError error = (CommandError) response;
        Assert.assertEquals(error.getError(), ServerError.ServiceNotReady);
        Assert.assertFalse(pulsar.getBrokerService().getTopicReference(topic).isPresent());
        channel.finish();
    }

    /**
     * With a mocked authorization service that allows every topic operation, creating a
     * producer succeeds and the producer is registered on the topic until the channel finishes.
     */
    @Test(timeOut = 30000)
    public void testProducerCommandWithAuthorizationPositive() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";

        AuthorizationService authzService = Mockito.mock(AuthorizationService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(authzService).allowTopicOperationAsync((TopicName) Mockito.any(), (TopicOperation) Mockito.any(), (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());
        Mockito.doReturn(authzService).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthenticationEnabled();

        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap()));

        Object response = getResponse();
        Assert.assertEquals(response.getClass(), CommandProducerSuccess.class);
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1);

        channel.finish();
        Assert.assertEquals(topicRef.getProducers().size(), 0);
    }

    /**
     * With authorization enabled and a provider that denies super-user status, both a
     * PRODUCER and a SUBSCRIBE command on a topic in a nonexistent namespace are answered
     * with {@link CommandError}.
     */
    @Test(timeOut = 30000)
    public void testNonExistentTopic() throws Exception {
        final String topic = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";

        // Policies lookups for anything matching "nonexistent" come back empty.
        ZooKeeperDataCache policiesCache = Mockito.mock(ZooKeeperDataCache.class);
        ConfigurationCacheService cacheService = Mockito.mock(ConfigurationCacheService.class);
        Mockito.doReturn(cacheService).when(pulsar).getConfigurationCache();
        Mockito.doReturn(policiesCache).when(cacheService).policiesCache();
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(policiesCache).getAsync(Mockito.matches(".*nonexistent.*"));

        AuthorizationService authzService = Mockito.spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
        Mockito.doReturn(authzService).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthorizationEnabled();
        svcConfig.setAuthorizationEnabled(true);

        // Swap the service's private "provider" field for a spy that denies super-user status.
        Field providerField = AuthorizationService.class.getDeclaredField("provider");
        providerField.setAccessible(true);
        PulsarAuthorizationProvider authzProvider = Mockito.spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
        providerField.set(authzService, authzProvider);
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authzProvider).isSuperUser(Mockito.anyString(), (AuthenticationDataSource) Mockito.any(), (ServiceConfiguration) Mockito.any());

        // Producer attempt.
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap()));
        Object producerResponse = getResponse();
        Assert.assertTrue(producerResponse instanceof CommandError);
        channel.finish();

        // Subscribe attempt on a fresh channel.
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newSubscribe(topic, "successSub", 1L, 1L, CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object subscribeResponse = getResponse();
        Assert.assertTrue(subscribeResponse instanceof CommandError);
        channel.finish();
    }

    /**
     * With authorization enabled and a provider that grants topic permissions but denies
     * super-user and tenant-admin access, a producer on the local cluster ({@code use})
     * succeeds while a producer on a topic under cluster {@code usw} gets a {@link CommandError}.
     */
    @Test(timeOut = 30000)
    public void testClusterAccess() throws Exception {
        final String localTopic = "persistent://prop/use/ns-abc/successTopic";
        final String remoteTopic = "persistent://prop/usw/ns-abc/successTopic";

        svcConfig.setAuthorizationEnabled(true);
        AuthorizationService authzService = Mockito.spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));

        // Swap the service's private "provider" field for a spy whose answers are stubbed below.
        Field providerField = AuthorizationService.class.getDeclaredField("provider");
        providerField.setAccessible(true);
        PulsarAuthorizationProvider authzProvider = Mockito.spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
        providerField.set(authzService, authzProvider);

        Mockito.doReturn(authzService).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthorizationEnabled();
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authzProvider).isSuperUser(Mockito.anyString(), (AuthenticationDataSource) Mockito.any(), (ServiceConfiguration) Mockito.any());
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authzProvider).validateTenantAdminAccess(Mockito.anyString(), (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(authzProvider).checkPermission(ArgumentMatchers.any(TopicName.class), Mockito.anyString(), ArgumentMatchers.any(AuthAction.class));

        // Topic in the local cluster.
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(localTopic, 1L, 1L, "prod-name", Collections.emptyMap()));
        Object localResponse = getResponse();
        Assert.assertTrue(localResponse instanceof CommandProducerSuccess);

        // Topic in a different cluster.
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(remoteTopic, 1L, 1L, "prod-name", Collections.emptyMap()));
        Object remoteResponse = getResponse();
        Assert.assertTrue(remoteResponse instanceof CommandError);
        channel.finish();
    }

    /**
     * When the authorization provider reports the caller as super-user, both a PRODUCER and a
     * SUBSCRIBE command succeed even on a topic whose namespace does not exist.
     */
    @Test(timeOut = 30000)
    public void testNonExistentTopicSuperUserAccess() throws Exception {
        final String topic = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
        final String subscription = "successSub";

        AuthorizationService authzService = Mockito.spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
        Mockito.doReturn(authzService).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthorizationEnabled();

        // Swap the service's private "provider" field for a spy that grants super-user status.
        Field providerField = AuthorizationService.class.getDeclaredField("provider");
        providerField.setAccessible(true);
        PulsarAuthorizationProvider authzProvider = Mockito.spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
        providerField.set(authzService, authzProvider);
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(authzProvider).isSuperUser(Mockito.anyString(), (AuthenticationDataSource) Mockito.any(), (ServiceConfiguration) Mockito.any());

        // Producer.
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "prod-name", Collections.emptyMap()));
        Object producerResponse = getResponse();
        Assert.assertTrue(producerResponse instanceof CommandProducerSuccess);
        PersistentTopic topicAfterProducer = (PersistentTopic) brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicAfterProducer);
        Assert.assertEquals(topicAfterProducer.getProducers().size(), 1);
        channel.finish();

        // Consumer on a fresh channel.
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 1L, CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        PersistentTopic topicAfterSubscribe = (PersistentTopic) brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicAfterSubscribe);
        Assert.assertTrue(topicAfterSubscribe.getSubscriptions().containsKey(subscription));
        Assert.assertTrue(topicAfterSubscribe.getSubscription(subscription).getDispatcher().isConsumerConnected());
        Object subscribeResponse = getResponse();
        Assert.assertTrue(subscribeResponse instanceof CommandSuccess);
        channel.finish();
    }

    /**
     * With authentication and authorization enabled and a mocked authorization service that
     * denies every topic operation, creating a producer is answered with {@link CommandError}.
     */
    @Test(timeOut = 30000)
    public void testProducerCommandWithAuthorizationNegative() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";

        AuthorizationService authzService = Mockito.mock(AuthorizationService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(authzService).allowTopicOperationAsync((TopicName) Mockito.any(), (TopicOperation) Mockito.any(), (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());
        Mockito.doReturn(authzService).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthenticationEnabled();
        Mockito.doReturn(true).when(brokerService).isAuthorizationEnabled();
        Mockito.doReturn("prod1").when(brokerService).generateUniqueProducerName();

        resetChannel();
        setChannelConnected();

        // No producer name is supplied in the command.
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, (String) null, Collections.emptyMap()));
        Object response = getResponse();
        Assert.assertTrue(response instanceof CommandError);
        channel.finish();
    }

    /**
     * Registers a producer and publishes one message through it, expecting a
     * {@code CommandProducerSuccess} followed by a {@code CommandSendReceipt}.
     */
    @Test(timeOut = 30000)
    public void testSendCommand() throws Exception {
        final String producerName = "prod-name";
        resetChannel();
        setChannelConnected();

        channel.writeInbound(Commands.newProducer("persistent://prop/use/ns-abc/successTopic", 1L, 1L,
                producerName, Collections.emptyMap()));
        Object producerReply = getResponse();
        Assert.assertTrue(producerReply instanceof CommandProducerSuccess);

        // Build the send command for producer 1, sequence 0, with a 1024-byte capacity payload buffer.
        MessageMetadata metadata = new MessageMetadata()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName(producerName)
                .setSequenceId(0L);
        ByteBuf sendFrame = ByteBufPair.coalesce(
                Commands.newSend(1L, 0L, 1, Commands.ChecksumType.None, metadata, Unpooled.buffer(1024)));

        // The channel receives a copy; the coalesced original is released right after.
        channel.writeInbound(Unpooled.copiedBuffer(sendFrame));
        sendFrame.release();

        Object sendReply = getResponse();
        Assert.assertTrue(sendReply instanceof CommandSendReceipt);
        channel.finish();
    }

    /**
     * Checks that a second producer (different producer id) asking for a producer
     * name that is already in use on the same topic receives a {@code CommandError}.
     */
    @Test(timeOut = 30000)
    public void testUseSameProducerName() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String sharedName = "my-producer";
        resetChannel();
        setChannelConnected();

        // Producer id 1 takes the name.
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, sharedName, Collections.emptyMap()));
        Object firstReply = getResponse();
        Assert.assertTrue(firstReply instanceof CommandProducerSuccess);

        // Producer id 2 requests the very same name.
        channel.writeInbound(Commands.newProducer(topic, 2L, 2L, sharedName, Collections.emptyMap()));
        Object secondReply = getResponse();
        Assert.assertTrue(secondReply instanceof CommandError);

        channel.finish();
    }

    /**
     * Sends the same producer request twice (same producer id and name, different
     * request ids) and expects a {@code CommandProducerSuccess} for each request,
     * with no further outbound messages and the channel still active.
     */
    @Test(timeOut = 30000)
    public void testRecreateSameProducer() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String producerName = "my-producer";
        resetChannel();
        setChannelConnected();

        // Initial creation, request id 1.
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, producerName, Collections.emptyMap()));
        Object firstReply = getResponse();
        Assert.assertEquals(firstReply.getClass(), CommandProducerSuccess.class);
        CommandProducerSuccess firstSuccess = (CommandProducerSuccess) firstReply;
        Assert.assertEquals(firstSuccess.getRequestId(), 1L);

        // Same producer id again, request id 2.
        channel.writeInbound(Commands.newProducer(topic, 1L, 2L, producerName, Collections.emptyMap()));
        Object secondReply = getResponse();
        Assert.assertEquals(secondReply.getClass(), CommandProducerSuccess.class);
        CommandProducerSuccess secondSuccess = (CommandProducerSuccess) secondReply;
        Assert.assertEquals(secondSuccess.getRequestId(), 2L);

        // Nothing else was written and the connection survived.
        Assert.assertTrue(channel.outboundMessages().isEmpty());
        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * Issues the same subscribe (same consumer id and subscription) twice with
     * different request ids and expects a {@code CommandSuccess} for each.
     */
    @Test(timeOut = 30000)
    public void testSubscribeMultipleTimes() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String subscription = "successSub";
        resetChannel();
        setChannelConnected();

        // First subscribe: consumer id 1, request id 1.
        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object firstReply = getResponse();
        Assert.assertEquals(firstReply.getClass(), CommandSuccess.class);
        CommandSuccess firstSuccess = (CommandSuccess) firstReply;
        Assert.assertEquals(firstSuccess.getRequestId(), 1L);

        // Repeat with the same consumer id, request id 2.
        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 2L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object secondReply = getResponse();
        Assert.assertEquals(secondReply.getClass(), CommandSuccess.class);
        CommandSuccess secondSuccess = (CommandSuccess) secondReply;
        Assert.assertEquals(secondSuccess.getRequestId(), 2L);

        channel.finish();
    }

    /**
     * With topic lookup stubbed to a future that is never completed, sends two
     * subscribe commands for the same subscription and waits until a
     * {@code CommandError} carrying {@code ServerError.ConsumerBusy} is observed.
     */
    @Test(timeOut = 30000)
    public void testDuplicateConcurrentSubscribeCommand() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        resetChannel();
        setChannelConnected();

        // getOrCreateTopic hands back a future nobody completes in this test.
        CompletableFuture<Object> pendingTopic = new CompletableFuture<>();
        Mockito.doReturn(pendingTopic).when(brokerService).getOrCreateTopic(ArgumentMatchers.any(String.class));

        // Consumer id 1, then consumer id 2, both on "successSub" with request id 1.
        channel.writeInbound(Commands.newSubscribe(topic, "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        channel.writeInbound(Commands.newSubscribe(topic, "successSub", 2L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));

        Awaitility.await().untilAsserted(() -> {
            Object reply = getResponse();
            Assert.assertTrue(reply instanceof CommandError, "Response is not CommandError but " + reply);
            CommandError error = (CommandError) reply;
            Assert.assertEquals(error.getError(), ServerError.ConsumerBusy);
        });

        channel.finish();
    }

    /**
     * Sends create-producer (request 1), close-producer (request 2) and
     * create-producer again (request 3) while the managed-ledger open for the topic
     * is held back, then lets the open complete.
     *
     * <p>Expected replies, in order: {@code CommandSuccess} for request 2 and
     * {@code CommandProducerSuccess} for request 3, with the channel still active.
     */
    @Test(timeOut = 30000)
    public void testCreateProducerTimeout() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String producerName = "my-producer";
        resetChannel();
        setChannelConnected();

        // Typed as Runnable so the lambda handed to complete() has a functional-interface
        // target; with a raw CompletableFuture the parameter erases to Object and the
        // lambda cannot be compiled.
        CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>();

        // Instead of opening the ledger immediately, capture the completion as a task
        // the test triggers later.
        Mockito.doAnswer(invocation -> {
            openTopicTask.complete(() -> {
                AsyncCallbacks.OpenLedgerCallback callback =
                        (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerComplete(ledgerMock, (Object) null);
            });
            return null;
        }).when(mlFactoryMock).asyncOpen(Mockito.matches(".*success.*"),
                ArgumentMatchers.any(ManagedLedgerConfig.class),
                ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                ArgumentMatchers.any(Supplier.class),
                ArgumentMatchers.any());

        // Create, close, then create again before the topic open has finished.
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, producerName, Collections.emptyMap()));
        channel.writeInbound(Commands.newCloseProducer(1L, 2L));
        channel.writeInbound(Commands.newProducer(topic, 1L, 3L, producerName, Collections.emptyMap()));

        // Release the held-back ledger open.
        openTopicTask.get().run();

        // The close is acknowledged first.
        Object closeReply = getResponse();
        Assert.assertEquals(closeReply.getClass(), CommandSuccess.class);
        Assert.assertEquals(((CommandSuccess) closeReply).getRequestId(), 2L);

        // The second create succeeds.
        Object createReply = getResponse();
        Assert.assertEquals(createReply.getClass(), CommandProducerSuccess.class);
        Assert.assertEquals(((CommandProducerSuccess) createReply).getRequestId(), 3L);

        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * Disabled test ({@code enabled = false}). With the managed-ledger open blocked on
     * a latch, sends create (request 1), close (request 2) and three more creates
     * (requests 3, 4, 5) for the same producer id.
     *
     * <p>Expected replies, in order: {@code CommandSuccess} for request 2, then
     * {@code CommandError} for requests 3, 4 and 5; afterwards no outbound messages
     * remain and the channel is still active.
     */
    @Test(timeOut = 30000, enabled = false)
    public void testCreateProducerMultipleTimeouts() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String producerName = "my-producer";
        resetChannel();
        setChannelConnected();

        // The stubbed asyncOpen waits on this latch before reporting the ledger as open.
        final CountDownLatch openGate = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            openGate.await();
            AsyncCallbacks.OpenLedgerCallback callback =
                    (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerComplete(ledgerMock, (Object) null);
            return null;
        }).when(mlFactoryMock).asyncOpen(Mockito.matches(".*success.*"),
                ArgumentMatchers.any(ManagedLedgerConfig.class),
                ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                ArgumentMatchers.any(Supplier.class),
                ArgumentMatchers.any());

        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, producerName, Collections.emptyMap()));
        channel.writeInbound(Commands.newCloseProducer(1L, 2L));
        channel.writeInbound(Commands.newProducer(topic, 1L, 3L, producerName, Collections.emptyMap()));
        channel.writeInbound(Commands.newProducer(topic, 1L, 4L, producerName, Collections.emptyMap()));
        channel.writeInbound(Commands.newProducer(topic, 1L, 5L, producerName, Collections.emptyMap()));

        // Close is acknowledged while the open is still gated.
        Object closeReply = getResponse();
        Assert.assertEquals(closeReply.getClass(), CommandSuccess.class);
        Assert.assertEquals(((CommandSuccess) closeReply).getRequestId(), 2L);

        openGate.countDown();

        // Each later create request is answered with an error, in request order.
        Object errorFor3 = getResponse();
        Assert.assertEquals(errorFor3.getClass(), CommandError.class);
        Assert.assertEquals(((CommandError) errorFor3).getRequestId(), 3L);

        Object errorFor4 = getResponse();
        Assert.assertEquals(errorFor4.getClass(), CommandError.class);
        Assert.assertEquals(((CommandError) errorFor4).getRequestId(), 4L);

        Object errorFor5 = getResponse();
        Assert.assertEquals(errorFor5.getClass(), CommandError.class);
        Assert.assertEquals(((CommandError) errorFor5).getRequestId(), 5L);

        // Give any late reply a moment to show up before checking for silence.
        Thread.sleep(100L);
        Assert.assertTrue(channel.outboundMessages().isEmpty());
        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * Holds back the managed-ledger open for the "fail" topic, then sends create
     * (request 1, failTopic), close (request 2) and create (request 3, successTopic)
     * for the same producer id before releasing the held-back open.
     *
     * <p>Expected replies, in order: {@code CommandSuccess} for request 2 and
     * {@code CommandProducerSuccess} for request 3. A further create (request 4) sent
     * after a pause also yields {@code CommandProducerSuccess}; finally no outbound
     * messages remain and the channel is still active.
     */
    @Test(timeOut = 30000, skipFailedInvocations = true)
    public void testCreateProducerBookieTimeout() throws Exception {
        final String failTopic = "persistent://prop/use/ns-abc/failTopic";
        final String successTopic = "persistent://prop/use/ns-abc/successTopic";
        final String producerName = "my-producer";
        resetChannel();
        setChannelConnected();

        // Typed as Runnable so the lambda handed to complete() has a functional-interface
        // target; with a raw CompletableFuture the parameter erases to Object and the
        // lambda cannot be compiled.
        CompletableFuture<Runnable> openFailedTopic = new CompletableFuture<>();

        // Capture the ledger-open completion for topics matching ".*fail.*" so the test
        // decides when it fires.
        Mockito.doAnswer(invocation -> {
            openFailedTopic.complete(() -> {
                AsyncCallbacks.OpenLedgerCallback callback =
                        (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerComplete(ledgerMock, (Object) null);
            });
            return null;
        }).when(mlFactoryMock).asyncOpen(Mockito.matches(".*fail.*"),
                ArgumentMatchers.any(ManagedLedgerConfig.class),
                ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                ArgumentMatchers.any(Supplier.class),
                ArgumentMatchers.any());

        channel.writeInbound(Commands.newProducer(failTopic, 1L, 1L, producerName, Collections.emptyMap()));
        channel.writeInbound(Commands.newCloseProducer(1L, 2L));
        channel.writeInbound(Commands.newProducer(successTopic, 1L, 3L, producerName, Collections.emptyMap()));

        // Release the held-back open of the fail topic.
        openFailedTopic.get().run();

        Object closeReply = getResponse();
        Assert.assertEquals(closeReply.getClass(), CommandSuccess.class);
        Assert.assertEquals(((CommandSuccess) closeReply).getRequestId(), 2L);

        Object createReply = getResponse();
        Assert.assertEquals(createReply.getClass(), CommandProducerSuccess.class);
        Assert.assertEquals(((CommandProducerSuccess) createReply).getRequestId(), 3L);

        Thread.sleep(500L);

        // Creating the same producer once more still succeeds.
        channel.writeInbound(Commands.newProducer(successTopic, 1L, 4L, producerName, Collections.emptyMap()));
        Object recreateReply = getResponse();
        Assert.assertEquals(recreateReply.getClass(), CommandProducerSuccess.class);
        Assert.assertEquals(((CommandProducerSuccess) recreateReply).getRequestId(), 4L);

        Thread.sleep(500L);
        Assert.assertTrue(channel.outboundMessages().isEmpty());
        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * Sends four subscribe commands for the same consumer id (request ids 1, 3, 4, 5)
     * while the managed-ledger open is held back, then releases the open.
     *
     * <p>Expected replies, in order: {@code CommandError} for requests 3, 4 and 5,
     * followed by {@code CommandSuccess} for request 1, with the channel still active.
     */
    @Test(timeOut = 30000)
    public void testSubscribeTimeout() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        final String subscription = "successSub";
        resetChannel();
        setChannelConnected();

        // Typed as Runnable so the lambda handed to complete() has a functional-interface
        // target; with a raw CompletableFuture the parameter erases to Object and the
        // lambda cannot be compiled.
        CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>();

        // Capture the ledger-open completion so the test controls when the topic opens.
        Mockito.doAnswer(invocation -> {
            openTopicTask.complete(() -> {
                AsyncCallbacks.OpenLedgerCallback callback =
                        (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerComplete(ledgerMock, (Object) null);
            });
            return null;
        }).when(mlFactoryMock).asyncOpen(Mockito.matches(".*success.*"),
                ArgumentMatchers.any(ManagedLedgerConfig.class),
                ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                ArgumentMatchers.any(Supplier.class),
                ArgumentMatchers.any());

        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 3L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 4L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        channel.writeInbound(Commands.newSubscribe(topic, subscription, 1L, 5L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));

        // Release the held-back ledger open.
        openTopicTask.get().run();

        // NOTE(review): nothing visible in this method contends for this monitor —
        // confirm whether the lock is still needed.
        synchronized (this) {
            Object errorFor3 = getResponse();
            Assert.assertEquals(errorFor3.getClass(), CommandError.class);
            Assert.assertEquals(((CommandError) errorFor3).getRequestId(), 3L);

            Object errorFor4 = getResponse();
            Assert.assertEquals(errorFor4.getClass(), CommandError.class);
            Assert.assertEquals(((CommandError) errorFor4).getRequestId(), 4L);

            Object errorFor5 = getResponse();
            Assert.assertEquals(errorFor5.getClass(), CommandError.class);
            Assert.assertEquals(((CommandError) errorFor5).getRequestId(), 5L);

            // Wait for one more outbound message before reading it.
            Awaitility.await().untilAsserted(() -> {
                Assert.assertFalse(channel.outboundMessages().isEmpty());
            });
            Assert.assertTrue(channel.isActive());

            // The original subscribe (request 1) is the one that succeeds.
            Object successFor1 = getResponse();
            Assert.assertEquals(successFor1.getClass(), CommandSuccess.class);
            Assert.assertEquals(((CommandSuccess) successFor1).getRequestId(), 1L);
        }
        channel.finish();
    }

    /**
     * Holds back the managed-ledger opens for both the "success" and the "fail" topic,
     * then sends subscribe (request 1, failTopic), close-consumer (request 2) and
     * subscribe (request 3, successTopic) for the same consumer id, and lets the fail
     * topic's open report a {@code ManagedLedgerException}.
     *
     * <p>Expected replies, in order: {@code CommandSuccess} for request 2 and
     * {@code CommandError} for request 3. Once the connection no longer tracks
     * consumer 1, a new subscribe (request 4) is sent, the success topic's open is
     * released, and {@code CommandSuccess} for request 4 is expected; finally no
     * outbound messages remain and the channel is still active.
     */
    @Test(timeOut = 30000)
    public void testSubscribeBookieTimeout() throws Exception {
        final String failTopic = "persistent://prop/use/ns-abc/failTopic";
        final String successTopic = "persistent://prop/use/ns-abc/successTopic";
        final String subscription = "successSub";
        resetChannel();
        setChannelConnected();

        // Both futures are typed as Runnable so the lambdas handed to complete() have a
        // functional-interface target; with raw CompletableFuture the parameter erases
        // to Object and the lambdas cannot be compiled.
        CompletableFuture<Runnable> openTopicSuccess = new CompletableFuture<>();
        Mockito.doAnswer(invocation -> {
            openTopicSuccess.complete(() -> {
                AsyncCallbacks.OpenLedgerCallback callback =
                        (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerComplete(ledgerMock, (Object) null);
            });
            return null;
        }).when(mlFactoryMock).asyncOpen(Mockito.matches(".*success.*"),
                ArgumentMatchers.any(ManagedLedgerConfig.class),
                ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                ArgumentMatchers.any(Supplier.class),
                ArgumentMatchers.any());

        CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
        Mockito.doAnswer(invocation -> {
            openTopicFail.complete(() -> {
                AsyncCallbacks.OpenLedgerCallback callback =
                        (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
            });
            return null;
        }).when(mlFactoryMock).asyncOpen(Mockito.matches(".*fail.*"),
                ArgumentMatchers.any(ManagedLedgerConfig.class),
                ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                ArgumentMatchers.any(Supplier.class),
                ArgumentMatchers.any());

        channel.writeInbound(Commands.newSubscribe(failTopic, subscription, 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        channel.writeInbound(Commands.newCloseConsumer(1L, 2L));
        channel.writeInbound(Commands.newSubscribe(successTopic, subscription, 1L, 3L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));

        // Let the fail topic's open report its failure.
        openTopicFail.get().run();

        Object closeReply = getResponse();
        Assert.assertEquals(closeReply.getClass(), CommandSuccess.class);
        Assert.assertEquals(((CommandSuccess) closeReply).getRequestId(), 2L);

        Object errorFor3 = getResponse();
        Assert.assertEquals(errorFor3.getClass(), CommandError.class);
        Assert.assertEquals(((CommandError) errorFor3).getRequestId(), 3L);

        // Wait until the connection no longer tracks consumer 1 before subscribing again.
        Awaitility.await().until(() -> {
            return !serverCnx.hasConsumer(1L);
        });

        channel.writeInbound(Commands.newSubscribe(successTopic, subscription, 1L, 4L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));

        // Release the success topic's open.
        openTopicSuccess.get().run();

        Object successFor4 = getResponse();
        Assert.assertEquals(successFor4.getClass(), CommandSuccess.class);
        Assert.assertEquals(((CommandSuccess) successFor4).getRequestId(), 4L);

        // Give any late reply a moment to show up before checking for silence.
        Thread.sleep(100L);
        Assert.assertTrue(channel.outboundMessages().isEmpty());
        Assert.assertTrue(channel.isActive());
        channel.finish();
    }

    /**
     * With authentication and authorization reported as disabled, checks one
     * successful subscribe (including the resulting topic/subscription state) and two
     * subscribes that are answered with {@code CommandError}: one using subscription
     * "failSub" and one targeting "failTopic". The channel must stay open.
     */
    @Test(timeOut = 30000)
    public void testSubscribeCommand() throws Exception {
        final String successTopic = "persistent://prop/use/ns-abc/successTopic";
        resetChannel();
        setChannelConnected();
        Mockito.doReturn(false).when(brokerService).isAuthenticationEnabled();
        Mockito.doReturn(false).when(brokerService).isAuthorizationEnabled();

        // Successful subscribe.
        channel.writeInbound(Commands.newSubscribe(successTopic, "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object successReply = getResponse();
        Assert.assertTrue(successReply instanceof CommandSuccess);

        // The topic exists, owns the subscription and has a connected consumer on it.
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertTrue(topicRef.getSubscriptions().containsKey("successSub"));
        Assert.assertTrue(topicRef.getSubscription("successSub").getDispatcher().isConsumerConnected());

        // Subscribe with subscription name "failSub".
        channel.writeInbound(Commands.newSubscribe(successTopic, "failSub", 2L, 2L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object failSubReply = getResponse();
        Assert.assertTrue(failSubReply instanceof CommandError);

        // Subscribe against "failTopic".
        channel.writeInbound(Commands.newSubscribe("persistent://prop/use/ns-abc/failTopic", "successSub", 3L, 3L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object failTopicReply = getResponse();
        Assert.assertEquals(failTopicReply.getClass(), CommandError.class);

        Assert.assertTrue(channel.isOpen());
        channel.finish();
    }

    /**
     * With the connection's protocol version forced to {@code ProtocolVersion.v3},
     * subscribes once successfully, marks the topic as having published batch
     * messages, and then expects a second subscribe to be rejected with
     * {@code ServerError.UnsupportedVersionError} while the channel stays open.
     */
    @Test(timeOut = 30000)
    public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successTopic";
        resetChannel();
        setChannelConnected();
        setConnectionVersion(ProtocolVersion.v3.getValue());
        Mockito.doReturn(false).when(brokerService).isAuthenticationEnabled();
        Mockito.doReturn(false).when(brokerService).isAuthorizationEnabled();

        // First subscribe succeeds.
        channel.writeInbound(Commands.newSubscribe(topic, "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Assert.assertTrue(getResponse() instanceof CommandSuccess);

        // Flag the topic as containing batch messages.
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(topic).get();
        topicRef.markBatchMessagePublished();

        // Second subscribe on the same connection is now refused.
        channel.writeInbound(Commands.newSubscribe(topic, "failSub", 2L, 2L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object reply = getResponse();
        Assert.assertTrue(reply instanceof CommandError);
        // Actual value first, expected second, matching the other assertions in this
        // file so that a failure message reports the values the right way round.
        Assert.assertEquals(((CommandError) reply).getError(), ServerError.UnsupportedVersionError);

        Assert.assertTrue(channel.isOpen());
        channel.finish();
    }

    /**
     * Checks that a subscribe is answered with {@code CommandSuccess} when
     * authentication and authorization are reported as enabled and the mocked
     * authorization service answers {@code true} for every topic operation.
     */
    @Test(timeOut = 30000)
    public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
        // Authorization mock that permits any topic operation.
        AuthorizationService permissiveAuthz = Mockito.mock(AuthorizationService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(true))
                .when(permissiveAuthz)
                .allowTopicOperationAsync((TopicName) Mockito.any(), (TopicOperation) Mockito.any(),
                        (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());

        Mockito.doReturn(permissiveAuthz).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthenticationEnabled();
        Mockito.doReturn(true).when(brokerService).isAuthorizationEnabled();

        resetChannel();
        setChannelConnected();

        channel.writeInbound(Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object reply = getResponse();
        Assert.assertTrue(reply instanceof CommandSuccess);
        channel.finish();
    }

    /**
     * Checks that a subscribe is answered with {@code CommandError} when
     * authentication and authorization are reported as enabled and the mocked
     * authorization service answers {@code false} for every topic operation.
     */
    @Test(timeOut = 30000)
    public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
        // Authorization mock that denies any topic operation.
        AuthorizationService denyingAuthz = Mockito.mock(AuthorizationService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(false))
                .when(denyingAuthz)
                .allowTopicOperationAsync((TopicName) Mockito.any(), (TopicOperation) Mockito.any(),
                        (String) Mockito.any(), (AuthenticationDataSource) Mockito.any());

        Mockito.doReturn(denyingAuthz).when(brokerService).getAuthorizationService();
        Mockito.doReturn(true).when(brokerService).isAuthenticationEnabled();
        Mockito.doReturn(true).when(brokerService).isAuthorizationEnabled();

        resetChannel();
        setChannelConnected();

        channel.writeInbound(Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object reply = getResponse();
        Assert.assertTrue(reply instanceof CommandError);
        channel.finish();
    }

    /**
     * Subscribes, then sends an individual ack for position (0, 0) and checks that
     * the ack produces no outbound message.
     */
    @Test(timeOut = 30000)
    public void testAckCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        channel.writeInbound(Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object subscribeReply = getResponse();
        Assert.assertTrue(subscribeReply instanceof CommandSuccess);

        // Ack ledger 0 / entry 0 for consumer 1.
        PositionImpl ackedPosition = new PositionImpl(0L, 0L);
        long ledgerId = ackedPosition.getLedgerId();
        long entryId = ackedPosition.getEntryId();
        channel.writeInbound(Commands.newAck(1L, ledgerId, entryId, (BitSetRecyclable) null,
                CommandAck.AckType.Individual, (CommandAck.ValidationError) null, Collections.emptyMap(), -1L));

        // Nothing is queued for the client after the ack.
        Assert.assertNull(channel.outboundMessages().peek());
        channel.finish();
    }

    /**
     * Subscribes, then sends a flow command granting one permit to consumer 1 and
     * checks that it produces no outbound message.
     */
    @Test(timeOut = 30000)
    public void testFlowCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        channel.writeInbound(Commands.newSubscribe("persistent://prop/use/ns-abc/successTopic", "successSub", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "test", 0L));
        Object subscribeReply = getResponse();
        Assert.assertTrue(subscribeReply instanceof CommandSuccess);

        channel.writeInbound(Commands.newFlow(1L, 1));

        // Nothing is queued for the client after the flow command.
        Assert.assertNull(channel.outboundMessages().peek());
        channel.finish();
    }

    /**
     * Stubs the policies cache so the namespace reports {@code encryption_required},
     * then creates a producer with the encryption flag set and expects
     * {@code CommandProducerSuccess} and exactly one producer on the topic.
     */
    @Test(timeOut = 30000)
    public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
        resetChannel();
        setChannelConnected();

        // Namespace policies with encryption required.
        ZooKeeperDataCache policiesCacheMock = Mockito.mock(ZooKeeperDataCache.class);
        Policies nsPolicies = Mockito.mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        nsPolicies.clusterDispatchRate = Maps.newHashMap();

        // Serve those policies for both the sync and async cache lookups.
        Mockito.doReturn(Optional.of(nsPolicies)).when(policiesCacheMock)
                .get(AdminResource.path("policies", TopicName.get(topic).getNamespace()));
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies))).when(policiesCacheMock)
                .getAsync(AdminResource.path("policies", TopicName.get(topic).getNamespace()));
        Mockito.doReturn(policiesCacheMock).when(configCacheService).policiesCache();

        // Producer created with the encryption flag set to true.
        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, "encrypted-producer", true,
                Collections.emptyMap()));
        Object reply = getResponse();
        Assert.assertEquals(reply.getClass(), CommandProducerSuccess.class);

        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1);
        channel.finish();
    }

    /**
     * Stubs the namespace resources so the namespace reports
     * {@code encryption_required}, then creates a producer with the encryption flag
     * cleared and expects a {@code CommandError} with
     * {@code ServerError.MetadataError} and no producer on the topic.
     */
    @Test(timeOut = 30000)
    public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
        resetChannel();
        setChannelConnected();

        // Namespace policies with encryption required.
        Policies nsPolicies = Mockito.mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies)))
                .when(namespaceResources)
                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject());

        // Producer created with the encryption flag set to false.
        channel.writeInbound(Commands.newProducer(topic, 2L, 2L, "unencrypted-producer", false,
                Collections.emptyMap()));
        Object reply = getResponse();
        Assert.assertEquals(reply.getClass(), CommandError.class);
        CommandError error = (CommandError) reply;
        Assert.assertEquals(error.getError(), ServerError.MetadataError);

        // The topic exists but holds no producer.
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 0);
        channel.finish();
    }

    /**
     * Turns on {@code encryptionRequireOnProducer} in the broker configuration while
     * the namespace policy leaves {@code encryption_required} off, then creates a
     * producer with the encryption flag cleared and expects a {@code CommandError}
     * with {@code ServerError.MetadataError} and no producer on the topic.
     */
    @Test(timeOut = 30000)
    public void testProducerFailureOnEncryptionRequiredOnBroker() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";

        // Broker-level switch: producers must encrypt.
        pulsar.getConfig().setEncryptionRequireOnProducer(true);
        resetChannel();
        setChannelConnected();

        // Namespace-level policy: encryption not required.
        ZooKeeperDataCache policiesCacheMock = Mockito.mock(ZooKeeperDataCache.class);
        Policies nsPolicies = Mockito.mock(Policies.class);
        nsPolicies.encryption_required = false;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        nsPolicies.clusterDispatchRate = Maps.newHashMap();

        // Serve those policies for both the sync and async cache lookups.
        Mockito.doReturn(Optional.of(nsPolicies)).when(policiesCacheMock)
                .get(AdminResource.path("policies", TopicName.get(topic).getNamespace()));
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies))).when(policiesCacheMock)
                .getAsync(AdminResource.path("policies", TopicName.get(topic).getNamespace()));
        Mockito.doReturn(policiesCacheMock).when(configCacheService).policiesCache();

        // Producer created with the encryption flag set to false.
        channel.writeInbound(Commands.newProducer(topic, 2L, 2L, "unencrypted-producer", false,
                Collections.emptyMap()));
        Object reply = getResponse();
        Assert.assertEquals(reply.getClass(), CommandError.class);
        CommandError error = (CommandError) reply;
        Assert.assertEquals(error.getError(), ServerError.MetadataError);

        // The topic exists but holds no producer.
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 0);
        channel.finish();
    }

    /**
     * Stubs the policies cache so the namespace reports {@code encryption_required},
     * creates a producer with the encryption flag set, and publishes a message whose
     * metadata carries an encryption key, expecting a {@code CommandSendReceipt}.
     */
    @Test(timeOut = 30000)
    public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
        final String producerName = "prod-name";
        resetChannel();
        setChannelConnected();

        // Namespace policies with encryption required.
        ZooKeeperDataCache policiesCacheMock = Mockito.mock(ZooKeeperDataCache.class);
        Policies nsPolicies = Mockito.mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        nsPolicies.clusterDispatchRate = Maps.newHashMap();

        // Serve those policies for both the sync and async cache lookups.
        Mockito.doReturn(Optional.of(nsPolicies)).when(policiesCacheMock)
                .get(AdminResource.path("policies", TopicName.get(topic).getNamespace()));
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies))).when(policiesCacheMock)
                .getAsync(AdminResource.path("policies", TopicName.get(topic).getNamespace()));
        Mockito.doReturn(policiesCacheMock).when(configCacheService).policiesCache();

        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, producerName, true, Collections.emptyMap()));
        Assert.assertTrue(getResponse() instanceof CommandProducerSuccess);

        // Message metadata that includes an encryption key entry.
        MessageMetadata metadata = new MessageMetadata()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName(producerName)
                .setSequenceId(0L);
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        metadata.addEncryptionKey().setKey("testKey").setValue("testVal".getBytes(StandardCharsets.UTF_8));

        ByteBuf sendFrame = ByteBufPair.coalesce(
                Commands.newSend(1L, 0L, 1, Commands.ChecksumType.None, metadata, Unpooled.buffer(1024)));

        // The channel receives a copy; the coalesced original is released right after.
        channel.writeInbound(Unpooled.copiedBuffer(sendFrame));
        sendFrame.release();

        Assert.assertTrue(getResponse() instanceof CommandSendReceipt);
        channel.finish();
    }

    /**
     * Stubs the namespace resources so the namespace reports
     * {@code encryption_required}, creates a producer with the encryption flag set,
     * and publishes a message whose metadata carries no encryption key, expecting a
     * {@code CommandSendError}.
     */
    @Test(timeOut = 30000)
    public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
        final String producerName = "prod-name";
        resetChannel();
        setChannelConnected();

        // Namespace policies with encryption required.
        Policies nsPolicies = Mockito.mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies)))
                .when(namespaceResources)
                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject());

        channel.writeInbound(Commands.newProducer(topic, 1L, 1L, producerName, true, Collections.emptyMap()));
        Object producerReply = getResponse();
        Assert.assertTrue(producerReply instanceof CommandProducerSuccess);

        // Metadata without any encryption key entry.
        MessageMetadata metadata = new MessageMetadata()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName(producerName)
                .setSequenceId(0L);
        ByteBuf sendFrame = ByteBufPair.coalesce(
                Commands.newSend(1L, 0L, 1, Commands.ChecksumType.None, metadata, Unpooled.buffer(1024)));

        // The channel receives a copy; the coalesced original is released right after.
        channel.writeInbound(Unpooled.copiedBuffer(sendFrame));
        sendFrame.release();

        Object sendReply = getResponse();
        Assert.assertTrue(sendReply instanceof CommandSendError);
        channel.finish();
    }

    /**
     * Replaces {@code serverCnx} and {@code channel} with fresh instances.
     *
     * <p>If a channel already exists and is still active, the current {@code serverCnx}
     * is closed and the channel close is awaited first. The new channel's pipeline is a
     * length-field frame decoder followed by the new {@code ServerCnx}.
     *
     * @throws Exception if waiting for the previous channel to close fails
     */
    protected void resetChannel() throws Exception {
        final boolean previousChannelIsLive = channel != null && channel.isActive();
        if (previousChannelIsLive) {
            serverCnx.close();
            channel.close().get();
        }

        serverCnx = new ServerCnx(pulsar);
        serverCnx.authRole = "";

        // Max frame length of 5 MiB; 4-byte length prefix at offset 0, stripped from the frame.
        final ChannelHandler frameDecoder = new LengthFieldBasedFrameDecoder(5 * 1024 * 1024, 0, 4, 0, 4);
        channel = new EmbeddedChannel(frameDecoder, serverCnx);
    }

    /**
     * Uses reflection to set the {@code state} field declared on {@code ServerCnx} to
     * {@code ServerCnx.State.Connected} for the current {@code serverCnx} instance.
     *
     * @throws Exception if the field cannot be found or written
     */
    protected void setChannelConnected() throws Exception {
        final Field stateField = ServerCnx.class.getDeclaredField("state");
        stateField.setAccessible(true);
        stateField.set(serverCnx, ServerCnx.State.Connected);
    }

    /**
     * Uses reflection to overwrite the {@code remoteEndpointProtocolVersion} field declared
     * on {@code PulsarHandler} for the current {@code serverCnx} instance.
     *
     * @param i the protocol version value to store
     * @throws Exception if the field cannot be found or written
     */
    private void setConnectionVersion(int i) throws Exception {
        final Field protocolVersionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
        protocolVersionField.setAccessible(true);
        // The int argument is boxed automatically for the reflective write.
        protocolVersionField.set(this.serverCnx, i);
    }

    /**
     * Polls the embedded channel's outbound queue until a message is available and returns
     * it after decoding through {@code clientChannelHelper}.
     *
     * <p>The queue is checked up to 1000 times with a 10 ms sleep after every empty check.
     *
     * @return the decoded command taken from the head of the outbound queue
     * @throws IOException if no outbound message appears within the polling budget
     * @throws Exception if decoding fails or the sleep is interrupted
     */
    protected Object getResponse() throws Exception {
        final long pollIntervalMs = 10L;
        final long maxPolls = TimeUnit.SECONDS.toMillis(10L) / 10;

        int polls = 0;
        while (polls < maxPolls) {
            if (channel.outboundMessages().isEmpty()) {
                // Nothing written yet: back off briefly and try again.
                Thread.sleep(pollIntervalMs);
                polls++;
                continue;
            }
            return clientChannelHelper.getCommand(channel.outboundMessages().remove());
        }
        throw new IOException("Failed to get response from socket within 10s");
    }

    /**
     * Installs the Mockito stubs on {@code ledgerMock}, {@code mlFactoryMock} and {@code cursorMock}.
     *
     * <p>For the open/delete operations, the stub is selected by a regex on the call's first
     * String argument: values matching {@code ".*success.*"} invoke the success method of the
     * supplied callback, values matching {@code ".*fail.*"} invoke the failure method with a
     * {@code ManagedLedgerException("Managed ledger failure")}. The open stubs sleep 300 ms
     * before invoking the callback.
     */
    private void setupMLAsyncCallbackMocks() {
        // The ledger mock reports an empty list of cursors.
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(this.ledgerMock)).getCursors();
        // asyncOpen, first argument matching ".*success.*": sleep 300 ms, then complete the callback (argument index 2) with ledgerMock.
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.ServerCnxTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(300L);
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ServerCnxTest.this.ledgerMock, (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(Mockito.matches(".*success.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        // asyncOpen, first argument matching ".*fail.*": sleep 300 ms, then fail the callback (argument index 2) from a newly started thread.
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.ServerCnxTest.3
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(300L);
                new Thread(() -> {
                    ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
                }).start();
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(Mockito.matches(".*fail.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        // asyncAddEntry: immediately report completion at position (-1, -1) with a null buffer, passing the call's third argument back to the callback.
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.service.ServerCnxTest.4
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(-1L, -1L), (ByteBuf) null, invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when(this.ledgerMock)).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        // The cursor mock always reports itself as durable.
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock -> {
            return true;
        }).when(this.cursorMock)).isDurable();
        // asyncOpenCursor (4-argument form), ".*success.*": sleep 300 ms, then complete the callback (argument index 2) with cursorMock.
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock2 -> {
            Thread.sleep(300L);
            ((AsyncCallbacks.OpenCursorCallback) invocationOnMock2.getArguments()[2]).openCursorComplete(this.cursorMock, (Object) null);
            return null;
        }).when(this.ledgerMock)).asyncOpenCursor(Mockito.matches(".*success.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        // asyncOpenCursor (5-argument form with a Map), ".*success.*": same behavior, but the callback is at argument index 3.
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock3 -> {
            Thread.sleep(300L);
            ((AsyncCallbacks.OpenCursorCallback) invocationOnMock3.getArguments()[3]).openCursorComplete(this.cursorMock, (Object) null);
            return null;
        }).when(this.ledgerMock)).asyncOpenCursor(Mockito.matches(".*success.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (Map) ArgumentMatchers.any(Map.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        // asyncOpenCursor (4-argument form), ".*fail.*": sleep 300 ms, then fail the callback (argument index 2).
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock4 -> {
            Thread.sleep(300L);
            ((AsyncCallbacks.OpenCursorCallback) invocationOnMock4.getArguments()[2]).openCursorFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
            return null;
        }).when(this.ledgerMock)).asyncOpenCursor(Mockito.matches(".*fail.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        // asyncOpenCursor (5-argument form with a Map), ".*fail.*": same failure, but the callback is at argument index 3.
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock5 -> {
            Thread.sleep(300L);
            ((AsyncCallbacks.OpenCursorCallback) invocationOnMock5.getArguments()[3]).openCursorFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
            return null;
        }).when(this.ledgerMock)).asyncOpenCursor(Mockito.matches(".*fail.*"), (CommandSubscribe.InitialPosition) ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (Map) ArgumentMatchers.any(Map.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        // asyncDeleteCursor, ".*success.*": complete the callback (argument index 1) immediately, with no delay.
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock6 -> {
            ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock6.getArguments()[1]).deleteCursorComplete((Object) null);
            return null;
        }).when(this.ledgerMock)).asyncDeleteCursor(Mockito.matches(".*success.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        // asyncDeleteCursor, ".*fail.*": fail the callback (argument index 1) immediately, with no delay.
        ((ManagedLedger) Mockito.doAnswer(invocationOnMock7 -> {
            ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock7.getArguments()[1]).deleteCursorFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
            return null;
        }).when(this.ledgerMock)).asyncDeleteCursor(Mockito.matches(".*fail.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        // Cursor asyncClose: complete the close callback (argument index 0) immediately.
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock8 -> {
            ((AsyncCallbacks.CloseCallback) invocationOnMock8.getArguments()[0]).closeComplete((Object) null);
            return null;
        }).when(this.cursorMock)).asyncClose((AsyncCallbacks.CloseCallback) ArgumentMatchers.any(AsyncCallbacks.CloseCallback.class), ArgumentMatchers.any());
        // The cursor mock is always named "successSub".
        ((ManagedCursor) Mockito.doReturn("successSub").when(this.cursorMock)).getName();
    }

    /**
     * Sends a lookup request for a malformed topic name and expects a
     * {@code CommandLookupTopicResponse} whose error is {@code ServerError.InvalidTopicName}.
     */
    @Test(timeOut = 30000)
    public void testInvalidTopicOnLookup() throws Exception {
        // One reset/connect pair gives the test a fresh ServerCnx in the Connected state.
        resetChannel();
        setChannelConnected();

        final String invalidTopicName = "xx/ass/aa/aaa";
        channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1L));

        Object response = getResponse();
        Assert.assertEquals(response.getClass(), CommandLookupTopicResponse.class);
        Assert.assertEquals(((CommandLookupTopicResponse) response).getError(), ServerError.InvalidTopicName);
        channel.finish();
    }

    /**
     * Sends a create-producer request for a malformed topic name and expects a
     * {@code CommandError} whose error is {@code ServerError.InvalidTopicName}.
     */
    @Test(timeOut = 30000)
    public void testInvalidTopicOnProducer() throws Exception {
        // One reset/connect pair gives the test a fresh ServerCnx in the Connected state.
        resetChannel();
        setChannelConnected();

        final String invalidTopicName = "xx/ass/aa/aaa";
        channel.writeInbound(Commands.newProducer(invalidTopicName, 1L, 1L, "prod-name", Collections.emptyMap()));

        Object response = getResponse();
        Assert.assertEquals(response.getClass(), CommandError.class);
        Assert.assertEquals(((CommandError) response).getError(), ServerError.InvalidTopicName);
        channel.finish();
    }

    /**
     * Sends a subscribe request for a malformed topic name and expects a
     * {@code CommandError} whose error is {@code ServerError.InvalidTopicName}.
     */
    @Test(timeOut = 30000)
    public void testInvalidTopicOnSubscribe() throws Exception {
        // One reset/connect pair gives the test a fresh ServerCnx in the Connected state.
        resetChannel();
        setChannelConnected();

        final String invalidTopicName = "xx/ass/aa/aaa";
        channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1L, 1L,
                CommandSubscribe.SubType.Exclusive, 0, "consumerName", 0L));

        Object response = getResponse();
        Assert.assertEquals(response.getClass(), CommandError.class);
        Assert.assertEquals(((CommandError) response).getError(), ServerError.InvalidTopicName);
        channel.finish();
    }

    /**
     * Creates a producer while {@code getOrCreateTopic} is stubbed with a future that is
     * never completed, sends a close-producer command for it, then re-stubs
     * {@code getOrCreateTopic} with a completed future and creates the producer again.
     * The next outbound command is expected to be a {@code CommandSuccess}.
     */
    @Test
    public void testDelayedClosedProducer() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topicName = "persistent://prop/use/ns-abc/successTopic";
        final String producerName = "prod-name";

        // Phase 1: topic creation hangs, so the first producer request stays pending.
        CompletableFuture<Topic> neverCompleted = new CompletableFuture<>();
        ((BrokerService) Mockito.doReturn(neverCompleted).when(this.brokerService))
                .getOrCreateTopic((String) ArgumentMatchers.any(String.class));
        ByteBuf firstCreate = Commands.newProducer(topicName, 1, 1L, producerName, Collections.emptyMap());
        channel.writeInbound(firstCreate);

        // Phase 2: close the producer while its creation is still outstanding.
        ByteBuf closeProducer = Commands.newCloseProducer(1, 2L);
        channel.writeInbound(closeProducer);

        // Phase 3: topic creation now resolves immediately; create the producer again.
        Topic topic = Mockito.mock(Topic.class);
        ((BrokerService) Mockito.doReturn(CompletableFuture.completedFuture(topic)).when(this.brokerService))
                .getOrCreateTopic((String) ArgumentMatchers.any(String.class));
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema();
        ByteBuf secondCreate = Commands.newProducer(topicName, 1, 1L, producerName, Collections.emptyMap());
        channel.writeInbound(secondCreate);

        Assert.assertTrue(getResponse() instanceof CommandSuccess);
        channel.finish();
    }

    /**
     * Subscribes successfully once, then stubs {@code checkTopicNsOwnership} to fail with
     * {@code ServiceUnitNotReadyException} and checks that a second subscribe and a
     * create-producer request are each answered with a {@code CommandError} carrying
     * {@code ServerError.ServiceNotReady} and the matching request id.
     */
    @Test(timeOut = 30000)
    public void testTopicIsNotReady() throws Exception {
        resetChannel();
        setChannelConnected();

        final String topicName = "persistent://prop/use/ns-abc/successTopic";
        final String subscriptionName = "successSub";

        // Request 1: subscribe before the ownership check is made to fail.
        ByteBuf firstSubscribe = Commands.newSubscribe(topicName, subscriptionName, 1L, 1L,
                CommandSubscribe.SubType.Shared, 0, "c1", 0L);
        channel.writeInbound(firstSubscribe);
        Object subscribeReply = getResponse();
        Assert.assertEquals(subscribeReply.getClass(), CommandSuccess.class);
        CommandSuccess subscribeOk = (CommandSuccess) subscribeReply;
        Assert.assertEquals(subscribeOk.getRequestId(), 1L);

        // From here on, every namespace-ownership check fails with "not ready".
        BrokerServiceException.ServiceUnitNotReadyException notReady =
                new BrokerServiceException.ServiceUnitNotReadyException("Service unit is not ready");
        ((BrokerService) Mockito.doReturn(FutureUtil.failedFuture(notReady)).when(this.brokerService))
                .checkTopicNsOwnership(ArgumentMatchers.anyString());

        // Request 2: a second subscribe on the same topic.
        ByteBuf secondSubscribe = Commands.newSubscribe(topicName, subscriptionName, 2L, 2L,
                CommandSubscribe.SubType.Shared, 0, "c2", 0L);
        channel.writeInbound(secondSubscribe);
        Object secondSubscribeReply = getResponse();
        Assert.assertEquals(secondSubscribeReply.getClass(), CommandError.class);
        CommandError subscribeError = (CommandError) secondSubscribeReply;
        Assert.assertEquals(subscribeError.getError(), ServerError.ServiceNotReady);
        Assert.assertEquals(subscribeError.getRequestId(), 2L);

        // Request 3: a producer on the same topic.
        ByteBuf createProducer = Commands.newProducer(topicName, 1L, 3L, "p1", Collections.emptyMap());
        channel.writeInbound(createProducer);
        Object producerReply = getResponse();
        Assert.assertEquals(producerReply.getClass(), CommandError.class);
        CommandError producerError = (CommandError) producerReply;
        Assert.assertEquals(producerError.getError(), ServerError.ServiceNotReady);
        Assert.assertEquals(producerError.getRequestId(), 3L);

        channel.finish();
    }
}
