/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class ExclusiveProducerTest
extends BrokerTestBase {
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.baseSetup();
    }

    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        this.internalCleanup();
    }

    @DataProvider(name="topics")
    public static Object[][] topics() {
        return new Object[][]{{"persistent", Boolean.TRUE}, {"persistent", Boolean.FALSE}, {"non-persistent", Boolean.TRUE}, {"non-persistent", Boolean.FALSE}};
    }

    @DataProvider(name="accessMode")
    public static Object[][] accessMode() {
        return new Object[][]{{ProducerAccessMode.Exclusive, Boolean.TRUE}, {ProducerAccessMode.Exclusive, Boolean.FALSE}, {ProducerAccessMode.WaitForExclusive, Boolean.TRUE}, {ProducerAccessMode.WaitForExclusive, Boolean.FALSE}};
    }

    @Test(dataProvider="topics")
    public void simpleTest(String type, boolean partitioned) throws Exception {
        String topic = this.newTopic(type, partitioned);
        this.simpleTest(topic);
    }

    private void simpleTest(String topic) throws Exception {
        Producer p1 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Exclusive).create();
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Exclusive).create();
            Assert.fail((String)"Should have failed");
        }
        catch (PulsarClientException.ProducerFencedException producerFencedException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Shared).create();
            Assert.fail((String)"Should have failed");
        }
        catch (PulsarClientException.ProducerBusyException producerBusyException) {
            // empty catch block
        }
        p1.close();
        Producer p2 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Exclusive).create();
        p2.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="topics")
    public void existingSharedProducer(String type, boolean partitioned) throws Exception {
        String topic = this.newTopic(type, partitioned);
        Producer p1 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Shared).create();
        try {
            try {
                this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Exclusive).create();
                Assert.fail((String)"Should have failed");
            }
            catch (PulsarClientException.ProducerFencedException producerFencedException) {
                // empty catch block
            }
        }
        finally {
            if (Collections.singletonList(p1).get(0) != null) {
                p1.close();
            }
        }
    }

    @Test(dataProvider="accessMode")
    public void producerReconnection(ProducerAccessMode accessMode, boolean partitioned) throws Exception {
        String topic = this.newTopic("persistent", partitioned);
        Producer p1 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(accessMode).create();
        p1.send((Object)"msg-1");
        this.admin.topics().unload(topic);
        p1.send((Object)"msg-2");
    }

    @Test(dataProvider="accessMode")
    public void producerFenced(ProducerAccessMode accessMode, boolean partitioned) throws Exception {
        String topic = this.newTopic("persistent", partitioned);
        Producer p1 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(accessMode).create();
        p1.send((Object)"msg-1");
        if (!partitioned) {
            Topic t = (Topic)((Optional)this.pulsar.getBrokerService().getTopic(topic, false).get()).get();
            CompletableFuture f = (CompletableFuture)Whitebox.getMethod(AbstractTopic.class, (String)"incrementTopicEpoch", (Class[])new Class[]{Optional.class}).invoke((Object)t, Optional.of(0L));
            f.get();
        } else {
            for (int i = 0; i < 3; ++i) {
                String name = TopicName.get((String)topic).getPartition(i).toString();
                Topic t = (Topic)((Optional)this.pulsar.getBrokerService().getTopic(name, false).get()).get();
                CompletableFuture f = (CompletableFuture)Whitebox.getMethod(AbstractTopic.class, (String)"incrementTopicEpoch", (Class[])new Class[]{Optional.class}).invoke((Object)t, Optional.of(0L));
                f.get();
            }
        }
        this.admin.topics().unload(topic);
        try {
            p1.send((Object)"msg-2");
            Assert.fail((String)"Should have failed");
        }
        catch (PulsarClientException.ProducerFencedException producerFencedException) {
            // empty catch block
        }
    }

    @Test(dataProvider="topics")
    public void topicDeleted(String ignored, boolean partitioned) throws Exception {
        String topic = this.newTopic("persistent", partitioned);
        Producer p1 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.Exclusive).create();
        p1.send((Object)"msg-1");
        if (partitioned) {
            this.admin.topics().deletePartitionedTopic(topic, true);
        } else {
            this.admin.topics().delete(topic, true);
        }
        p1.send((Object)"msg-2");
    }

    @Test(dataProvider="topics")
    public void waitForExclusiveTest(String type, boolean partitioned) throws Exception {
        String topic = this.newTopic(type, partitioned);
        Producer p1 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).producerName("p1").accessMode(ProducerAccessMode.WaitForExclusive).create();
        CompletableFuture fp2 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).producerName("p2").accessMode(ProducerAccessMode.WaitForExclusive).createAsync();
        Thread.sleep(1000L);
        CompletableFuture fp3 = this.pulsarClient.newProducer(Schema.STRING).topic(topic).producerName("p3").accessMode(ProducerAccessMode.WaitForExclusive).createAsync();
        Thread.sleep(1000L);
        Assert.assertFalse((boolean)fp2.isDone());
        Assert.assertFalse((boolean)fp3.isDone());
        p1.close();
        Producer p2 = (Producer)fp2.get(1L, TimeUnit.SECONDS);
        Assert.assertFalse((boolean)fp3.isDone());
        p2.close();
        Producer p3 = (Producer)fp3.get(1L, TimeUnit.SECONDS);
        p3.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="topics")
    public void waitForExclusiveWithClientTimeout(String type, boolean partitioned) throws Exception {
        String topic = this.newTopic(type, partitioned);
        PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).operationTimeout(1, TimeUnit.SECONDS).build();
        try {
            Producer p1 = client.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.WaitForExclusive).create();
            CompletableFuture fp2 = client.newProducer(Schema.STRING).topic(topic).accessMode(ProducerAccessMode.WaitForExclusive).createAsync();
            Thread.sleep(2000L);
            Assert.assertFalse((boolean)fp2.isDone());
            p1.close();
            fp2.get(1L, TimeUnit.SECONDS);
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test(dataProvider="topics")
    public void exclusiveWithConsumers(String type, boolean partitioned) throws Exception {
        String topic = this.newTopic(type, partitioned);
        this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("test").subscribe();
        this.simpleTest(topic);
    }

    private String newTopic(String type, boolean isPartitioned) throws Exception {
        String topic = type + "://" + this.newTopicName();
        if (isPartitioned) {
            this.admin.topics().createPartitionedTopic(topic, 3);
        }
        return topic;
    }
}

