/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.api.BookKeeper;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ListLedgersResult;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BkEnsemblesTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class OpportunisticStripingTest
extends BkEnsemblesTestBase {
    public OpportunisticStripingTest() {
        super(2);
    }

    @Override
    protected void configurePulsar(ServiceConfiguration config) {
        config.setManagedLedgerDefaultEnsembleSize(5);
        config.setManagedLedgerDefaultWriteQuorum(2);
        config.setManagedLedgerDefaultAckQuorum(2);
        config.setBrokerDeleteInactiveTopicsEnabled(false);
        config.getProperties().setProperty("bookkeeper_opportunisticStriping", "true");
    }

    @Test
    public void testOpportunisticStriping() throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsar.getWebServiceAddress()).statsInterval(0L, TimeUnit.SECONDS).build();){
            String ns1 = "prop/usc/opportunistic1";
            this.admin.namespaces().createNamespace("prop/usc/opportunistic1");
            String topic1 = "persistent://prop/usc/opportunistic1/my-topic";
            Producer producer = client.newProducer().topic("persistent://prop/usc/opportunistic1/my-topic").create();
            for (int i = 0; i < 10; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            ClientConfiguration clientConfiguration = new ClientConfiguration();
            clientConfiguration.setZkServers("localhost:" + this.bkEnsemble.getZookeeperPort());
            try (BookKeeper bkAdmin = BookKeeper.newBuilder((ClientConfiguration)clientConfiguration).build();
                 ListLedgersResult list = (ListLedgersResult)bkAdmin.newListLedgersOp().execute().get();){
                int count = 0;
                Iterator iterator = list.toIterable().iterator();
                while (iterator.hasNext()) {
                    long ledgerId = (Long)iterator.next();
                    LedgerMetadata ledgerMetadata = (LedgerMetadata)bkAdmin.getLedgerMetadata(ledgerId).get();
                    AssertJUnit.assertEquals((int)2, (int)ledgerMetadata.getEnsembleSize());
                    AssertJUnit.assertEquals((int)2, (int)ledgerMetadata.getWriteQuorumSize());
                    AssertJUnit.assertEquals((int)2, (int)ledgerMetadata.getAckQuorumSize());
                    ++count;
                }
                AssertJUnit.assertTrue((count > 0 ? 1 : 0) != 0);
            }
        }
    }
}

