/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BkEnsemblesTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.zookeeper.ZooKeeper;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BrokerBkEnsemblesTests
extends BkEnsemblesTestBase {
    /**
     * Creates the test class with a bookie count of 3 by delegating to
     * {@link #BrokerBkEnsemblesTests(int)}.
     */
    public BrokerBkEnsemblesTests() {
        this(3);
    }

    /**
     * Creates the test class, forwarding the bookie count to the
     * {@code BkEnsemblesTestBase} constructor.
     *
     * @param numberOfBookies value passed unchanged to the superclass constructor
     *                        (presumably the number of bookies the test ensemble starts;
     *                        confirm against {@code BkEnsemblesTestBase})
     */
    public BrokerBkEnsemblesTests(int numberOfBookies) {
        super(numberOfBookies);
    }

    /**
     * Checks that a cursor ledger is not leaked when a topic is reloaded after its
     * in-memory state has been discarded.
     *
     * <p>Flow:
     * <ol>
     *   <li>Publish and acknowledge ten messages, then record the cursor ledger id and assert
     *       that its node exists in ZooKeeper under {@code /ledgers}.</li>
     *   <li>Close the clients, remove the topic from the broker cache and clear the
     *       {@code ledgers} map of the managed-ledger factory through reflection (this is what
     *       stands in for the broker crash the test name refers to).</li>
     *   <li>Publish and acknowledge ten more messages, then assert that the cursor now uses a
     *       valid, different ledger and that the old ledger's ZooKeeper node is gone.</li>
     * </ol>
     *
     * @throws Exception on any client, admin, ZooKeeper or reflection failure
     */
    @Test
    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
        final String namespace = "prop/usc/crash-broker";
        final String topicName = "persistent://" + namespace + "/my-topic";
        final String subscriptionName = "my-subscriber-name";
        final int messageCount = 10;

        ZooKeeper zk = this.bkEnsemble.getZkClient();
        // try-with-resources: the client is always closed, and a failure while closing is
        // recorded as a suppressed exception instead of hiding the original test failure.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.pulsar.getWebServiceAddress())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            this.admin.namespaces().createNamespace(namespace);

            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topicName).create();
            publishAndAcknowledge(producer, consumer, messageCount);

            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            ManagedCursorImpl cursor =
                    (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
            // Give the cursor up to 5 x 100ms to report the "Open" state before reading its ledger id.
            MockedPulsarServiceBaseTest.retryStrategically(test -> cursor.getState().equals("Open"), 5, 100L);
            long cursorLedgerId = cursor.getCursorLedger();
            String ledgerPath = "/ledgers" + StringUtils.getHybridHierarchicalLedgerPath(cursorLedgerId);
            Assert.assertNotNull(zk.exists(ledgerPath, false));

            consumer.close();
            producer.close();

            // Drop every in-memory reference to the topic and its managed ledger so that the
            // next subscribe/create has to reopen them from the metadata store.
            this.pulsar.getBrokerService().removeTopicFromCache(topicName);
            ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) this.pulsar.getManagedLedgerFactory();
            Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
            ledgersField.setAccessible(true);
            ConcurrentHashMap<?, ?> ledgers = (ConcurrentHashMap<?, ?>) ledgersField.get(factory);
            ledgers.clear();

            Consumer<byte[]> reopenedConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscribe();
            Producer<byte[]> reopenedProducer = client.newProducer().topic(topicName).create();
            publishAndAcknowledge(reopenedProducer, reopenedConsumer, messageCount);

            PersistentTopic reopenedTopic =
                    (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            ManagedCursorImpl reopenedCursor =
                    (ManagedCursorImpl) reopenedTopic.getManagedLedger().getCursors().iterator().next();
            MockedPulsarServiceBaseTest.retryStrategically(
                    test -> reopenedCursor.getState().equals("Open"), 5, 100L);
            long newCursorLedgerId = reopenedCursor.getCursorLedger();

            // Use a long literal: comparing a boxed Long with a boxed Integer is never equal,
            // which made the previous form of this assertion impossible to fail.
            Assert.assertNotEquals(newCursorLedgerId, -1L);
            Assert.assertNotEquals(cursorLedgerId, newCursorLedgerId);
            // The ledger used before the reload must have been deleted.
            Assert.assertNull(zk.exists(ledgerPath, false));

            reopenedProducer.close();
            reopenedConsumer.close();
        }
    }

    /**
     * Sends {@code count} messages named {@code my-message-<index>} and then receives and
     * acknowledges the same number of messages, waiting at most one second for each.
     *
     * @param producer producer used to publish the messages
     * @param consumer consumer used to receive and acknowledge them
     * @param count    number of messages to publish and to consume
     * @throws Exception if publishing, receiving or acknowledging fails
     */
    private static void publishAndAcknowledge(Producer<byte[]> producer, Consumer<byte[]> consumer, int count)
            throws Exception {
        for (int i = 0; i < count; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
        for (int i = 0; i < count; i++) {
            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
            // receive() returns null on timeout; fail here rather than inside acknowledge().
            Assert.assertNotNull(msg, "No message received within 1s at index " + i);
            consumer.acknowledge(msg);
        }
    }

    /**
     * Checks the effect of the {@code autoSkipNonRecoverableData} dynamic setting when data
     * ledgers of a topic have been deleted from BookKeeper.
     *
     * <p>Flow:
     * <ol>
     *   <li>Set {@code autoSkipNonRecoverableData} to {@code "false"} and, through reflection
     *       on the cursor's {@code config}, limit ledgers to 20 entries.</li>
     *   <li>Publish 100 messages, assert that 5 ledgers were created, and delete every ledger
     *       except the last one directly through the managed ledger's BookKeeper client.</li>
     *   <li>Discard the cached topic and managed ledger, resubscribe, and assert that nothing
     *       can be received within one second.</li>
     *   <li>Set {@code autoSkipNonRecoverableData} to {@code "true"}, resubscribe, and assert
     *       that 20 messages can be received and acknowledged.</li>
     * </ol>
     *
     * @throws Exception on any client, admin, BookKeeper or reflection failure
     */
    @Test
    public void testSkipCorruptDataLedger() throws Exception {
        final String namespace = "prop/usc/crash-broker";
        final String subscriptionName = "my-subscriber-name";
        final int totalMessages = 100;
        final int totalDataLedgers = 5;
        final int entriesPerLedger = 20;
        // Upper bound for each receive in the final phase, so a regression fails the test
        // instead of blocking it forever (this test has no timeOut of its own).
        final int receiveTimeoutSeconds = 10;

        this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.pulsar.getWebServiceAddress())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try {
                this.admin.namespaces().createNamespace(namespace);
            } catch (Exception ignored) {
                // Deliberately best effort: testCrashBrokerWithoutCursorLedgerLeak creates the
                // same namespace, so presumably it may already exist when this test runs.
            }
            final String topicName = "persistent://" + namespace + "/my-topic-" + System.currentTimeMillis();

            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(5)
                    .subscribe();
            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
            ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();

            // Shrink the ledger size on the live config so that the messages below are spread
            // over several ledgers.
            Field configField = ManagedCursorImpl.class.getDeclaredField("config");
            configField.setAccessible(true);
            ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
            config.setMaxEntriesPerLedger(entriesPerLedger);
            config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);

            Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
            bookKeeperField.setAccessible(true);
            BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);

            Producer<byte[]> producer = client.newProducer().topic(topicName).create();
            for (int i = 0; i < totalMessages; i++) {
                producer.send(("my-message-" + i).getBytes());
            }
            Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
            consumer.close();

            NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfo = ml.getLedgersInfo();
            Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
            long lastLedgerId = ledgerInfo.lastKey();
            for (Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> entry : ledgerInfo.entrySet()) {
                long ledgerId = entry.getKey();
                if (ledgerId == lastLedgerId) {
                    continue;
                }
                Assert.assertEquals(entry.getValue().getEntries(), (long) entriesPerLedger);
                // Let a deletion failure abort the test: every later assertion assumes these
                // ledgers are really gone.
                bookKeeper.deleteLedger(ledgerId);
            }
            producer.close();

            // Drop every in-memory reference to the topic and its managed ledger so that the
            // next subscribe has to reopen them.
            this.pulsar.getBrokerService().removeTopicFromCache(topicName);
            ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) this.pulsar.getManagedLedgerFactory();
            Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
            ledgersField.setAccessible(true);
            ConcurrentHashMap<?, ?> ledgers = (ConcurrentHashMap<?, ?>) ledgersField.get(factory);
            ledgers.clear();

            // With skipping disabled, nothing is delivered within the one-second wait.
            consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscribe();
            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNull(msg);
            consumer.close();

            // Enable skipping and wait (up to 5 x 100ms) for the captured config to reflect it.
            this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
            MockedPulsarServiceBaseTest.retryStrategically(test -> config.isAutoSkipNonRecoverableData(), 5, 100L);

            consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscribe();
            for (int i = 0; i < entriesPerLedger; i++) {
                msg = consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "No message received within " + receiveTimeoutSeconds
                        + "s at index " + i);
                consumer.acknowledge(msg);
            }
            consumer.close();
        }
    }

    /**
     * Checks that a message can be published to and consumed from a topic whose local name
     * consists largely of punctuation and other special characters.
     *
     * <p>The test subscribes, publishes the bytes of {@code "test"}, receives one message and
     * asserts that its payload equals what was sent. The whole test is limited to 20 seconds
     * by the {@code timeOut} on the annotation.
     *
     * @throws Exception on any client or admin failure
     */
    @Test(timeOut=20000L)
    public void testTopicWithWildCardChar() throws Exception {
        final String namespace = "prop/usc/topicWithSpecialChar";
        final String topicName = "persistent://" + namespace + "/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524";
        final String subscriptionName = "c1";
        final byte[] content = "test".getBytes();

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.pulsar.getWebServiceAddress())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            try {
                this.admin.namespaces().createNamespace(namespace);
            } catch (Exception ignored) {
                // Deliberately best effort: presumably the namespace may already exist.
            }

            // Consumer and producer are closed even when an assertion below fails.
            try (Consumer<byte[]> consumer = client.newConsumer()
                         .topic(topicName)
                         .subscriptionName(subscriptionName)
                         .subscribe();
                 Producer<byte[]> producer = client.newProducer().topic(topicName).create()) {
                producer.send(content);
                Message<byte[]> msg = consumer.receive();
                Assert.assertEquals(msg.getData(), content);
            }
        }
    }

    /**
     * Checks that a topic can still be deleted after the bookies holding its data were stopped.
     *
     * <p>Flow: create a uniquely named namespace and topic, publish {@code "Hello"}, stop the
     * bookies with {@code bkEnsemble.stopBK()}, unload the topic and wait one second. Loading
     * the topic is then expected to fail with an {@link ExecutionException}. Finally the topic
     * is deleted through the admin API and a new lookup must yield {@link Optional#empty()}.
     *
     * @throws Exception on any unexpected client, admin or broker failure
     */
    @Test
    public void testDeleteTopicWithMissingData() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("prop/usc");
        this.admin.namespaces().createNamespace(namespace);
        String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");

        // Resources are closed in reverse declaration order: producer, consumer, client.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsar.getBrokerServiceUrl())
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe();
             Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
            producer.send("Hello");

            // Stop the bookies, then unload the topic so it has to be loaded again.
            this.bkEnsemble.stopBK();
            this.admin.topics().unload(topic);
            Thread.sleep(1000L);

            CompletableFuture<?> future = this.pulsar.getBrokerService().getTopicIfExists(topic);
            try {
                future.get();
                Assert.fail("Should have thrown exception");
            } catch (ExecutionException expected) {
                // Expected outcome: the topic cannot be loaded at this point.
            }

            this.admin.topics().delete(topic);
            Assert.assertEquals(this.pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty());
        }
    }
}

