package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import java.net.URL;
import java.nio.channels.SelectionKey;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.class */
public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetrySupport {
    private static final Logger log = LoggerFactory.getLogger(CanReconnectZKClientPulsarServiceBaseTest.class);
    protected URL url;
    protected URL urlTls;
    protected ZookeeperServerTest brokerConfigZk;
    protected LocalBookkeeperEnsemble bkEnsemble;
    protected PulsarService pulsar;
    protected BrokerService broker;
    protected PulsarAdmin admin;
    protected PulsarClient client;
    protected ZooKeeper localZkOfBroker;
    protected Object localMetaDataStoreClientCnx;
    protected final String defaultTenant = "public";
    protected final String defaultNamespace = "public/default";
    protected int numberOfBookies = 3;
    protected final String clusterName = "r1";
    protected ServiceConfiguration config = new ServiceConfiguration();
    protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean();

    protected void startZKAndBK() throws Exception {
        this.brokerConfigZk = new ZookeeperServerTest(0);
        this.brokerConfigZk.start();
        this.bkEnsemble = new LocalBookkeeperEnsemble(this.numberOfBookies, 0, () -> {
            return 0;
        });
        this.bkEnsemble.start();
    }

    protected void startBrokers() throws Exception {
        setConfigDefaults(this.config, "r1", this.bkEnsemble, this.brokerConfigZk);
        this.pulsar = new PulsarService(this.config);
        this.pulsar.start();
        this.broker = this.pulsar.getBrokerService();
        this.localZkOfBroker = this.pulsar.getLocalMetadataStore().getZkClient();
        this.localMetaDataStoreClientCnx = WhiteboxImpl.getInternalState(WhiteboxImpl.getInternalState((ClientCnxn) WhiteboxImpl.getInternalState(this.localZkOfBroker, "cnxn"), "sendThread"), "clientCnxnSocket");
        this.url = new URL(this.pulsar.getWebServiceAddress());
        this.urlTls = new URL(this.pulsar.getWebServiceAddressTls());
        this.admin = PulsarAdmin.builder().serviceHttpUrl(this.url.toString()).build();
        this.client = PulsarClient.builder().serviceUrl(this.url.toString()).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void makeLocalMetadataStoreKeepReconnect() throws Exception {
        if (!this.LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, true)) {
            throw new RuntimeException("Local metadata store is already keeping reconnect");
        }
        if (this.localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO")) {
            makeLocalMetadataStoreKeepReconnectNIO();
        } else {
            makeLocalMetadataStoreKeepReconnectNetty();
        }
    }

    protected void makeLocalMetadataStoreKeepReconnectNIO() {
        new Thread(() -> {
            while (this.LocalMetadataStoreInReconnectFinishSignal.get()) {
                try {
                    SelectionKey selectionKey = (SelectionKey) WhiteboxImpl.getInternalState(this.localMetaDataStoreClientCnx, "sockKey");
                    if (selectionKey != null) {
                        selectionKey.channel().close();
                    }
                    Thread.sleep(5L);
                } catch (Exception e) {
                    log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
                }
            }
        }).start();
    }

    protected void makeLocalMetadataStoreKeepReconnectNetty() {
        new Thread(() -> {
            while (this.LocalMetadataStoreInReconnectFinishSignal.get()) {
                try {
                    Channel channel = (Channel) WhiteboxImpl.getInternalState(this.localMetaDataStoreClientCnx, "channel");
                    if (channel != null) {
                        channel.close();
                    }
                    Thread.sleep(5L);
                } catch (Exception e) {
                    log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
                }
            }
        }).start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stopLocalMetadataStoreAlwaysReconnect() {
        this.LocalMetadataStoreInReconnectFinishSignal.set(false);
    }

    protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
        this.admin.clusters().createCluster("r1", ClusterData.builder().serviceUrl(this.url.toString()).serviceUrlTls(this.urlTls.toString()).brokerServiceUrl(this.pulsar.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar.getBrokerServiceUrlTls()).brokerClientTlsEnabled(false).build());
        this.admin.tenants().createTenant("public", new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(new String[]{"r1"})));
        this.admin.namespaces().createNamespace("public/default", Sets.newHashSet(new String[]{"r1"}));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setup() throws Exception {
        incrementSetupNumber();
        log.info("--- Starting OneWayReplicatorTestBase::setup ---");
        startZKAndBK();
        startBrokers();
        createDefaultTenantsAndClustersAndNamespace();
        Thread.sleep(100L);
        log.info("--- OneWayReplicatorTestBase::setup completed ---");
    }

    private void setConfigDefaults(ServiceConfiguration serviceConfiguration, String str, LocalBookkeeperEnsemble localBookkeeperEnsemble, ZookeeperServerTest zookeeperServerTest) {
        serviceConfiguration.setClusterName(str);
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setWebServicePort(Optional.of(0));
        serviceConfiguration.setWebServicePortTls(Optional.of(0));
        serviceConfiguration.setMetadataStoreUrl("zk:127.0.0.1:" + localBookkeeperEnsemble.getZookeeperPort());
        serviceConfiguration.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + zookeeperServerTest.getZookeeperPort() + "/foo");
        serviceConfiguration.setBrokerDeleteInactiveTopicsEnabled(false);
        serviceConfiguration.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        serviceConfiguration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(Double.valueOf(1.0d)));
        serviceConfiguration.setBrokerServicePort(Optional.of(0));
        serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
        serviceConfiguration.setBacklogQuotaCheckIntervalInSeconds(5);
        serviceConfiguration.setDefaultNumberOfNamespaceBundles(1);
        serviceConfiguration.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
        serviceConfiguration.setEnableReplicatedSubscriptions(true);
        serviceConfiguration.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanup() throws Exception {
        markCurrentSetupNumberCleaned();
        log.info("--- Shutting down ---");
        stopLocalMetadataStoreAlwaysReconnect();
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
        if (this.admin != null) {
            this.admin.close();
            this.admin = null;
        }
        if (this.pulsar != null) {
            this.pulsar.close();
            this.pulsar = null;
        }
        if (this.bkEnsemble != null) {
            this.bkEnsemble.stop();
            this.bkEnsemble = null;
        }
        if (this.brokerConfigZk != null) {
            this.brokerConfigZk.stop();
            this.brokerConfigZk = null;
        }
        this.config = new ServiceConfiguration();
    }
}
