/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.resources.BaseResources;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class BrokerClientIntegrationTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(BrokerClientIntegrationTest.class);

    /**
     * Per-test fixture. Runs the inherited {@code internalSetup()} followed by the
     * inherited {@code producerBaseSetup()} before each test method.
     *
     * @throws Exception if either inherited setup step fails
     */
    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test teardown. Delegates to the inherited {@code internalCleanup()}.
     * The annotation sets {@code alwaysRun=true}, so TestNG runs it even after a
     * failed or skipped test method.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @AfterMethod(alwaysRun=true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * TestNG data provider for tests parameterized by subscription type.
     *
     * @return two single-element rows: {@link SubscriptionType#Shared} and
     *         {@link SubscriptionType#Failover}, in that order
     */
    @DataProvider
    public Object[][] subType() {
        Object[] sharedRow = {SubscriptionType.Shared};
        Object[] failoverRow = {SubscriptionType.Failover};
        return new Object[][]{sharedRow, failoverRow};
    }

    /**
     * TestNG data provider (registered as {@code "booleanFlagProvider"}) that feeds
     * a test once with {@code true} and once with {@code false}.
     *
     * @return two single-element rows: {@code true}, then {@code false}
     */
    @DataProvider(name="booleanFlagProvider")
    public Object[][] booleanFlagProvider() {
        return new Object[][]{
            {Boolean.TRUE},
            {Boolean.FALSE},
        };
    }

    /**
     * Checks that unloading a namespace bundle notifies only the producers/consumers
     * whose topic lives in that bundle, and that the notified handlers drop their
     * {@link ClientCnx} and move to {@code Connecting}.
     *
     * <p>Flow:
     * <ol>
     *   <li>Create two namespaces; attach one consumer and one producer to a topic in
     *       the first, and one producer to a topic in the second.</li>
     *   <li>Wrap each handler in a Mockito spy that forwards state/connection calls to
     *       the real object, and swap the spies into the {@code producers}/{@code consumers}
     *       maps of the shared {@link ClientCnx} via reflection so that
     *       {@code connectionClosed} invocations can be verified.</li>
     *   <li>Unload the first bundle and assert only the first namespace's handlers were
     *       disconnected; then unload the second bundle and assert the remaining producer
     *       was disconnected as well.</li>
     * </ol>
     *
     * @throws Exception on any admin, client, or reflection failure
     */
    @Test
    public void testDisconnectClientWithoutClosingConnection() throws Exception {
        final String ns1 = "my-property/con-ns1";
        final String ns2 = "my-property/con-ns2";
        admin.namespaces().createNamespace(ns1, Sets.newHashSet("test"));
        admin.namespaces().createNamespace(ns2, Sets.newHashSet("test"));

        final String topic1 = "persistent://my-property/con-ns1/my-topic";
        final String topic2 = "persistent://my-property/con-ns2/my-topic";
        ConsumerImpl<byte[]> cons1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topic1)
                .subscriptionName("my-subscriber-name")
                .subscribe();
        ProducerImpl<byte[]> prod1 = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic1).create();
        ProducerImpl<byte[]> prod2 = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic2).create();

        // Spies forward the stubbed calls to the real handlers, so the real state
        // machine still runs while the invocations are recorded.
        ConsumerImpl<byte[]> consumer1 = Mockito.spy(cons1);
        Mockito.doAnswer(invocation -> cons1.getState()).when(consumer1).getState();
        Mockito.doAnswer(invocation -> cons1.getClientCnx()).when(consumer1).getClientCnx();
        Mockito.doAnswer(invocation -> cons1.cnx()).when(consumer1).cnx();
        Mockito.doAnswer(invocation -> {
            cons1.connectionClosed((ClientCnx) invocation.getArguments()[0]);
            return null;
        }).when(consumer1).connectionClosed((ClientCnx) Mockito.any());

        ProducerImpl<byte[]> producer1 = Mockito.spy(prod1);
        Mockito.doAnswer(invocation -> prod1.getState()).when(producer1).getState();
        Mockito.doAnswer(invocation -> prod1.getClientCnx()).when(producer1).getClientCnx();
        Mockito.doAnswer(invocation -> prod1.cnx()).when(producer1).cnx();
        Mockito.doAnswer(invocation -> {
            prod1.connectionClosed((ClientCnx) invocation.getArguments()[0]);
            return null;
        }).when(producer1).connectionClosed((ClientCnx) Mockito.any());

        ProducerImpl<byte[]> producer2 = Mockito.spy(prod2);
        Mockito.doAnswer(invocation -> prod2.getState()).when(producer2).getState();
        Mockito.doAnswer(invocation -> prod2.getClientCnx()).when(producer2).getClientCnx();
        Mockito.doAnswer(invocation -> prod2.cnx()).when(producer2).cnx();
        Mockito.doAnswer(invocation -> {
            prod2.connectionClosed((ClientCnx) invocation.getArguments()[0]);
            return null;
        }).when(producer2).connectionClosed((ClientCnx) Mockito.any());

        // Reach into the connection's private registries to substitute the spies.
        ClientCnx clientCnx = producer1.getClientCnx();
        Field producersField = ClientCnx.class.getDeclaredField("producers");
        producersField.setAccessible(true);
        Field consumersField = ClientCnx.class.getDeclaredField("consumers");
        consumersField.setAccessible(true);
        @SuppressWarnings("unchecked")
        ConcurrentLongHashMap<ProducerImpl<byte[]>> producers =
                (ConcurrentLongHashMap<ProducerImpl<byte[]>>) producersField.get(clientCnx);
        @SuppressWarnings("unchecked")
        ConcurrentLongHashMap<ConsumerImpl<byte[]>> consumers =
                (ConcurrentLongHashMap<ConsumerImpl<byte[]>>) consumersField.get(clientCnx);

        // Keep the existing entries reachable under spare ids, then overwrite the
        // original ids with the spies.
        // NOTE(review): assumes ids 0/1 belong to prod1/prod2 and id 0 to cons1 - confirm.
        producers.put(2L, producers.get(0L));
        producers.put(3L, producers.get(1L));
        consumers.put(1L, consumers.get(0L));
        producers.put(0L, producer1);
        producers.put(1L, producer2);
        consumers.put(0L, consumer1);

        // NOTE(review): presumably keeps the broker from re-owning the unloaded bundles,
        // so handlers stay in Connecting - confirm against LoadManager.
        pulsar.getLoadManager().get().disableBroker();

        NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topic1));
        NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topic2));

        // Unload the first bundle: only topic1's handlers should be told to disconnect.
        pulsar.getNamespaceService().unloadNamespaceBundle(bundle1).join();
        Thread.sleep(1000L);
        Mockito.verify(producer1, Mockito.atLeastOnce()).connectionClosed((ClientCnx) Mockito.any());
        Mockito.verify(consumer1, Mockito.atLeastOnce()).connectionClosed((ClientCnx) Mockito.any());
        Mockito.verify(producer2, Mockito.never()).connectionClosed((ClientCnx) Mockito.any());
        Thread.sleep(200L);

        Assert.assertNull(prod1.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, prod1.getState());
        Assert.assertNull(cons1.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, cons1.getState());
        Assert.assertNotNull(prod2.getClientCnx());
        Assert.assertEquals(HandlerState.State.Ready, prod2.getState());

        // Unload the second bundle: now the remaining producer must be disconnected too.
        pulsar.getNamespaceService().unloadNamespaceBundle(bundle2).join();
        Thread.sleep(200L);
        Mockito.verify(producer2, Mockito.atLeastOnce()).connectionClosed((ClientCnx) Mockito.any());

        Assert.assertNull(prod1.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, prod1.getState());
        Assert.assertNull(cons1.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, cons1.getState());
        Assert.assertNull(prod2.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, prod2.getState());

        // Close the spies first, then the underlying handlers.
        producer1.close();
        producer2.close();
        consumer1.close();
        prod1.close();
        prod2.close();
        cons1.close();
    }

    /**
     * Closes the broker service while a consumer and two producers are connected and
     * asserts that the ownership cache reports no owned bundles and that every handler
     * ends up without a {@code ClientCnx} in the {@code Connecting} state.
     *
     * @throws Exception on any admin or client failure
     */
    @Test
    public void testCloseBrokerService() throws Exception {
        final String ns1 = "my-property/brok-ns1";
        final String ns2 = "my-property/brok-ns2";
        admin.namespaces().createNamespace(ns1, Sets.newHashSet("test"));
        admin.namespaces().createNamespace(ns2, Sets.newHashSet("test"));

        final String topic1 = "persistent://my-property/brok-ns1/my-topic";
        final String topic2 = "persistent://my-property/brok-ns2/my-topic";
        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topic1)
                .subscriptionName("my-subscriber-name")
                .subscribe();
        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic1).create();
        ProducerImpl<byte[]> producer2 = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic2).create();

        pulsar.getBrokerService().close();

        OwnershipCache ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
        Assert.assertTrue(ownershipCache.getOwnedBundles().keySet().isEmpty());

        // Give the clients a few short retries to observe the disconnect before asserting.
        retryStrategically(
                ignored -> producer1.getClientCnx() == null
                        && consumer1.getClientCnx() == null
                        && producer2.getClientCnx() == null,
                5, 100L);

        Assert.assertNull(producer1.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, producer1.getState());
        Assert.assertNull(consumer1.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, consumer1.getState());
        Assert.assertNull(producer2.getClientCnx());
        Assert.assertEquals(HandlerState.State.Connecting, producer2.getState());

        producer1.close();
        producer2.close();
        consumer1.close();
    }

    /**
     * Checks how a consumer whose broker-side connection reports protocol version 3
     * behaves when batched messages are published.
     *
     * <p>Flow:
     * <ol>
     *   <li>Subscribe {@code consumer1}, then use reflection to overwrite
     *       {@code remoteEndpointProtocolVersion} on the broker-side handler of that
     *       consumer with {@code 3}.</li>
     *   <li>Publish and consume 10 non-batched messages, checking order and duplicates.</li>
     *   <li>Clear {@code consumer1}'s client connection, publish one batch of 10 messages,
     *       and assert that {@code consumer1} receives nothing.</li>
     *   <li>From a fresh client, subscribe {@code consumer2} on the same subscription, seek to
     *       the last non-batched message id, and assert all 10 batched messages arrive in order.</li>
     * </ol>
     *
     * @param subType subscription type supplied by the {@code subType} data provider
     * @throws Exception on any client, broker, or reflection failure
     */
    @Test(dataProvider="subType")
    public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String topicName = "persistent://my-property/my-ns/my-topic1";
        final String subscriptionName = "my-subscriber-name" + subType;
        final int numMessagesPerBatch = 10;

        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(subType)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .create();
        // The effectively unbounded publish delay means a batch is only sent when it is
        // full or explicitly flushed.
        Producer<byte[]> batchProducer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(true)
                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
                .batchingMaxMessages(numMessagesPerBatch)
                .create();

        // Downgrade the protocol version recorded for consumer1's broker-side connection.
        // NOTE(review): 3 is presumably a version that predates batch-message support - confirm.
        Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        org.apache.pulsar.broker.service.Consumer brokerConsumer =
                topic.getSubscriptions().get(subscriptionName).getConsumers().get(0);
        Field cnxField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("cnx");
        cnxField.setAccessible(true);
        PulsarHandler cnx = (PulsarHandler) cnxField.get(brokerConsumer);
        Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
        versionField.setAccessible(true);
        versionField.set(cnx, 3);

        // Phase 1: non-batched messages are delivered normally.
        MessageId lastNonBatchedMessageId = null;
        for (int i = 0; i < numMessagesPerBatch; i++) {
            String message = "my-message-" + i;
            lastNonBatchedMessageId = producer.send(message.getBytes(StandardCharsets.UTF_8));
        }

        Set<String> messageSet = Sets.newHashSet();
        Message<byte[]> msg;
        for (int i = 0; i < numMessagesPerBatch; i++) {
            msg = consumer1.receive(1, TimeUnit.SECONDS);
            // A timed receive returns null on timeout; fail with a readable message
            // instead of a NullPointerException on getData().
            Assert.assertNotNull(msg, "timed out waiting for non-batched message " + i);
            String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
            String expectedMessage = "my-message-" + i;
            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            consumer1.acknowledge(msg);
        }

        // Phase 2: drop consumer1's client connection reference, then publish one batch.
        consumer1.setClientCnx(null);
        for (int i = 0; i < numMessagesPerBatch; i++) {
            String message = "my-batch-message-" + i;
            batchProducer.sendAsync(message.getBytes(StandardCharsets.UTF_8));
        }
        batchProducer.flush();

        msg = consumer1.receive(100, TimeUnit.MILLISECONDS);
        Assert.assertNull(msg);

        // Phase 3: a consumer from a fresh client must receive the whole batch.
        try (PulsarClient secondClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<byte[]> consumer2 = secondClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(subType)
                    .subscribe();
            consumer2.seek(lastNonBatchedMessageId);

            messageSet.clear();
            for (int i = 0; i < numMessagesPerBatch; i++) {
                msg = consumer2.receive();
                String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-batch-message-" + i;
                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                consumer2.acknowledge(msg);
            }

            consumer2.close();
            producer.close();
            batchProducer.close();
            log.info("-- Exiting {} test --", methodName);
        }
    }

    /**
     * Verifies that resetting a subscription cursor to a timestamp replays exactly the
     * messages published at or after that timestamp.
     *
     * <p>Flow:
     * <ol>
     *   <li>Restart the broker with a 500 ms active-consumer failover delay and set a
     *       retention policy on the topic's namespace.</li>
     *   <li>Subscribe two listener-based consumers on the same subscription; the listener
     *       acks every message and counts messages per publish timestamp.</li>
     *   <li>Publish {@code warmup} messages and wait until all are received, then publish
     *       {@code testSize} more.</li>
     *   <li>Pick the recorded publish timestamp at or just below the midpoint, snapshot the
     *       per-timestamp counts from there on as the expectation, clear the live counts,
     *       and call {@code resetCursor} with that timestamp.</li>
     *   <li>Assert that every replayed message has a publish time at or after the timestamp and
     *       that the number of replayed messages equals the expectation.</li>
     * </ol>
     *
     * @param subType subscription type supplied by the {@code subType} data provider
     * @throws Exception on any client, admin, or broker-restart failure
     */
    @Test(dataProvider="subType")
    public void testResetCursor(SubscriptionType subType) throws Exception {
        final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
        final TopicName topicName = TopicName.get("persistent://my-property/my-ns/unacked-topic");
        final int warmup = 20;
        final int testSize = 150;
        final String subsId = "sub";

        // Listener threads append to this list while the test thread polls size() and
        // clears it, so every access must go through the same lock.
        final List<Message<byte[]>> received =
                Collections.synchronizedList(new ArrayList<Message<byte[]>>());
        final ConcurrentSkipListMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();

        conf.setActiveConsumerFailoverDelayTimeMillis(500);
        restartBroker();
        admin.namespaces().setRetention(topicName.getNamespace(), policy);

        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName.toString())
                .startMessageIdInclusive()
                .subscriptionName(subsId)
                .subscriptionType(subType)
                .messageListener((consumer, msg) -> {
                    try {
                        received.add(msg);
                        consumer.acknowledge(msg);
                        long publishTime = msg.getPublishTime();
                        log.info(" publish time is {},{}", publishTime, msg.getMessageId());
                        TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(
                                publishTime, k -> new TimestampEntryCount(publishTime));
                        timestampEntryCount.incrementAndGet();
                    } catch (PulsarClientException e) {
                        log.warn("Failed to ack!", e);
                    }
                });
        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()).create();

        log.info("warm up started for " + topicName.toString());
        byte[] msgBytes = new byte[1000];
        for (int i = 0; i < warmup; i++) {
            producer.send(msgBytes);
        }
        log.info("warm up finished.");

        // Poll (up to 10 x 200 ms) until every warm-up message has reached a listener.
        for (int attempt = 0; attempt < 10 && received.size() < warmup; attempt++) {
            Thread.sleep(200L);
        }
        Assert.assertEquals(received.size(), warmup);
        received.clear();

        log.info("Sending more messages.");
        for (int i = 0; i < testSize; i++) {
            producer.send(msgBytes);
            // Spread publish times so the midpoint timestamp splits the messages.
            Thread.sleep(1L);
        }
        log.info("Sending more messages done.");
        Thread.sleep(3000L);

        // Choose the recorded publish time at or just below the midpoint of the observed range.
        long begints = publishTimeIdMap.firstEntry().getKey();
        long endts = publishTimeIdMap.lastEntry().getKey();
        long timestamp = publishTimeIdMap.floorKey((endts - begints) / 2L + begints);

        // Snapshot what should be replayed before the live map is cleared.
        ConcurrentSkipListMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
        expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));
        received.clear();

        log.info("reset cursor to " + timestamp + " for topic " + topicName.toString() + " for subs " + subsId);
        log.info("issuing admin operation on " + admin.getServiceUrl());
        List<String> subList = admin.topics().getSubscriptions(topicName.toString());
        for (String subs : subList) {
            log.info("got sub " + subs);
        }
        publishTimeIdMap.clear();
        Assert.assertTrue(subList.contains(subsId));

        admin.topics().resetCursor(topicName.toString(), subsId, timestamp);
        Thread.sleep(3000L);

        int totalExpected = 0;
        for (TimestampEntryCount tec : expectedMessages.values()) {
            totalExpected += tec.numMessages;
        }

        // Fail readably if nothing was replayed, rather than with an NPE on firstEntry().
        Assert.assertFalse(publishTimeIdMap.isEmpty(), "no messages were redelivered after cursor reset");
        Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);

        consumer1.close();
        consumer2.close();
        producer.close();

        int totalReceived = 0;
        for (TimestampEntryCount tec : publishTimeIdMap.values()) {
            totalReceived += tec.numMessages;
        }
        Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");

        resetConfig();
        restartBroker();
    }

    /**
     * Restarts the broker with {@code maxConcurrentLookupRequest = 1}, then issues 200
     * concurrent producer creations (100 from each of two clients) and asserts that at
     * least one of them failed.
     *
     * <p>One client is built with {@code maxNumberOfRejectedRequestPerConnection(0)}; the
     * other uses 20 I/O threads and 20 connections per broker. The broker setting is put
     * back on {@code conf} and the executor is shut down in {@code finally} blocks.
     *
     * @throws Exception on broker restart or client failures
     */
    @Test
    public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {
        final String topicName = "persistent://prop/usw/my-ns/newTopic";
        final int originalMaxLookupRequest = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
        final int concurrentLookupRequests = 20;
        final ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
        try {
            try {
                stopBroker();
                conf.setMaxConcurrentLookupRequest(1);
                startBroker();

                final String brokerUrl = pulsar.getBrokerServiceUrl();
                final PulsarClient strictClient = PulsarClient.builder()
                        .serviceUrl(brokerUrl)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .maxNumberOfRejectedRequestPerConnection(0)
                        .build();
                try {
                    final PulsarClient multiConnClient = PulsarClient.builder()
                            .serviceUrl(brokerUrl)
                            .statsInterval(0L, TimeUnit.SECONDS)
                            .ioThreads(concurrentLookupRequests)
                            .connectionsPerBroker(20)
                            .build();
                    try {
                        ProducerImpl<byte[]> producer =
                                (ProducerImpl<byte[]>) strictClient.newProducer().topic(topicName).create();
                        ClientCnx cnx = producer.cnx();
                        Assert.assertTrue(cnx.channel().isActive());

                        final int totalProducer = 100;
                        // Each submitted task starts two creations, hence twice the count.
                        final CountDownLatch latch = new CountDownLatch(totalProducer * 2);
                        final AtomicInteger failed = new AtomicInteger(0);
                        for (int task = 0; task < totalProducer; task++) {
                            executor.submit(() -> {
                                multiConnClient.newProducer().topic(topicName).createAsync().handle((ok, e) -> {
                                    if (e != null) {
                                        failed.set(1);
                                    }
                                    latch.countDown();
                                    return null;
                                });
                                strictClient.newProducer().topic(topicName).createAsync().handle((ok, e) -> {
                                    if (e != null) {
                                        failed.set(1);
                                    }
                                    latch.countDown();
                                    return null;
                                });
                            });
                        }

                        // The await result is intentionally not checked; only the failure flag is asserted.
                        latch.await(10L, TimeUnit.SECONDS);
                        Assert.assertEquals(failed.get(), 1);
                    } finally {
                        if (multiConnClient != null) {
                            multiConnClient.close();
                        }
                    }
                } finally {
                    if (strictClient != null) {
                        strictClient.close();
                    }
                }
            } finally {
                conf.setMaxConcurrentLookupRequest(originalMaxLookupRequest);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Restarts the broker, lowers {@code maxConcurrentTopicLoadRequest} to 1 on the running
     * configuration, and starts 20 asynchronous producer creations on randomly named topics
     * (10 tasks, one creation per client per task). The test passes when every creation
     * future completes successfully.
     *
     * <p>The original topic-load limit is restored and the executor is shut down in
     * {@code finally} blocks.
     *
     * @throws Exception on broker restart failure or if any producer creation fails
     */
    @Test
    public void testMaxConcurrentTopicLoading() throws Exception {
        final String topicName = "persistent://prop/usw/my-ns/cocurrentLoadingTopic";
        final int originalTopicLoadLimit = pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest();
        final int concurrentLookupRequests = 20;
        final ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
        try {
            try {
                pulsar.getConfiguration().setAuthorizationEnabled(false);
                stopBroker();
                startBroker();
                pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(1);

                final String brokerUrl = pulsar.getBrokerServiceUrl();
                try (PulsarClientImpl strictClient = (PulsarClientImpl) PulsarClient.builder()
                             .serviceUrl(brokerUrl)
                             .statsInterval(0L, TimeUnit.SECONDS)
                             .maxNumberOfRejectedRequestPerConnection(0)
                             .build();
                     PulsarClientImpl multiConnClient = (PulsarClientImpl) PulsarClient.builder()
                             .serviceUrl(brokerUrl)
                             .statsInterval(0L, TimeUnit.SECONDS)
                             .ioThreads(concurrentLookupRequests)
                             .connectionsPerBroker(20)
                             .build()) {
                    ProducerImpl<byte[]> producer =
                            (ProducerImpl<byte[]>) strictClient.newProducer().topic(topicName).create();
                    ClientCnx cnx = producer.cnx();
                    Assert.assertTrue(cnx.channel().isActive());

                    final List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
                    final int totalProducers = 10;
                    final CountDownLatch latch = new CountDownLatch(totalProducers);
                    for (int task = 0; task < totalProducers; task++) {
                        executor.submit(() -> {
                            final String randomTopicName1 = topicName + UUID.randomUUID().toString();
                            final String randomTopicName2 = topicName + UUID.randomUUID().toString();
                            // The list is shared by all worker threads; guard each append.
                            synchronized (futures) {
                                futures.add(multiConnClient.newProducer().topic(randomTopicName1).createAsync());
                                futures.add(strictClient.newProducer().topic(randomTopicName2).createAsync());
                            }
                            latch.countDown();
                        });
                    }

                    // Wait until every task has registered its futures, then wait for the futures.
                    latch.await();
                    synchronized (futures) {
                        FutureUtil.waitForAll(futures).get();
                    }
                }
            } finally {
                pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(originalTopicLoadLimit);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Asserts that a client connection is closed after a producer creation fails on the
     * broker side.
     *
     * <p>The failure is provoked by nulling, via reflection, the {@code cache} field of the
     * broker's partitioned-topic resources; the next producer creation on the same client is
     * then expected to throw, after which the channel of the first producer's connection
     * must be inactive.
     *
     * @throws Exception on client or reflection failure
     */
    @Test
    public void testCloseConnectionOnInternalServerError() throws Exception {
        final String topicName = "persistent://prop/usw/my-ns/newTopic";
        final PulsarClient shortTimeoutClient = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
        try {
            ProducerImpl<byte[]> producer =
                    (ProducerImpl<byte[]>) shortTimeoutClient.newProducer().topic(topicName).create();
            ClientCnx cnx = producer.cnx();
            Assert.assertTrue(cnx.channel().isActive());

            // Break the broker's metadata access so the next creation fails server-side.
            Field cacheField = BaseResources.class.getDeclaredField("cache");
            cacheField.setAccessible(true);
            Object partitionedTopicResources =
                    pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
            cacheField.set(partitionedTopicResources, null);

            try {
                shortTimeoutClient.newProducer().topic(topicName).create();
                Assert.fail("it should have fail with lookup-exception:");
            } catch (Exception expected) {
                // Expected: creation must fail. Assert.fail raises an AssertionError, which is
                // not an Exception and therefore is not swallowed here.
            }

            Assert.assertFalse(cnx.channel().isActive());
        } finally {
            if (shortTimeoutClient != null) {
                shortTimeoutClient.close();
            }
        }
    }

    /**
     * Exercises dynamic-configuration updates for {@code loadManagerClassName}.
     *
     * <ol>
     *   <li>Updating it through the admin API with a non-existent class name must be
     *       rejected with a {@link PulsarAdminException}.</li>
     *   <li>Updating it with {@code org.apache.pulsar.broker.loadbalance.ModularLoadManager}
     *       must be accepted.</li>
     *   <li>The invalid class name is then written directly to the
     *       {@code /admin/configuration} node, bypassing the admin API, after invalidating
     *       the broker's cached copy. No assertion follows this step; the test only requires
     *       that the write itself does not throw.</li>
     * </ol>
     *
     * @throws Exception on cache, serialization, or ZooKeeper failures
     */
    @Test
    public void testInvalidDynamicConfiguration() throws Exception {
        final String configName = "loadManagerClassName";
        final String invalidClassName = "org.apache.pulsar.invalid.loadmanager";
        final String configPath = "/admin/configuration";

        // (1) An invalid value must be rejected by the admin API.
        try {
            admin.brokers().updateDynamicConfiguration(configName, invalidClassName);
            Assert.fail("it should have failed due to invalid argument");
        } catch (PulsarAdminException expected) {
            // Expected: the broker refuses the unknown load-manager class.
        }

        // (2) A valid value must be accepted.
        try {
            admin.brokers().updateDynamicConfiguration(configName,
                    "org.apache.pulsar.broker.loadbalance.ModularLoadManager");
        } catch (PulsarAdminException e) {
            Assert.fail("it should not have failed for a valid loadManagerClassName", e);
        }

        // (3) Push the invalid value straight into the configuration node.
        ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache =
                pulsar.getBrokerService().getDynamicConfigurationCache();
        Map<String, String> configurationMap = dynamicConfigurationCache.get(configPath).get();
        configurationMap.put(configName, invalidClassName);
        byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
        dynamicConfigurationCache.invalidate(configPath);
        // NOTE(review): version -1 presumably means "any version", as in ZooKeeper - confirm for the mock.
        mockZooKeeper.setData(configPath, content, -1);
    }

    /**
     * Asserts that a client does not keep a reference to a producer whose asynchronous
     * creation has finished or timed out: after waiting for the creation future (at most
     * 1500 ms, with a 500 ms client operation timeout), the client's private
     * {@code producers} collection must be empty.
     *
     * <p>The topic lives in a namespace under a freshly created {@code global} cluster.
     * NOTE(review): the creation is presumably expected to fail there - confirm.
     *
     * @throws Exception on admin, client, or reflection failure
     */
    @Test
    public void testCleanProducer() throws Exception {
        log.info("-- Starting {} test --", methodName);

        admin.clusters().createCluster("global", new ClusterData());
        admin.namespaces().createNamespace("my-property/global/lookup");

        final int operationTimeOut = 500;
        final PulsarClient shortTimeoutClient = PulsarClient.builder()
                .serviceUrl(lookupUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(operationTimeOut, TimeUnit.MILLISECONDS)
                .build();
        try {
            final CountDownLatch creationDone = new CountDownLatch(1);
            shortTimeoutClient.newProducer()
                    .topic("persistent://my-property/global/lookup/my-topic1")
                    .createAsync()
                    .handle((producer, e) -> {
                        // Success or failure, only completion matters here.
                        creationDone.countDown();
                        return null;
                    });
            creationDone.await(1500L, TimeUnit.MILLISECONDS);

            Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
            prodField.setAccessible(true);
            Set<?> producers = (Set<?>) prodField.get(shortTimeoutClient);
            Assert.assertTrue(producers.isEmpty());

            log.info("-- Exiting {} test --", methodName);
        } finally {
            if (shortTimeoutClient != null) {
                shortTimeoutClient.close();
            }
        }
    }

    /**
     * Expects producer creation to fail with {@link PulsarClientException.TimeoutException}
     * when the broker's topic map holds a never-completed future for the topic.
     *
     * <p>A fresh {@link CompletableFuture} is planted in the broker's topic map under the
     * topic name, and a client with a 2-second operation timeout then attempts to create a
     * producer on it. The topic map is cleared in {@code finally}.
     *
     * @throws PulsarClientException the expected timeout, propagated to TestNG
     */
    @Test(expectedExceptions={PulsarClientException.TimeoutException.class})
    public void testOperationTimeout() throws PulsarClientException {
        final String topicName = "persistent://my-property/my-ns/my-topic1";
        ConcurrentOpenHashMap<String, CompletableFuture<java.util.Optional<Topic>>> topics =
                pulsar.getBrokerService().getTopics();
        // This future is never completed, so the topic stays "loading".
        topics.put(topicName, new CompletableFuture<>());
        try (PulsarClient timeoutClient = PulsarClient.builder()
                .serviceUrl(lookupUrl.toString())
                .operationTimeout(2, TimeUnit.SECONDS)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            timeoutClient.newProducer().topic(topicName).create();
        } finally {
            topics.clear();
        }
    }

    /**
     * Publishes one message to a topic whose managed ledger has had its {@code currentLedger}
     * field replaced by a Mockito mock, and expects the send to succeed and the payload to be
     * received unchanged.
     *
     * <p>The managed-ledger add-entry timeout is set to one second before the topic is used.
     * NOTE(review): presumably the first add-entry against the mocked handle never completes,
     * times out, and the managed ledger then rolls over to a working ledger — confirm against
     * {@code ManagedLedgerImpl}.
     *
     * @throws Exception if topic setup, the reflective field swap, or any client call fails
     */
    @Test
    public void testAddEntryOperationTimeout() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setManagedLedgerAddEntryTimeoutSeconds(1L);
        final String topicName = "persistent://my-property/my-ns/addEntryTimeoutTopic";

        // Ledger handle whose add-entry has an empty body (the callback is never invoked) and
        // whose close reports completion immediately with return code 0.
        class MockLedgerHandle
        extends PulsarMockLedgerHandle {
            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, BookKeeper.DigestType digest, byte[] passwd) throws GeneralSecurityException {
                super(bk, id, digest, passwd);
            }

            @Override
            public void asyncAddEntry(byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
                // intentionally empty
            }

            @Override
            public void asyncClose(AsyncCallback.CloseCallback cb, Object ctx) {
                cb.closeComplete(0, this, ctx);
            }
        }

        // try-with-resources guarantees both endpoints are closed even when an assertion fails.
        // Note that the consumer is now closed before the producer (reverse declaration order).
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
             ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .subscribe()) {
            PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();

            MockLedgerHandle ledgerHandle = Mockito.mock(MockLedgerHandle.class);
            // Explicit charset: the payload bytes must not depend on the platform default.
            byte[] data = "data".getBytes(StandardCharsets.UTF_8);
            Mockito.doNothing().when(ledgerHandle).asyncAddEntry(data, null, null);
            // Swap the managed ledger's current ledger for the mock via reflection.
            MockedPulsarServiceBaseTest.setFieldValue(ManagedLedgerImpl.class, ml, "currentLedger", ledgerHandle);

            CountDownLatch latch = new CountDownLatch(1);
            AtomicBoolean addedSuccessfully = new AtomicBoolean(false);
            producer.sendAsync(data).handle((res, ex) -> {
                if (ex == null) {
                    addedSuccessfully.set(true);
                } else {
                    log.error("add-entry failed for {}", this.methodName, ex);
                }
                latch.countDown();
                return null;
            });
            // NOTE(review): unbounded wait, kept from the original; relies on the send future
            // always completing one way or the other.
            latch.await();
            Assert.assertTrue(addedSuccessfully.get());

            byte[] receivedData = consumer.receive().getData();
            Assert.assertEquals(receivedData, data);
        }
    }

    /**
     * Round-trips one {@link TestMessageObject} through an AVRO schema whose reader and writer
     * are Mockito mocks, then verifies that each mock was invoked exactly once.
     *
     * @throws PulsarClientException if the client, producer, or consumer cannot be created, or
     *                               if sending or receiving fails
     */
    @Test
    public void testAvroSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
        final String topicName = "persistent://my-property/my-ns/my-topic1";
        TestMessageObject payload = new TestMessageObject();

        // Canned codec: every read yields `payload`, every write yields the same fixed bytes.
        SchemaReader mockReader = Mockito.mock(SchemaReader.class);
        SchemaWriter mockWriter = Mockito.mock(SchemaWriter.class);
        Mockito.when(mockReader.read(Mockito.any(byte[].class), Mockito.any(byte[].class)))
                .thenReturn(payload);
        Mockito.when(mockWriter.write(Mockito.any(TestMessageObject.class)))
                .thenReturn("fake data".getBytes(StandardCharsets.UTF_8));

        SchemaDefinition definition = new SchemaDefinitionBuilderImpl()
                .withPojo(TestMessageObject.class)
                .withSchemaReader(mockReader)
                .withSchemaWriter(mockWriter)
                .build();
        Schema avroSchema = Schema.AVRO(definition);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).build();
             Producer producer = client.newProducer(avroSchema).topic(topicName).create();
             Consumer consumer = client.newConsumer(avroSchema)
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .subscribe()) {
            Assert.assertNotNull(producer);
            Assert.assertNotNull(consumer);

            producer.newMessage().value(payload).send();
            TestMessageObject received = (TestMessageObject) consumer.receive().getValue();
            Assert.assertEquals(payload.getValue(), received.getValue());

            // One send means one write; one receive-and-decode means one read.
            Mockito.verify(mockWriter, Mockito.times(1)).write(Mockito.any());
            Mockito.verify(mockReader, Mockito.times(1))
                    .read(Mockito.any(byte[].class), Mockito.any(byte[].class));
        }
    }

    /**
     * Round-trips one {@link TestMessageObject} through a JSON schema whose reader and writer
     * are Mockito spies around the Jackson-based implementations, then verifies that each spy
     * was invoked exactly once.
     *
     * @throws PulsarClientException if the client, producer, or consumer cannot be created, or
     *                               if sending or receiving fails
     */
    @Test
    public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
        final String topicName = "persistent://my-property/my-ns/my-topic1";
        ObjectMapper jackson = new ObjectMapper();

        // Spies delegate to the real Jackson codec while recording the calls.
        SchemaReader spiedReader = Mockito.spy(new JacksonJsonReader(jackson, TestMessageObject.class));
        SchemaWriter spiedWriter = Mockito.spy(new JacksonJsonWriter(jackson));

        SchemaDefinition definition = new SchemaDefinitionBuilderImpl()
                .withPojo(TestMessageObject.class)
                .withSchemaReader(spiedReader)
                .withSchemaWriter(spiedWriter)
                .build();
        Schema jsonSchema = Schema.JSON(definition);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).build();
             Producer producer = client.newProducer(jsonSchema).topic(topicName).create();
             Consumer consumer = client.newConsumer(jsonSchema)
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .subscribe()) {
            Assert.assertNotNull(producer);
            Assert.assertNotNull(consumer);

            TestMessageObject sent = new TestMessageObject();
            sent.setValue("fooooo");
            producer.newMessage().value(sent).send();

            TestMessageObject received = (TestMessageObject) consumer.receive().getValue();
            Assert.assertEquals(sent.getValue(), received.getValue());

            // One send means one write; one receive-and-decode means one read.
            Mockito.verify(spiedWriter, Mockito.times(1)).write(Mockito.any());
            Mockito.verify(spiedReader, Mockito.times(1)).read(Mockito.any(byte[].class));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends 100 UTF-8 encoded {@code "value-<i>"} messages and consumes them through a
     * {@code ByteBuffer} consumer created with {@code poolMessages(true)}.
     *
     * <p>Each received payload is copied into a byte array that is reused across messages and
     * only reallocated when a larger payload arrives; the decoded text is compared with the
     * expected value and the message is released afterwards, whether or not the check passed.
     *
     * @param isBatchingEnabled whether the producer is created with batching enabled; supplied
     *                          by the {@code booleanFlagProvider} data provider
     * @throws Exception if client setup, publishing, or receiving fails
     */
    @Test(dataProvider="booleanFlagProvider")
    public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;
        final int numMessages = 100;

        // Resources are closed in reverse order (producer, consumer, client) on every exit path.
        try (PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).build();
             Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER)
                     .topic(topic)
                     .subscriptionName("my-sub")
                     .poolMessages(true)
                     .subscribe();
             Producer<byte[]> producer = newPulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(isBatchingEnabled)
                     .create()) {
            for (int i = 0; i < numMessages; ++i) {
                producer.newMessage()
                        .value(("value-" + i).getBytes(StandardCharsets.UTF_8))
                        .eventTime((i + 1) * 100L)
                        .sendAsync();
            }
            producer.flush();

            // Scratch buffer reused for every message; grows only when a payload is larger.
            byte[] val = new byte[0];
            int size = 0;
            for (int i = 0; i < numMessages; ++i) {
                Message<ByteBuffer> msg = consumer.receive();
                try {
                    ByteBuffer value = msg.getValue();
                    int capacity = value.remaining();
                    if (capacity > size) {
                        val = new byte[capacity];
                        size = capacity;
                    }
                    value.get(val, 0, capacity);
                    // Decode with the same charset that was used to encode the payload.
                    Assert.assertEquals("value-" + i, new String(val, 0, capacity, StandardCharsets.UTF_8));
                }
                finally {
                    msg.release();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the payload buffer of a queued pooled message has a non-zero reference
     * count while it sits in the consumer's incoming queue and a reference count of zero
     * after {@code redeliverUnacknowledgedMessages()} is called.
     *
     * <p>100 messages are published first; the test then polls (via
     * {@code retryStrategically}) until the consumer's {@code incomingMessages} queue has a
     * head element and inspects that element's payload without receiving it.
     *
     * @param isBatchingEnabled whether the producer is created with batching enabled; supplied
     *                          by the {@code booleanFlagProvider} data provider
     * @throws Exception if client setup, publishing, or polling fails
     */
    @Test(dataProvider="booleanFlagProvider")
    public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;
        final int numMessages = 100;

        // Resources are closed in reverse order (producer, consumer, client) on every exit path.
        try (PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).build();
             ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
                     .topic(topic)
                     .subscriptionName("my-sub")
                     .poolMessages(true)
                     .subscribe();
             Producer<byte[]> producer = newPulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(isBatchingEnabled)
                     .create()) {
            for (int i = 0; i < numMessages; ++i) {
                producer.newMessage()
                        .value(("value-" + i).getBytes(StandardCharsets.UTF_8))
                        .eventTime((i + 1) * 100L)
                        .sendAsync();
            }
            producer.flush();

            // Wait until at least one message has reached the consumer's local queue.
            BrokerClientIntegrationTest.retryStrategically(test -> consumer.incomingMessages.peek() != null, 5, 500L);
            MessageImpl<ByteBuffer> msg = (MessageImpl<ByteBuffer>) consumer.incomingMessages.peek();
            Assert.assertNotNull(msg);

            ByteBuf payload = msg.getPayload();
            Assert.assertNotEquals(payload.refCnt(), 0);
            consumer.redeliverUnacknowledgedMessages();
            Assert.assertEquals(payload.refCnt(), 0);
        }
    }

    /**
     * Single-field POJO used as the message payload by the schema reader/writer tests.
     * Equality and hashing depend solely on {@code value}, which may be {@code null}.
     */
    private static final class TestMessageObject {
        private String value;

        private TestMessageObject() {
        }

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        /** Two instances are equal when their values are both {@code null} or equal strings. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o instanceof TestMessageObject) {
                TestMessageObject that = (TestMessageObject) o;
                return value == null ? that.value == null : value.equals(that.value);
            }
            return false;
        }

        /** Returns {@code 59 + h}, where {@code h} is 43 for a {@code null} value, else its hash. */
        @Override
        public int hashCode() {
            return 59 + (value == null ? 43 : value.hashCode());
        }
    }

    /**
     * Pairs a fixed timestamp with a mutable message counter that starts at zero.
     */
    static class TimestampEntryCount {
        private final long timestamp;
        private int numMessages;

        public TimestampEntryCount(long ts) {
            timestamp = ts;
        }

        /** Increases the counter by one and returns the new total. */
        public int incrementAndGet() {
            numMessages += 1;
            return numMessages;
        }

        /** Returns the timestamp given at construction. */
        public long getTimestamp() {
            return timestamp;
        }
    }
}

