/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.URL;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public abstract class ReplicatorTestBase
extends TestRetrySupport {
    URL url1;
    URL urlTls1;
    ServiceConfiguration config1 = new ServiceConfiguration();
    PulsarService pulsar1;
    BrokerService ns1;
    PulsarAdmin admin1;
    LocalBookkeeperEnsemble bkEnsemble1;
    URL url2;
    URL urlTls2;
    ServiceConfiguration config2 = new ServiceConfiguration();
    PulsarService pulsar2;
    BrokerService ns2;
    PulsarAdmin admin2;
    LocalBookkeeperEnsemble bkEnsemble2;
    URL url3;
    URL urlTls3;
    ServiceConfiguration config3 = new ServiceConfiguration();
    PulsarService pulsar3;
    BrokerService ns3;
    PulsarAdmin admin3;
    LocalBookkeeperEnsemble bkEnsemble3;
    ZookeeperServerTest globalZkS;
    ExecutorService executor;
    static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    protected static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    protected static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTestBase.class);

    public int getBrokerServicePurgeInactiveFrequency() {
        return 60;
    }

    public boolean isBrokerServicePurgeInactiveTopic() {
        return false;
    }

    protected void setup() throws Exception {
        this.incrementSetupNumber();
        log.info("--- Starting ReplicatorTestBase::setup ---");
        this.executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new DefaultThreadFactory("ReplicatorTestBase"));
        this.globalZkS = new ZookeeperServerTest(0);
        this.globalZkS.start();
        this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble1.start();
        this.setConfig1DefaultValue();
        this.pulsar1 = new PulsarService(this.config1);
        this.pulsar1.start();
        this.ns1 = this.pulsar1.getBrokerService();
        this.url1 = new URL(this.pulsar1.getWebServiceAddress());
        this.urlTls1 = new URL(this.pulsar1.getWebServiceAddressTls());
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();
        this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble2.start();
        this.setConfig2DefaultValue();
        this.pulsar2 = new PulsarService(this.config2);
        this.pulsar2.start();
        this.ns2 = this.pulsar2.getBrokerService();
        this.url2 = new URL(this.pulsar2.getWebServiceAddress());
        this.urlTls2 = new URL(this.pulsar2.getWebServiceAddressTls());
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();
        this.bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble3.start();
        this.setConfig3DefaultValue();
        this.pulsar3 = new PulsarService(this.config3);
        this.pulsar3.start();
        this.ns3 = this.pulsar3.getBrokerService();
        this.url3 = new URL(this.pulsar3.getWebServiceAddress());
        this.urlTls3 = new URL(this.pulsar3.getWebServiceAddressTls());
        this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();
        this.admin1.clusters().createCluster("r1", new ClusterData(this.url1.toString(), this.urlTls1.toString(), this.pulsar1.getSafeBrokerServiceUrl(), this.pulsar1.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("r2", new ClusterData(this.url2.toString(), this.urlTls2.toString(), this.pulsar2.getSafeBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("r3", new ClusterData(this.url3.toString(), this.urlTls3.toString(), this.pulsar3.getSafeBrokerServiceUrl(), this.pulsar3.getBrokerServiceUrlTls()));
        this.admin1.tenants().createTenant("pulsar", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2", "appid3"}), (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"})));
        this.admin1.namespaces().createNamespace("pulsar/ns", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        this.admin1.namespaces().createNamespace("pulsar/ns1", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2"}));
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r1").getServiceUrl(), (String)this.url1.toString());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r2").getServiceUrl(), (String)this.url2.toString());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r3").getServiceUrl(), (String)this.url3.toString());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r1").getBrokerServiceUrl(), (String)this.pulsar1.getSafeBrokerServiceUrl());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(), (String)this.pulsar2.getSafeBrokerServiceUrl());
        Assert.assertEquals((String)this.admin2.clusters().getCluster("r3").getBrokerServiceUrl(), (String)this.pulsar3.getSafeBrokerServiceUrl());
        this.admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
        this.admin1.namespaces().createNamespace("pulsar/global/ns");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        Thread.sleep(100L);
        log.info("--- ReplicatorTestBase::setup completed ---");
    }

    public void setConfig3DefaultValue() {
        this.setConfigDefaults(this.config3, "r3", this.bkEnsemble3);
        this.config3.setTlsEnabled(true);
    }

    public void setConfig1DefaultValue() {
        this.setConfigDefaults(this.config1, "r1", this.bkEnsemble1);
    }

    public void setConfig2DefaultValue() {
        this.setConfigDefaults(this.config2, "r2", this.bkEnsemble2);
    }

    private void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble) {
        config.setClusterName(clusterName);
        config.setAdvertisedAddress("localhost");
        config.setWebServicePort(Optional.of(0));
        config.setWebServicePortTls(Optional.of(0));
        config.setZookeeperServers("127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
        config.setConfigurationStoreServers("127.0.0.1:" + this.globalZkS.getZookeeperPort() + "/foo");
        config.setBrokerDeleteInactiveTopicsEnabled(this.isBrokerServicePurgeInactiveTopic());
        config.setBrokerDeleteInactiveTopicsFrequencySeconds(this.inSec(this.getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        config.setBrokerShutdownTimeoutMs(0L);
        config.setBrokerServicePort(Optional.of(0));
        config.setBrokerServicePortTls(Optional.of(0));
        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        config.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        config.setBacklogQuotaCheckIntervalInSeconds(5);
        config.setDefaultNumberOfNamespaceBundles(1);
        config.setAllowAutoTopicCreationType("non-partitioned");
        config.setEnableReplicatedSubscriptions(true);
        config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
    }

    public void resetConfig1() {
        this.config1 = new ServiceConfiguration();
        this.setConfig1DefaultValue();
    }

    public void resetConfig2() {
        this.config2 = new ServiceConfiguration();
        this.setConfig2DefaultValue();
    }

    public void resetConfig3() {
        this.config3 = new ServiceConfiguration();
        this.setConfig3DefaultValue();
    }

    private int inSec(int time, TimeUnit unit) {
        return (int)TimeUnit.SECONDS.convert(time, unit);
    }

    protected void cleanup() throws Exception {
        this.markCurrentSetupNumberCleaned();
        log.info("--- Shutting down ---");
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        this.admin1.close();
        this.admin2.close();
        this.admin3.close();
        if (this.pulsar3 != null) {
            this.pulsar3.close();
        }
        if (this.pulsar2 != null) {
            this.pulsar2.close();
        }
        if (this.pulsar1 != null) {
            this.pulsar1.close();
        }
        this.bkEnsemble1.stop();
        this.bkEnsemble2.stop();
        this.bkEnsemble3.stop();
        this.globalZkS.stop();
        this.resetConfig1();
        this.resetConfig2();
        this.resetConfig3();
    }

    static class MessageConsumer
    implements AutoCloseable {
        final URL url;
        final String namespace;
        final String topicName;
        final PulsarClient client;
        final Consumer<byte[]> consumer;

        MessageConsumer(URL url, TopicName dest) throws Exception {
            this(url, dest, "sub-id");
        }

        MessageConsumer(URL url, TopicName dest, String subId) throws Exception {
            this.url = url;
            this.namespace = dest.getNamespace();
            this.topicName = dest.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                this.consumer = this.client.newConsumer().topic(new String[]{this.topicName}).subscriptionName(subId).subscribe();
            }
            catch (Exception e) {
                this.client.close();
                throw e;
            }
        }

        void receive(int messages) throws Exception {
            log.info("Start receiving messages");
            TreeSet<String> receivedMessages = new TreeSet<String>();
            int i = 0;
            while (i < messages) {
                Message msg = this.consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)msg);
                this.consumer.acknowledge(msg);
                String msgData = new String(msg.getData());
                log.info("Received message {}", (Object)msgData);
                boolean added = receivedMessages.add(msgData);
                if (added) {
                    Assert.assertEquals((String)msgData, (String)("test-" + i));
                    ++i;
                    continue;
                }
                log.info("Ignoring duplicate {}", (Object)msgData);
            }
        }

        boolean drained() throws Exception {
            return this.consumer.receive(0, TimeUnit.MICROSECONDS) == null;
        }

        @Override
        public void close() {
            try {
                this.client.close();
            }
            catch (PulsarClientException e) {
                log.warn("Failed to close client", (Throwable)e);
            }
        }
    }

    static class MessageProducer
    implements AutoCloseable {
        URL url;
        String namespace;
        String topicName;
        PulsarClient client;
        Producer<byte[]> producer;

        MessageProducer(URL url, TopicName dest) throws Exception {
            this.url = url;
            this.namespace = dest.getNamespace();
            this.topicName = dest.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            this.producer = this.client.newProducer().topic(this.topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        }

        MessageProducer(URL url, TopicName dest, boolean batch) throws Exception {
            this.url = url;
            this.namespace = dest.getNamespace();
            this.topicName = dest.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            ProducerBuilder producerBuilder = this.client.newProducer().topic(this.topicName).enableBatching(batch).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).batchingMaxMessages(5);
            this.producer = producerBuilder.create();
        }

        void produceBatch(int messages) throws Exception {
            log.info("Start sending batch messages");
            for (int i = 0; i < messages; ++i) {
                this.producer.sendAsync((Object)("test-" + i).getBytes());
                log.info("queued message {}", (Object)("test-" + i));
            }
            this.producer.flush();
        }

        void produce(int messages) throws Exception {
            log.info("Start sending messages");
            for (int i = 0; i < messages; ++i) {
                this.producer.send((Object)("test-" + i).getBytes());
                log.info("Sent message {}", (Object)("test-" + i));
            }
        }

        TypedMessageBuilder<byte[]> newMessage() {
            return this.producer.newMessage();
        }

        void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
            log.info("Start sending messages");
            for (int i = 0; i < messages; ++i) {
                String m = "test-" + i;
                messageBuilder.value((Object)m.getBytes()).send();
                log.info("Sent message {}", (Object)m);
            }
        }

        @Override
        public void close() {
            try {
                this.client.close();
            }
            catch (PulsarClientException e) {
                log.warn("Failed to close client", (Throwable)e);
            }
        }
    }
}

