package com.twitter.distributedlog.service;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.config.DynamicConfigurationFactory;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.service.announcer.Announcer;
import com.twitter.distributedlog.service.announcer.NOPAnnouncer;
import com.twitter.distributedlog.service.announcer.ServerSetAnnouncer;
import com.twitter.distributedlog.service.config.DefaultStreamConfigProvider;
import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.ServiceStreamConfigProvider;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.DistributedLogService;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.finagle.Stack;
import com.twitter.finagle.ThriftMuxServer$;
import com.twitter.finagle.builder.Server;
import com.twitter.finagle.builder.ServerBuilder;
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientIdRequiredFilter;
import com.twitter.finagle.thrift.ThriftServerFramedCodec;
import com.twitter.finagle.transport.Transport;
import com.twitter.util.Duration;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/* loaded from: input_file:com/twitter/distributedlog/service/DistributedLogServer.class */
public class DistributedLogServer {
    static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
    private StatsProvider statsProvider;
    private ScheduledExecutorService configExecutorService;
    private final StatsReceiver statsReceiver;
    private final Optional<String> uri;
    private final Optional<String> conf;
    private final Optional<String> streamConf;
    private final Optional<Integer> port;
    private final Optional<Integer> statsPort;
    private final Optional<Integer> shardId;
    private final Optional<Boolean> announceServerSet;
    private final Optional<Boolean> thriftmux;
    private DistributedLogServiceImpl dlService = null;
    private Server server = null;
    private Announcer announcer = null;
    private long gracefulShutdownMs = 0;
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);

    DistributedLogServer(Optional<String> optional, Optional<String> optional2, Optional<String> optional3, Optional<Integer> optional4, Optional<Integer> optional5, Optional<Integer> optional6, Optional<Boolean> optional7, Optional<Boolean> optional8, StatsReceiver statsReceiver, StatsProvider statsProvider) {
        this.uri = optional;
        this.conf = optional2;
        this.streamConf = optional3;
        this.port = optional4;
        this.statsPort = optional5;
        this.shardId = optional6;
        this.announceServerSet = optional7;
        this.thriftmux = optional8;
        this.statsReceiver = statsReceiver;
        this.statsProvider = statsProvider;
    }

    public void runServer() throws ConfigurationException, IllegalArgumentException, IOException {
        StreamPartitionConverter identityStreamPartitionConverter;
        if (!this.uri.isPresent()) {
            throw new IllegalArgumentException("No distributedlog uri provided.");
        }
        URI create = URI.create((String) this.uri.get());
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        if (this.conf.isPresent()) {
            String str = (String) this.conf.get();
            try {
                distributedLogConfiguration.loadConf(new File(str).toURI().toURL());
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed " + str + ".");
            } catch (ConfigurationException e2) {
                throw new IllegalArgumentException("Failed to load distributedlog configuration from " + str + ".");
            }
        }
        this.configExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("DistributedLogService-Dyncfg-%d").setDaemon(true).build());
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        serverConfiguration.loadConf(distributedLogConfiguration);
        if (this.shardId.isPresent()) {
            serverConfiguration.setServerShardId(((Integer) this.shardId.get()).intValue());
        }
        serverConfiguration.validate();
        DynamicDistributedLogConfiguration serviceDynConf = getServiceDynConf(distributedLogConfiguration);
        logger.info("Starting stats provider : {}", this.statsProvider.getClass());
        this.statsProvider.start(distributedLogConfiguration);
        if (this.announceServerSet.isPresent() && ((Boolean) this.announceServerSet.get()).booleanValue()) {
            this.announcer = new ServerSetAnnouncer(create, ((Integer) this.port.or(0)).intValue(), ((Integer) this.statsPort.or(0)).intValue(), ((Integer) this.shardId.or(0)).intValue());
        } else {
            this.announcer = new NOPAnnouncer();
        }
        try {
            identityStreamPartitionConverter = (StreamPartitionConverter) ReflectionUtils.newInstance(serverConfiguration.getStreamPartitionConverterClass());
        } catch (ConfigurationException e3) {
            logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}", IdentityStreamPartitionConverter.class.getName());
            identityStreamPartitionConverter = new IdentityStreamPartitionConverter();
        }
        StreamConfigProvider streamConfigProvider = getStreamConfigProvider(distributedLogConfiguration, identityStreamPartitionConverter);
        preRun(distributedLogConfiguration, serverConfiguration);
        Pair<DistributedLogServiceImpl, Server> runServer = runServer(serverConfiguration, distributedLogConfiguration, serviceDynConf, create, identityStreamPartitionConverter, this.statsProvider, ((Integer) this.port.or(0)).intValue(), this.keepAliveLatch, this.statsReceiver, this.thriftmux.isPresent(), streamConfigProvider);
        this.dlService = (DistributedLogServiceImpl) runServer.getLeft();
        this.server = (Server) runServer.getRight();
        this.announcer.announce();
    }

    protected void preRun(DistributedLogConfiguration distributedLogConfiguration, ServerConfiguration serverConfiguration) {
        this.gracefulShutdownMs = serverConfiguration.getGracefulShutdownPeriodMs();
        if (serverConfiguration.isDurableWriteEnabled()) {
            return;
        }
        distributedLogConfiguration.setDurableWriteEnabled(false);
    }

    private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration distributedLogConfiguration) throws ConfigurationException {
        Optional absent = Optional.absent();
        if (this.conf.isPresent()) {
            absent = new DynamicConfigurationFactory(this.configExecutorService, distributedLogConfiguration.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS).getDynamicConfiguration((String) this.conf.get());
        }
        return absent.isPresent() ? (DynamicDistributedLogConfiguration) absent.get() : ConfUtils.getConstDynConf(distributedLogConfiguration);
    }

    private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration distributedLogConfiguration, StreamPartitionConverter streamPartitionConverter) throws ConfigurationException {
        StreamConfigProvider nullStreamConfigProvider = new NullStreamConfigProvider();
        if (this.streamConf.isPresent() && this.conf.isPresent()) {
            nullStreamConfigProvider = new ServiceStreamConfigProvider((String) this.streamConf.get(), (String) this.conf.get(), streamPartitionConverter, this.configExecutorService, distributedLogConfiguration.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
        } else if (this.conf.isPresent()) {
            nullStreamConfigProvider = new DefaultStreamConfigProvider((String) this.conf.get(), this.configExecutorService, distributedLogConfiguration.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
        }
        return nullStreamConfigProvider;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Pair<DistributedLogServiceImpl, Server> runServer(ServerConfiguration serverConfiguration, DistributedLogConfiguration distributedLogConfiguration, URI uri, StreamPartitionConverter streamPartitionConverter, StatsProvider statsProvider, int i) throws IOException {
        return runServer(serverConfiguration, distributedLogConfiguration, ConfUtils.getConstDynConf(distributedLogConfiguration), uri, streamPartitionConverter, statsProvider, i, new CountDownLatch(0), new NullStatsReceiver(), false, new NullStreamConfigProvider());
    }

    static Pair<DistributedLogServiceImpl, Server> runServer(ServerConfiguration serverConfiguration, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, URI uri, StreamPartitionConverter streamPartitionConverter, StatsProvider statsProvider, int i, CountDownLatch countDownLatch, StatsReceiver statsReceiver, boolean z, StreamConfigProvider streamConfigProvider) throws IOException {
        logger.info("Running server @ uri {}.", uri);
        DistributedLogServiceImpl distributedLogServiceImpl = new DistributedLogServiceImpl(serverConfiguration, distributedLogConfiguration, dynamicDistributedLogConfiguration, streamConfigProvider, uri, streamPartitionConverter, statsProvider.getStatsLogger(""), serverConfiguration.isPerStreamStatEnabled() ? statsProvider.getStatsLogger("stream") : NullStatsLogger.INSTANCE, countDownLatch);
        StatsReceiver scope = statsReceiver.scope("service");
        StatsLogger statsLogger = statsProvider.getStatsLogger("service");
        ServerBuilder bindTo = ServerBuilder.get().name("DistributedLogServer").codec(ThriftServerFramedCodec.get()).reportTo(statsReceiver).keepAlive(true).bindTo(new InetSocketAddress(i));
        if (z) {
            logger.info("Using thriftmux.");
            Tuple2 mk = new Transport.Liveness(Duration.Top(), Duration.Top(), Option.apply(true)).mk();
            bindTo = bindTo.stack(ThriftMuxServer$.MODULE$.configured(mk._1(), (Stack.Param) mk._2()));
        }
        logger.info("DistributedLogServer running with the following configuration : \n{}", distributedLogConfiguration.getPropsAsString());
        Server safeBuild = ServerBuilder.safeBuild(new ClientIdRequiredFilter(scope).andThen(new StatsFilter(statsLogger).andThen(new DistributedLogService.Service(distributedLogServiceImpl, new TBinaryProtocol.Factory()))), bindTo);
        logger.info("Started DistributedLog Server.");
        return Pair.of(distributedLogServiceImpl, safeBuild);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void closeServer(Pair<DistributedLogServiceImpl, Server> pair, long j, TimeUnit timeUnit) {
        if (null != pair.getLeft()) {
            ((DistributedLogServiceImpl) pair.getLeft()).shutdown();
            if (j > 0) {
                try {
                    timeUnit.sleep(j);
                } catch (InterruptedException e) {
                    logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
                }
            }
        }
        if (null != pair.getRight()) {
            logger.info("Closing dl thrift server.");
            ((Server) pair.getRight()).close();
            logger.info("Closed dl thrift server.");
        }
    }

    public void close() {
        if (null != this.announcer) {
            try {
                this.announcer.unannounce();
            } catch (IOException e) {
                logger.warn("Error on unannouncing service : ", e);
            }
            this.announcer.close();
        }
        closeServer(Pair.of(this.dlService, this.server), this.gracefulShutdownMs, TimeUnit.MILLISECONDS);
        if (null != this.statsProvider) {
            this.statsProvider.stop();
        }
        SchedulerUtils.shutdownScheduler(this.configExecutorService, 60L, TimeUnit.SECONDS);
        this.keepAliveLatch.countDown();
    }

    public void join() throws InterruptedException {
        this.keepAliveLatch.await();
    }

    public static DistributedLogServer runServer(Optional<String> optional, Optional<String> optional2, Optional<String> optional3, Optional<Integer> optional4, Optional<Integer> optional5, Optional<Integer> optional6, Optional<Boolean> optional7, Optional<Boolean> optional8, StatsReceiver statsReceiver, StatsProvider statsProvider) throws ConfigurationException, IllegalArgumentException, IOException {
        DistributedLogServer distributedLogServer = new DistributedLogServer(optional, optional2, optional3, optional4, optional5, optional6, optional7, optional8, statsReceiver, statsProvider);
        distributedLogServer.runServer();
        return distributedLogServer;
    }
}
