package org.apache.pulsar.client.impl;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Checks that a producer can still be created on a client limited to one connection per broker
 * when that connection's channel is closed while the producer's {@code connectionOpened}
 * callback is still in progress.
 */
public class SimpleProduceConsumeIoTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(SimpleProduceConsumeIoTest.class);

    /** Client configured with {@code connectionsPerBroker(1)}; built in setup, closed in cleanup. */
    private PulsarClientImpl singleConnectionPerBrokerClient;

    /**
     * Runs the base-class setup and builds the single-connection client against {@code lookupUrl}.
     *
     * @throws Exception if the base setup or the client construction fails
     */
    @Override
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        // The builder is typed to return PulsarClient, so the narrowing cast is required.
        this.singleConnectionPerBrokerClient = (PulsarClientImpl) PulsarClient.builder()
                .connectionsPerBroker(1)
                .serviceUrl(this.lookupUrl.toString())
                .build();
    }

    /**
     * Closes the single-connection client (when it was created) and then runs the base-class cleanup.
     *
     * @throws Exception if closing the client or the base cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            if (this.singleConnectionPerBrokerClient != null) {
                this.singleConnectionPerBrokerClient.close();
            }
        } finally {
            // Always run the base cleanup, even when closing the extra client throws.
            super.internalCleanup();
        }
    }

    /**
     * Creates and closes a producer to capture its {@link ClientCnx}, then constructs a second
     * producer whose {@code connectionOpened} callback releases a latch and sleeps before
     * delegating to the superclass. A helper thread closes the captured connection's channel as
     * soon as the latch is released. The test passes when the second producer's creation future
     * completes normally and the producer's internal {@code state} field reads {@code Ready}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testUnstableNetWorkWhenCreatingProducer() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        this.admin.topics().createNonPartitionedTopic(topic);

        // Create and immediately close a producer just to get hold of the connection it used.
        // NOTE(review): presumably the second producer reuses this pooled connection because the
        // client allows only one connection per broker - confirm against the connection pool.
        ProducerImpl<byte[]> warmUpProducer = (ProducerImpl<byte[]>) this.singleConnectionPerBrokerClient
                .newProducer()
                .topic(topic)
                .create();
        final ClientCnx cnx = warmUpProducer.getClientCnx();
        warmUpProducer.close();

        // Released from inside connectionOpened(); the closer thread then closes the channel.
        final CountDownLatch connectionOpenedLatch = new CountDownLatch(1);
        Thread closer = new Thread(() -> {
            try {
                connectionOpenedLatch.await();
                cnx.ctx().close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting to close the connection", e);
            } catch (RuntimeException e) {
                log.error("Failed to close the connection", e);
            }
        }, "close-connection-on-open");
        // Daemon, so a latch that is never released cannot keep the JVM alive.
        closer.setDaemon(true);
        closer.start();

        final AtomicReference<CompletableFuture<Producer<byte[]>>> producerFutureRef = new AtomicReference<>();
        Thread creator = new Thread(() -> {
            ProducerConfigurationData conf = (ProducerConfigurationData) WhiteboxImpl.getInternalState(
                    this.singleConnectionPerBrokerClient.newProducer().topic(topic), "conf");
            CompletableFuture<Producer<byte[]>> pendingFuture = new CompletableFuture<>();
            producerFutureRef.set(pendingFuture);
            // Constructed only for its side effects; the producer is obtained through pendingFuture.
            new ProducerImpl<byte[]>(this.singleConnectionPerBrokerClient, "public/default/tp1", conf,
                    pendingFuture, -1, Schema.BYTES, null, Optional.empty()) {
                @Override
                public CompletableFuture<Void> connectionOpened(ClientCnx openedCnx) {
                    connectionOpenedLatch.countDown();
                    try {
                        // Gives the closer thread a window to close the channel before delegating.
                        Thread.sleep(1500L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return super.connectionOpened(openedCnx);
                }
            };
        }, "create-producer");
        creator.setDaemon(true);
        creator.start();

        // Wait until the creator thread has published the future and the future has completed.
        Awaitility.await().untilAsserted(() -> {
            CompletableFuture<Producer<byte[]>> published = producerFutureRef.get();
            Assert.assertNotNull(published);
            Assert.assertTrue(published.isDone());
        });

        final CompletableFuture<Producer<byte[]>> producerFuture = producerFutureRef.get();
        // Log the failure cause, if any, so a failing run shows why creation failed.
        producerFuture.exceptionally(th -> {
            log.error("Failed to create producer", th);
            return null;
        });

        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(producerFuture.isCompletedExceptionally());
            Object state = WhiteboxImpl.getInternalState(producerFuture.join(), "state");
            Assert.assertEquals(String.valueOf(state), "Ready");
        });

        producerFuture.join().close();
        this.admin.topics().delete(topic);
    }
}
