package io.reactivex.netty.server;

import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.UnpooledConnectionFactory;
import io.reactivex.netty.metrics.MetricEventsListener;
import io.reactivex.netty.metrics.MetricEventsPublisher;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import io.reactivex.netty.server.AbstractServer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;

/* loaded from: input_file:lib/rxnetty-0.4.11.jar:io/reactivex/netty/server/AbstractServer.class */
/**
 * Base class for servers that bind a Netty bootstrap to a port and publish
 * {@link ServerMetricsEvent}s to subscribed listeners.
 *
 * <p>The lifecycle is tracked in {@link #serverStateRef}: a server is created in
 * {@link ServerState#Created}, moves to {@link ServerState#Starting} and then
 * {@link ServerState#Started} inside {@link #start()}, and ends in
 * {@link ServerState#Shutdown} after {@link #shutdown()}.
 *
 * @param <I> presumably the type of objects read from a connection - TODO confirm
 * @param <O> presumably the type of objects written to a connection - TODO confirm
 * @param <B> concrete bootstrap type used to bind the server
 * @param <C> channel type created by the bootstrap
 * @param <S> concrete server type, returned by the fluent methods of this class
 */
public class AbstractServer<I, O, B extends AbstractBootstrap<B, C>, C extends Channel, S extends AbstractServer> implements MetricEventsPublisher<ServerMetricsEvent<?>> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);

    /** Connection factory created from {@link #eventsSubject} in the constructor. */
    protected final UnpooledConnectionFactory<I, O> connectionFactory;

    /** Bootstrap that is bound to {@link #port} by {@link #start()}; never {@code null}. */
    protected final B bootstrap;

    /** Port passed to the constructor; see {@link #getServerPort()} for the bound port. */
    protected final int port;

    /** Current lifecycle state of this server. */
    protected final AtomicReference<ServerState> serverStateRef;

    /** Subject through which metric events are delivered to subscribed listeners. */
    protected final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;

    /** Optional error handler handed to every newly initialised channel; may be {@code null}. */
    protected ErrorHandler errorHandler;

    /** Result of the bind operation; {@code null} until {@link #start()} has bound the port. */
    private ChannelFuture bindFuture;

    /**
     * Lifecycle states of a server.
     *
     * <p>NOTE(review): the decompiler reported that this type's access modifier was
     * changed from {@code protected}.
     */
    public enum ServerState {
        Created,
        Starting,
        Started,
        Shutdown
    }

    /**
     * Creates a server in the {@link ServerState#Created} state.
     *
     * @param b bootstrap used to bind the server; must not be {@code null}
     * @param i port to bind to
     * @throws NullPointerException if {@code b} is {@code null}
     */
    public AbstractServer(B b, int i) {
        if (null == b) {
            throw new NullPointerException("Bootstrap can not be null.");
        }
        this.serverStateRef = new AtomicReference<>(ServerState.Created);
        this.bootstrap = b;
        this.port = i;
        this.eventsSubject = new MetricEventsSubject<>();
        // The connection factory publishes through the subject, so the subject is created first.
        this.connectionFactory = UnpooledConnectionFactory.from(this.eventsSubject, ServerChannelMetricEventProvider.INSTANCE);
    }

    /**
     * Starts the server and blocks the calling thread until the server channel is closed.
     *
     * <p>If the wait is interrupted this method returns early and leaves the thread's
     * interrupt flag set, so that callers can observe the interruption.
     *
     * @throws IllegalStateException if the server has already been started
     */
    public void startAndWait() {
        start();
        try {
            waitTillShutdown();
        } catch (InterruptedException e) {
            // Restore the flag: throwing InterruptedException cleared it, and
            // Thread.interrupted() would only clear it again.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Binds the bootstrap to the configured port and waits for the bind to complete.
     *
     * <p>The state moves from {@link ServerState#Created} to {@link ServerState#Starting}
     * before binding and to {@link ServerState#Started} once the bind has succeeded. When
     * the bind fails or the wait is interrupted, the state is left at
     * {@link ServerState#Starting}.
     *
     * @return this server
     * @throws IllegalStateException if the server is not in the {@link ServerState#Created} state
     * @throws RuntimeException if the bind fails, or if the calling thread is interrupted
     *         while waiting (the interrupt flag is restored before throwing)
     */
    public S start() {
        if (!this.serverStateRef.compareAndSet(ServerState.Created, ServerState.Starting)) {
            throw new IllegalStateException("Server already started");
        }
        try {
            // NOTE(review): "sync2" looks like a decompiler-renamed ChannelFuture.sync() -
            // confirm against the Netty version on the classpath.
            this.bindFuture = this.bootstrap.bind(this.port).sync2();
            if (!this.bindFuture.isSuccess()) {
                throw new RuntimeException(this.bindFuture.cause());
            }
            this.serverStateRef.set(ServerState.Started);
            logger.info("Rx server started at port: {}", getServerPort());
            return returnServer();
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers even though it is rethrown unchecked.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Sets the error handler that is passed to channels initialised by
     * {@link #newChannelInitializer}.
     *
     * @param errorHandler handler to use
     * @return this server
     * @throws IllegalStateException if the server is in the {@link ServerState#Started} state
     */
    public S withErrorHandler(ErrorHandler errorHandler) {
        if (this.serverStateRef.get() == ServerState.Started) {
            throw new IllegalStateException("Error handler can not be set after starting the server.");
        }
        this.errorHandler = errorHandler;
        return returnServer();
    }

    /**
     * Closes the bound server channel and waits for the close to complete.
     *
     * @throws IllegalStateException if the server is not in the {@link ServerState#Started}
     *         state, i.e. it was never started or has already been shut down
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        if (!this.serverStateRef.compareAndSet(ServerState.Started, ServerState.Shutdown)) {
            // The state is re-read only to pick an accurate message for the caller.
            if (this.serverStateRef.get() == ServerState.Shutdown) {
                throw new IllegalStateException("The server is already shutdown.");
            }
            throw new IllegalStateException("Server not started yet.");
        }
        // NOTE(review): "sync2" looks like a decompiler-renamed ChannelFuture.sync() - confirm.
        this.bindFuture.channel().close().sync2();
    }

    /**
     * Blocks until the bound server channel is closed.
     *
     * <p>Returns immediately when the server is already in the
     * {@link ServerState#Shutdown} state.
     *
     * @throws IllegalStateException if the server is still in the {@link ServerState#Created}
     *         or {@link ServerState#Starting} state
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitTillShutdown() throws InterruptedException {
        switch (this.serverStateRef.get()) {
            case Created:
            case Starting:
                throw new IllegalStateException("Server not started yet.");
            case Started:
                // NOTE(review): "await2" looks like a decompiler-renamed ChannelFuture.await() - confirm.
                this.bindFuture.channel().closeFuture().await2();
                return;
            case Shutdown:
            default:
                return;
        }
    }

    /**
     * Blocks until the bound server channel is closed or the given time has elapsed.
     *
     * <p>The result of the timed wait is discarded, so callers can not tell from this
     * method whether the channel closed or the wait timed out. Returns immediately when
     * the server is already in the {@link ServerState#Shutdown} state.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @throws IllegalStateException if the server is still in the {@link ServerState#Created}
     *         or {@link ServerState#Starting} state
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitTillShutdown(long j, TimeUnit timeUnit) throws InterruptedException {
        switch (this.serverStateRef.get()) {
            case Created:
            case Starting:
                throw new IllegalStateException("Server not started yet.");
            case Started:
                this.bindFuture.channel().closeFuture().await(j, timeUnit);
                return;
            case Shutdown:
            default:
                return;
        }
    }

    /**
     * Returns the port this server is bound to.
     *
     * <p>Once the bind operation is done and the channel's local address is an
     * {@link InetSocketAddress}, the port of that address is returned; otherwise the
     * port passed to the constructor is returned.
     *
     * @return the bound port if it is known, else the configured port
     */
    public int getServerPort() {
        if (null != this.bindFuture && this.bindFuture.isDone()) {
            SocketAddress localAddress = this.bindFuture.channel().localAddress();
            if (localAddress instanceof InetSocketAddress) {
                return ((InetSocketAddress) localAddress).getPort();
            }
        }
        return this.port;
    }

    /**
     * Returns the subject through which this server publishes metric events.
     *
     * @return the events subject created in the constructor
     */
    public MetricEventsSubject<ServerMetricsEvent<?>> getEventsSubject() {
        return this.eventsSubject;
    }

    /**
     * Subscribes the given listener to this server's metric events.
     *
     * @param metricEventsListener listener to register with {@link #eventsSubject}
     * @return the subscription returned by the events subject
     */
    @Override // io.reactivex.netty.metrics.MetricEventsPublisher
    public Subscription subscribe(MetricEventsListener<? extends ServerMetricsEvent<?>> metricEventsListener) {
        return this.eventsSubject.subscribe(metricEventsListener);
    }

    /**
     * Returns this instance typed as the concrete server type {@code S}, for use by the
     * fluent methods of this class.
     *
     * @return this server
     */
    @SuppressWarnings("unchecked") // assumes S is the concrete type of this instance, as the fluent API implies
    protected S returnServer() {
        return (S) this;
    }

    /**
     * Creates the initializer that configures the pipeline of every new channel.
     *
     * <p>Each channel gets a {@code ServerRequiredConfigurator} built from the given
     * connection handler and executor plus this server's connection factory, error
     * handler and events subject. When {@code pipelineConfigurator} is not {@code null}
     * it is combined with that configurator through a {@link PipelineConfiguratorComposite},
     * with {@code pipelineConfigurator} passed first.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was
     * changed from {@code protected}; the raw configurator types below were presumably
     * generic in the original source - confirm before adding type arguments.
     *
     * @param pipelineConfigurator optional user supplied configurator; may be {@code null}
     * @param connectionHandler handler passed to the required configurator
     * @param eventExecutorGroup executor group passed to the required configurator
     * @return a new channel initializer
     */
    public ChannelInitializer<Channel> newChannelInitializer(final PipelineConfigurator<I, O> pipelineConfigurator, final ConnectionHandler<I, O> connectionHandler, final EventExecutorGroup eventExecutorGroup) {
        return new ChannelInitializer<Channel>() { // from class: io.reactivex.netty.server.AbstractServer.1
            @Override // io.netty.channel.ChannelInitializer
            protected void initChannel(Channel channel) throws Exception {
                // errorHandler is read here, per channel, not when the initializer is created.
                PipelineConfigurator serverRequiredConfigurator = new ServerRequiredConfigurator(connectionHandler, AbstractServer.this.connectionFactory, AbstractServer.this.errorHandler, AbstractServer.this.eventsSubject, eventExecutorGroup);
                (null == pipelineConfigurator ? serverRequiredConfigurator : new PipelineConfiguratorComposite(pipelineConfigurator, serverRequiredConfigurator)).configureNewPipeline(channel.pipeline());
            }
        };
    }
}
