/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.cluster;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.cluster.StreamClusterSpec;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.server.StorageServer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.LocalDLMEmulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Lifecycle component that brings up a local stream storage cluster described by a
 * {@link StreamClusterSpec}: optionally a ZooKeeper server, then cluster metadata, then
 * {@code spec.numServers()} storage servers, and finally the {@code default} namespace.
 *
 * <p>Storage servers are started concurrently; the port counters and the endpoint list
 * are guarded by this instance's monitor.
 */
public class StreamCluster
extends AbstractLifecycleComponent<StorageConfiguration> {
    private static final Logger log = LoggerFactory.getLogger(StreamCluster.class);
    /** Maximum number of additional attempts after a port bind failure when starting a server. */
    private static final int MAX_RETRIES = 20;
    private final StreamClusterSpec spec;
    /** Endpoints of the storage servers started so far; appended under {@code synchronized (this)}. */
    private final List<Endpoint> rpcEndpoints;
    private ServiceURI metadataServiceUri;
    private int zkPort;
    /** Locally started ZooKeeper server, or {@code null} when the spec says not to start one. */
    private ZooKeeperServerShim zks;
    private List<LifecycleComponent> servers;
    /** Next bookie port to hand out; read and advanced under {@code synchronized (this)}. */
    private int nextBookiePort;
    /** Next gRPC port to hand out; read and advanced under {@code synchronized (this)}. */
    private int nextGrpcPort;

    /**
     * Creates a cluster for the given spec. Nothing is started by this call.
     *
     * @param spec the cluster specification
     * @return a new, not yet started cluster
     */
    public static StreamCluster build(StreamClusterSpec spec) {
        return new StreamCluster(spec);
    }

    /**
     * Builds the baseline bookie configuration shared by cluster initialization and
     * every storage server.
     *
     * @param serviceURI the metadata service uri the bookie should use
     * @return a fresh configuration with loopback allowed and the GC wait time and
     *         disk usage thresholds set to the fixed values below
     */
    private static ServerConfiguration newBookieConfiguration(ServiceURI serviceURI) {
        ServerConfiguration serverConf = new ServerConfiguration();
        serverConf.setMetadataServiceUri(serviceURI.getUri().toString());
        serverConf.setAllowLoopback(true);
        serverConf.setGcWaitTime(300000L);
        serverConf.setDiskUsageWarnThreshold(0.9999f);
        serverConf.setDiskUsageThreshold(0.999999f);
        return serverConf;
    }

    private StreamCluster(StreamClusterSpec spec) {
        super("stream-cluster", new StorageConfiguration(spec.baseConf()), (StatsLogger) NullStatsLogger.INSTANCE);
        this.spec = spec;
        this.servers = Lists.newArrayListWithExpectedSize(spec.numServers());
        this.rpcEndpoints = Lists.newArrayListWithExpectedSize(spec.numServers());
        this.nextBookiePort = spec.initialBookiePort();
        this.nextGrpcPort = spec.initialGrpcPort();
    }

    /**
     * Returns the endpoints of the storage servers that have been started.
     *
     * @return the internal endpoint list (not a copy)
     */
    public List<Endpoint> getRpcEndpoints() {
        return this.rpcEndpoints;
    }

    /**
     * Starts a local ZooKeeper server under {@code <storageRootDir>/zookeeper} and points
     * {@link #metadataServiceUri} at it, or, when the spec disables ZooKeeper, adopts the
     * metadata service uri configured on the spec.
     *
     * @throws NullPointerException if ZooKeeper is disabled and no metadata service uri is configured
     * @throws Exception if the ZooKeeper server cannot be started
     */
    private void startZooKeeper() throws Exception {
        if (!this.spec.shouldStartZooKeeper()) {
            this.metadataServiceUri = Preconditions.checkNotNull(this.spec.metadataServiceUri, "No metadata service uri is configured while configuring not to start zookeeper");
            return;
        }
        File zkDir = new File(this.spec.storageRootDir(), "zookeeper");
        Pair<ZooKeeperServerShim, Integer> zkServerAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(this.spec.zkPort(), zkDir);
        this.zks = zkServerAndPort.getLeft();
        this.zkPort = zkServerAndPort.getRight();
        log.info("Started zookeeper at port {}.", this.zkPort);
        this.metadataServiceUri = ServiceURI.create("zk://127.0.0.1:" + this.zkPort + "/ledgers");
    }

    /** Stops the locally started ZooKeeper server, if there is one. */
    private void stopZooKeeper() {
        if (null != this.zks) {
            this.zks.stop();
        }
    }

    /**
     * Initializes the cluster metadata and the segment storage against the metadata service.
     *
     * @throws IllegalArgumentException if the metadata service is not ZooKeeper based
     * @throws Exception if initialization fails
     */
    private void initializeCluster() throws Exception {
        Preconditions.checkArgument("zk".equals(this.metadataServiceUri.getServiceName()), "Only support zookeeper based metadata service now");
        String metadataServers = StringUtils.join(this.metadataServiceUri.getServiceHosts(), ',');
        new ZkClusterInitializer(metadataServers).initializeCluster(this.metadataServiceUri.getUri(), this.spec.numServers() * 2);
        MetadataDrivers.runFunctionWithMetadataBookieDriver(StreamCluster.newBookieConfiguration(this.metadataServiceUri), driver -> {
            try {
                boolean initialized = driver.getRegistrationManager().initNewCluster();
                if (initialized) {
                    log.info("Successfully initialized the segment storage");
                } else {
                    log.info("The segment storage was already initialized");
                }
            }
            catch (Exception e) {
                throw new StorageRuntimeException("Failed to initialize the segment storage", e);
            }
            return null;
        });
    }

    /**
     * Starts one storage server on the next free (bookie port, gRPC port) pair and records
     * its local endpoint.
     *
     * <p>When startup fails with an exception whose cause is a {@link BindException}, the
     * server is stopped and the attempt is repeated on the next port pair, up to
     * {@link #MAX_RETRIES} retries. Any other failure is rethrown immediately.
     *
     * @return the started server
     * @throws BindException if every attempt failed because of a bind failure
     * @throws Exception for any other startup failure
     */
    private LifecycleComponent startServer() throws Exception {
        int retries = 0;
        while (true) {
            int bookiePort;
            int grpcPort;
            // Claim a unique port pair; several startServer() calls run concurrently.
            synchronized (this) {
                bookiePort = this.nextBookiePort++;
                grpcPort = this.nextGrpcPort++;
            }
            LifecycleComponent server = null;
            try {
                ServerConfiguration serverConf = StreamCluster.newBookieConfiguration(this.metadataServiceUri);
                serverConf.loadConf(this.spec.baseConf());
                serverConf.setBookiePort(bookiePort);
                File bkDir = new File(this.spec.storageRootDir(), "bookie_" + bookiePort);
                serverConf.setJournalDirName(bkDir.getPath());
                serverConf.setLedgerDirNames(new String[]{bkDir.getPath()});
                File rangesStoreDir = new File(this.spec.storageRootDir(), "ranges_" + grpcPort);
                StorageConfiguration storageConf = new StorageConfiguration(serverConf);
                storageConf.setRangeStoreDirNames(new String[]{rangesStoreDir.getPath()});
                log.info("Attempting to start storage server at (bookie port = {}, grpc port = {}) : bkDir = {}, rangesStoreDir = {}", bookiePort, grpcPort, bkDir, rangesStoreDir);
                server = StorageServer.buildStorageServer(serverConf, grpcPort);
                server.start();
                log.info("Started storage server at (bookie port = {}, grpc port = {})", bookiePort, grpcPort);
                // The endpoint list is a plain list shared by all starter threads.
                synchronized (this) {
                    this.rpcEndpoints.add(StorageServer.createLocalEndpoint(grpcPort, false));
                }
                return server;
            }
            catch (Throwable e) {
                log.error("Failed to start storage server", e);
                if (null != server) {
                    server.stop();
                }
                if (!(e.getCause() instanceof BindException)) {
                    throw e;
                }
                if (++retries > MAX_RETRIES) {
                    throw (BindException) e.getCause();
                }
                // Bind failure with retries left: loop around and try the next port pair.
            }
        }
    }

    /**
     * Starts {@code spec.numServers()} storage servers in parallel and waits for all of them.
     *
     * @throws Exception if any server fails to start or the wait is interrupted
     */
    private void startServers() throws Exception {
        log.info("Starting {} storage servers.", this.spec.numServers());
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<LifecycleComponent>> startFutures = new ArrayList<>();
            for (int i = 0; i < this.spec.numServers(); ++i) {
                startFutures.add(executor.submit(() -> this.startServer()));
            }
            for (Future<LifecycleComponent> future : startFutures) {
                this.servers.add(future.get());
            }
            log.info("Started {} storage servers.", this.spec.numServers());
        }
        finally {
            // Release the pool on failure as well as on success.
            executor.shutdown();
        }
    }

    /**
     * Ensures the {@code default} namespace exists, creating it when the lookup reports it
     * missing. A failed creation is logged and the lookup is attempted again.
     *
     * @throws Exception if the lookup fails with anything other than "namespace not found"
     */
    private void createDefaultNamespaces() throws Exception {
        String serviceUri = String.format("bk://%s/", this.getRpcEndpoints().stream().map(endpoint -> NetUtils.endpointToString(endpoint)).collect(Collectors.joining(",")));
        StorageClientSettings settings = StorageClientSettings.newBuilder().serviceUri(serviceUri).usePlaintext(true).build();
        log.info("Service uri are : {}", serviceUri);
        String namespaceName = "default";
        try (StorageAdminClient admin = StorageClientBuilder.newBuilder().withSettings(settings).buildAdmin()) {
            boolean created = false;
            while (!created) {
                try {
                    NamespaceProperties nsProps = FutureUtils.result(admin.getNamespace(namespaceName));
                    log.info("Namespace '{}':\n{}", namespaceName, nsProps);
                    created = true;
                }
                catch (NamespaceNotFoundException nnfe) {
                    log.info("Namespace '{}' is not found.", namespaceName);
                    log.info("Creating namespace '{}' ...", namespaceName);
                    try {
                        NamespaceProperties nsProps = FutureUtils.result(admin.createNamespace(namespaceName, NamespaceConfiguration.newBuilder().setDefaultStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF).build()));
                        log.info("Successfully created namespace '{}':", namespaceName);
                        log.info("{}", nsProps);
                    }
                    catch (ClientException ce) {
                        // Best effort: the next loop iteration looks the namespace up again.
                        log.warn("Failed to create namespace '{}', will fetch the namespace again", namespaceName, ce);
                    }
                }
            }
        }
    }

    /** Closes every storage server that was started. */
    private void stopServers() {
        for (LifecycleComponent server : this.servers) {
            server.close();
        }
    }

    /**
     * Brings the cluster up: ZooKeeper, cluster metadata, storage servers, default namespace.
     *
     * @throws RuntimeException wrapping any failure; the interrupt flag is restored first
     *         when the failure is an {@link InterruptedException}
     */
    @Override
    protected void doStart() {
        try {
            this.startZooKeeper();
            this.initializeCluster();
            this.startServers();
            this.createDefaultNamespaces();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** No-op; the servers and ZooKeeper are shut down in {@link #doClose()}. */
    @Override
    protected void doStop() {
    }

    /**
     * Closes the storage servers and then stops ZooKeeper. ZooKeeper is stopped even when
     * closing a server throws.
     */
    @Override
    protected void doClose() throws IOException {
        try {
            this.stopServers();
        }
        finally {
            this.stopZooKeeper();
        }
    }
}

