package org.apache.pulsar.client.impl;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ProducerConsumerInternalTest.class */
public class ProducerConsumerInternalTest extends ProducerConsumerBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSameProducerRegisterTwice() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl producerImpl = (ProducerImpl) this.pulsarClient.newProducer().topic(newUniqueName).create();
        removeServiceProducerMaintainedByServerCnx(getServiceProducer(producerImpl, newUniqueName));
        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
        commandCloseProducer.setProducerId(producerImpl.producerId);
        producerImpl.getClientCnx().handleCloseProducer(commandCloseProducer);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(producerImpl.getState().toString(), "Ready");
        });
    }

    @Test
    public void testSameProducerRegisterTwiceWithSpecifiedProducerName() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl producerImpl = (ProducerImpl) this.pulsarClient.newProducer().producerName("p1").topic(newUniqueName).create();
        removeServiceProducerMaintainedByServerCnx(getServiceProducer(producerImpl, newUniqueName));
        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
        commandCloseProducer.setProducerId(producerImpl.producerId);
        producerImpl.getClientCnx().handleCloseProducer(commandCloseProducer);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(producerImpl.getState().toString(), "Ready", "The producer registration failed");
        });
    }

    private void removeServiceProducerMaintainedByServerCnx(MockedPulsarServiceBaseTest.ServiceProducer serviceProducer) {
        ServerCnx cnx = serviceProducer.getServiceProducer().getCnx();
        cnx.removedProducer(serviceProducer.getServiceProducer());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(cnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
        });
    }

    @Test
    public void testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName.toString()}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("subscription1").subscribe();
        ClientCnx clientCnx = subscribe.getClientCnx();
        ServerCnx cnx = ((Consumer) ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(newUniqueName, false).join()).get()).getSubscription("subscription1").getDispatcher().getConsumers().get(0)).cnx();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        cnx.execute(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        clientCnx.close();
        Thread.sleep(1000L);
        countDownLatch.countDown();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(subscribe.getState(), HandlerState.State.Ready);
        });
        subscribe.close();
        this.admin.topics().delete(newUniqueName, false);
    }
}
