/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BkEnsemblesTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BusyWaitServiceTest
extends BkEnsemblesTestBase {
    public BusyWaitServiceTest() {
        super(1);
    }

    @Override
    protected void configurePulsar(ServiceConfiguration config) {
        config.setEnableBusyWait(true);
        config.setManagedLedgerDefaultEnsembleSize(1);
        config.setManagedLedgerDefaultWriteQuorum(1);
        config.setManagedLedgerDefaultAckQuorum(1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPublishWithBusyWait() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsar.getWebServiceAddress()).statsInterval(0L, TimeUnit.SECONDS).enableBusyWait(true).build();
        try {
            String namespace = "prop/busy-wait";
            this.admin.namespaces().createNamespace(namespace);
            String topic = namespace + "/my-topic-" + UUID.randomUUID();
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("test").subscribe();
            try {
                Producer producer = client.newProducer(Schema.STRING).topic(topic).create();
                try {
                    int i;
                    for (i = 0; i < 10; ++i) {
                        producer.send((Object)("my-message-" + i));
                    }
                    for (i = 0; i < 10; ++i) {
                        Message msg = consumer.receive();
                        Assert.assertNotNull((Object)msg);
                        Assert.assertEquals((String)((String)msg.getValue()), (String)("my-message-" + i));
                        consumer.acknowledge(msg);
                    }
                }
                finally {
                    if (Collections.singletonList(producer).get(0) != null) {
                        producer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }
}

