package org.apache.pulsar.broker.service;

import io.netty.util.HashedWheelTimer;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ExclusiveProducerTest.class */
public class ExclusiveProducerTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topics")
    public static Object[][] topics() {
        return new Object[]{new Object[]{"persistent", Boolean.TRUE}, new Object[]{"persistent", Boolean.FALSE}, new Object[]{"non-persistent", Boolean.TRUE}, new Object[]{"non-persistent", Boolean.FALSE}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "accessMode")
    public static Object[][] accessMode() {
        return new Object[]{new Object[]{ProducerAccessMode.Exclusive, Boolean.TRUE}, new Object[]{ProducerAccessMode.Exclusive, Boolean.FALSE}, new Object[]{ProducerAccessMode.WaitForExclusive, Boolean.TRUE}, new Object[]{ProducerAccessMode.WaitForExclusive, Boolean.FALSE}};
    }

    @Test(dataProvider = "topics")
    public void simpleTest(String str, boolean z) throws Exception {
        simpleTest(newTopic(str, z));
    }

    private void simpleTest(String str) throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).accessMode(ProducerAccessMode.Exclusive).create();
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic(str).accessMode(ProducerAccessMode.Exclusive).create();
            Assert.fail("Should have failed");
        } catch (PulsarClientException.ProducerFencedException e) {
        }
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic(str).accessMode(ProducerAccessMode.Shared).create();
            Assert.fail("Should have failed");
        } catch (PulsarClientException.ProducerBusyException e2) {
        }
        create.close();
        this.pulsarClient.newProducer(Schema.STRING).topic(str).accessMode(ProducerAccessMode.Exclusive).create().close();
    }

    @Test(dataProvider = "topics")
    public void testProducerTasksCleanupWhenUsingExclusiveProducers(String str, boolean z) throws Exception {
        String newTopic = newTopic(str, z);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.Exclusive).create();
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.Exclusive).create();
            Assert.fail("Should have failed");
        } catch (PulsarClientException.ProducerFencedException e) {
        }
        create.close();
        HashedWheelTimer timer = this.pulsarClient.timer();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(timer.pendingTimeouts(), 0L);
        });
    }

    @Test(dataProvider = "topics")
    public void existingSharedProducer(String str, boolean z) throws Exception {
        String newTopic = newTopic(str, z);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.Shared).create();
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.Exclusive).create();
            Assert.fail("Should have failed");
        } catch (PulsarClientException.ProducerFencedException e) {
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
        if (Collections.singletonList(create).get(0) != null) {
            create.close();
        }
    }

    @Test(dataProvider = "accessMode")
    public void producerReconnection(ProducerAccessMode producerAccessMode, boolean z) throws Exception {
        String newTopic = newTopic("persistent", z);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(producerAccessMode).create();
        create.send("msg-1");
        this.admin.topics().unload(newTopic);
        create.send("msg-2");
    }

    @Test(dataProvider = "accessMode")
    public void producerFenced(ProducerAccessMode producerAccessMode, boolean z) throws Exception {
        String newTopic = newTopic("persistent", z);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(producerAccessMode).create();
        create.send("msg-1");
        if (z) {
            for (int i = 0; i < 3; i++) {
                ((CompletableFuture) Whitebox.getMethod(AbstractTopic.class, "incrementTopicEpoch", new Class[]{Optional.class}).invoke((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(TopicName.get(newTopic).getPartition(i).toString(), false).get()).get(), Optional.of(0L))).get();
            }
        } else {
            ((CompletableFuture) Whitebox.getMethod(AbstractTopic.class, "incrementTopicEpoch", new Class[]{Optional.class}).invoke((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(newTopic, false).get()).get(), Optional.of(0L))).get();
        }
        this.admin.topics().unload(newTopic);
        try {
            create.send("msg-2");
            Assert.fail("Should have failed");
        } catch (PulsarClientException.ProducerFencedException e) {
        }
    }

    @Test(dataProvider = "topics")
    public void topicDeleted(String str, boolean z) throws Exception {
        String newTopic = newTopic("persistent", z);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.Exclusive).create();
        create.send("msg-1");
        if (z) {
            this.admin.topics().deletePartitionedTopic(newTopic, true);
        } else {
            this.admin.topics().delete(newTopic, true);
        }
        create.send("msg-2");
    }

    @Test(dataProvider = "topics")
    public void waitForExclusiveTest(String str, boolean z) throws Exception {
        String newTopic = newTopic(str, z);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).producerName("p1").accessMode(ProducerAccessMode.WaitForExclusive).create();
        CompletableFuture createAsync = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).producerName("p2").accessMode(ProducerAccessMode.WaitForExclusive).createAsync();
        Thread.sleep(1000L);
        CompletableFuture createAsync2 = this.pulsarClient.newProducer(Schema.STRING).topic(newTopic).producerName("p3").accessMode(ProducerAccessMode.WaitForExclusive).createAsync();
        Thread.sleep(1000L);
        Assert.assertFalse(createAsync.isDone());
        Assert.assertFalse(createAsync2.isDone());
        create.close();
        Producer producer = (Producer) createAsync.get(1L, TimeUnit.SECONDS);
        Assert.assertFalse(createAsync2.isDone());
        producer.close();
        ((Producer) createAsync2.get(1L, TimeUnit.SECONDS)).close();
    }

    @Test(dataProvider = "topics")
    public void waitForExclusiveWithClientTimeout(String str, boolean z) throws Exception {
        String newTopic = newTopic(str, z);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).operationTimeout(1, TimeUnit.SECONDS).build();
        try {
            Producer create = build.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.WaitForExclusive).create();
            CompletableFuture createAsync = build.newProducer(Schema.STRING).topic(newTopic).accessMode(ProducerAccessMode.WaitForExclusive).createAsync();
            Thread.sleep(2000L);
            Assert.assertFalse(createAsync.isDone());
            create.close();
            createAsync.get(1L, TimeUnit.SECONDS);
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "topics")
    public void exclusiveWithConsumers(String str, boolean z) throws Exception {
        String newTopic = newTopic(str, z);
        this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{newTopic}).subscriptionName("test").subscribe();
        simpleTest(newTopic);
    }

    private String newTopic(String str, boolean z) throws Exception {
        String str2 = str + "://" + newTopicName();
        if (z) {
            this.admin.topics().createPartitionedTopic(str2, 3);
        }
        return str2;
    }
}
