package io.activej.rpc.server;

import io.activej.codegen.DefiningClassLoader;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.exception.MalformedDataException;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.net.ServerSocketSettings;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.api.attribute.JmxReducers;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.net.AbstractServer;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.rpc.protocol.RpcControlMessage;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcStream;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.SerializerBuilder;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/rpc/server/RpcServer.class */
/**
 * RPC server built on top of {@link AbstractServer}.
 *
 * <p>For every accepted socket the server creates an {@link RpcStream} and an
 * {@link RpcServerConnection}, hands the connection the map of registered
 * {@link RpcRequestHandler}s, and tracks the connection until it is removed via
 * {@link #remove(RpcServerConnection)}.
 *
 * <p>Configuration is done through the fluent {@code with...} methods. Message types
 * must be supplied with {@link #withMessageTypes(List)} before the server starts
 * listening, otherwise {@link #onListen()} fails with an {@link IllegalStateException}.
 *
 * <p>The class also exposes a number of JMX attributes and operations (connection
 * counters, request statistics, and a monitoring toggle).
 */
public final class RpcServer extends AbstractServer<RpcServer> {
    /** Server socket settings applied by {@link #create(Eventloop)}. */
    public static final ServerSocketSettings DEFAULT_SERVER_SOCKET_SETTINGS = ServerSocketSettings.create(16384);

    /** Initial serialization buffer size used unless {@code withStreamProtocol} overrides it. */
    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE;

    /** Smoothing window passed to every {@link EventStats}/{@link ValueStats} created by this class. */
    static final Duration SMOOTHING_WINDOW = Duration.ofMinutes(1);

    // ---- stream / protocol configuration ----
    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;

    @Nullable
    private FrameFormat frameFormat;
    private Duration autoFlushInterval = Duration.ZERO;

    // ---- handlers and serialization ----
    // LinkedHashMap: keeps handlers in registration order.
    private final Map<Class<?>, RpcRequestHandler<?, ?>> handlers = new LinkedHashMap<>();
    private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    private SerializerBuilder serializerBuilder = SerializerBuilder.create(DefiningClassLoader.create(classLoader));

    // Stays null until withMessageTypes(...) is called; checked in onListen().
    @Nullable
    private List<Class<?>> messageTypes;

    // Built in onListen() from serializerBuilder and messageTypes.
    private BinarySerializer<RpcMessage> serializer;

    // ---- connection tracking ----
    private final List<RpcServerConnection> connections = new ArrayList<>();

    // Non-null only while a close is pending with connections still open; see onClose()/remove().
    @Nullable
    private SettablePromise<Void> closeCallback;

    // ---- statistics ----
    private final EventStats totalConnects = EventStats.create(SMOOTHING_WINDOW);
    private final Map<InetAddress, EventStats> connectsPerAddress = new HashMap<>();
    // The request-level stats below are only created and exposed here; nothing in this
    // class records into them (presumably RpcServerConnection does - verify there).
    private final EventStats successfulRequests = EventStats.create(SMOOTHING_WINDOW);
    private final EventStats failedRequests = EventStats.create(SMOOTHING_WINDOW);
    private final ValueStats requestHandlingTime = ValueStats.create(SMOOTHING_WINDOW).withUnit("milliseconds");
    private final ExceptionStats lastRequestHandlingException = ExceptionStats.create();
    private final ExceptionStats lastProtocolError = ExceptionStats.create();
    private boolean monitoring;

    private RpcServer(Eventloop eventloop) {
        super(eventloop);
    }

    /**
     * Creates a server with {@link #DEFAULT_SERVER_SOCKET_SETTINGS}, the inherited
     * {@code DEFAULT_SOCKET_SETTINGS}, and a built-in handler for {@link RpcControlMessage}
     * that answers {@code PING} with {@code PONG} and fails any other control message with
     * a {@link MalformedDataException}.
     *
     * @param eventloop eventloop passed to the {@link AbstractServer} constructor
     * @return a new, not yet listening server
     */
    public static RpcServer create(Eventloop eventloop) {
        return new RpcServer(eventloop)
                .withServerSocketSettings(DEFAULT_SERVER_SOCKET_SETTINGS)
                .withSocketSettings(DEFAULT_SOCKET_SETTINGS)
                .withHandler(RpcControlMessage.class, request ->
                        request == RpcControlMessage.PING
                                ? Promise.of(RpcControlMessage.PONG)
                                : Promise.ofException(new MalformedDataException("Unknown message: " + request)));
    }

    /**
     * Sets the class loader and replaces the current {@link SerializerBuilder} with a fresh
     * one based on that class loader.
     *
     * <p>Note that this discards a builder previously supplied through
     * {@link #withSerializerBuilder(SerializerBuilder)}.
     *
     * @param classLoader class loader to wrap in a {@link DefiningClassLoader}
     * @return this server
     */
    public RpcServer withClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
        this.serializerBuilder = SerializerBuilder.create(DefiningClassLoader.create(classLoader));
        return this;
    }

    /**
     * Varargs convenience overload of {@link #withMessageTypes(List)}.
     *
     * @param clsArr message classes; must not contain duplicates
     * @return this server
     */
    public RpcServer withMessageTypes(Class<?>... clsArr) {
        return withMessageTypes(Arrays.asList(clsArr));
    }

    /**
     * Sets the message classes that are registered as {@link RpcMessage} subclasses when the
     * serializer is built in {@link #onListen()}.
     *
     * @param list message classes; must not contain duplicates
     * @return this server
     * @throws IllegalArgumentException if {@code list} contains the same class more than once
     */
    public RpcServer withMessageTypes(@NotNull List<Class<?>> list) {
        Checks.checkArgument(new HashSet<>(list).size() == list.size(), "Message types must be unique");
        // Store a private copy so that later changes to the caller's list (or to the array
        // behind Arrays.asList) cannot bypass the uniqueness check performed above.
        this.messageTypes = new ArrayList<>(list);
        return this;
    }

    /**
     * Replaces the {@link SerializerBuilder} used to build the message serializer.
     *
     * @param serializerBuilder builder to use in {@link #onListen()}
     * @return this server
     */
    public RpcServer withSerializerBuilder(SerializerBuilder serializerBuilder) {
        this.serializerBuilder = serializerBuilder;
        return this;
    }

    /**
     * Sets the initial buffer size passed to each connection's {@link RpcStream}.
     * The frame format is left untouched.
     *
     * @param memSize initial buffer size
     * @return this server
     */
    public RpcServer withStreamProtocol(MemSize memSize) {
        this.initialBufferSize = memSize;
        return this;
    }

    /**
     * Sets both the initial buffer size and the frame format passed to each connection's
     * {@link RpcStream}.
     *
     * @param memSize     initial buffer size
     * @param frameFormat frame format to use
     * @return this server
     */
    public RpcServer withStreamProtocol(MemSize memSize, FrameFormat frameFormat) {
        this.initialBufferSize = memSize;
        this.frameFormat = frameFormat;
        return this;
    }

    /**
     * Sets the auto-flush interval passed to each connection's {@link RpcStream}.
     * The default is {@link Duration#ZERO}.
     *
     * @param duration auto-flush interval
     * @return this server
     */
    public RpcServer withAutoFlushInterval(Duration duration) {
        this.autoFlushInterval = duration;
        return this;
    }

    /**
     * Registers a handler for the given request class.
     *
     * @param cls               request class used as the lookup key
     * @param rpcRequestHandler handler for requests of that class
     * @param <I>               request type
     * @param <O>               response type
     * @return this server
     * @throws IllegalArgumentException if a handler for {@code cls} is already registered
     */
    public <I, O> RpcServer withHandler(Class<I> cls, RpcRequestHandler<I, O> rpcRequestHandler) {
        Checks.checkArgument(!handlers.containsKey(cls), "Handler for {} has already been added", cls);
        handlers.put(cls, rpcRequestHandler);
        return this;
    }

    /**
     * Wires up a newly accepted socket: creates the {@link RpcStream} and the
     * {@link RpcServerConnection}, registers the connection as the stream's listener,
     * starts tracking it, and records connect statistics.
     *
     * @param asyncTcpSocket accepted socket
     * @param inetAddress    remote address, used as the key for per-address connect stats
     */
    @Override
    protected void serve(AsyncTcpSocket asyncTcpSocket, InetAddress inetAddress) {
        RpcStream stream = new RpcStream(asyncTcpSocket, serializer, initialBufferSize, autoFlushInterval, frameFormat, true);
        RpcServerConnection connection = new RpcServerConnection(this, inetAddress, handlers, stream);
        stream.setListener(connection);
        add(connection);

        ensureConnectStats(inetAddress).recordEvent();
        totalConnects.recordEvent();
    }

    /**
     * Builds the {@link RpcMessage} serializer from the configured builder and message types.
     *
     * @throws IllegalStateException if no message types have been configured
     */
    @Override
    protected void onListen() {
        Checks.checkState(messageTypes != null, "Message types must be specified");
        serializer = serializerBuilder.withSubclasses(RpcMessage.MESSAGE_TYPES, messageTypes).build(RpcMessage.class);
    }

    /**
     * Starts closing the server.
     *
     * <p>With no open connections the promise is completed immediately. Otherwise every
     * connection is asked to shut down and the promise is completed by
     * {@link #remove(RpcServerConnection)} once the last connection is gone.
     *
     * @param settablePromise promise to complete when all connections are closed
     */
    @Override
    protected void onClose(SettablePromise<Void> settablePromise) {
        if (connections.isEmpty()) {
            logger.info("RpcServer is closing. Active connections count: 0.");
            settablePromise.set(null);
            return;
        }
        logger.info("RpcServer is closing. Active connections count: {}", connections.size());
        // Publish the callback before asking connections to shut down: should shutdown()
        // ever call remove() synchronously, remove() must already see the callback,
        // otherwise the close promise would never complete.
        closeCallback = settablePromise;
        // Iterate over a snapshot because connections may be removed while shutting down.
        for (RpcServerConnection connection : new ArrayList<>(connections)) {
            connection.shutdown();
        }
    }

    /**
     * Starts tracking a connection and enables monitoring on it if monitoring is
     * currently switched on for the server.
     */
    void add(RpcServerConnection rpcServerConnection) {
        if (logger.isInfoEnabled()) {
            logger.info("Client connected on {}", rpcServerConnection);
        }
        if (monitoring) {
            rpcServerConnection.startMonitoring();
        }
        connections.add(rpcServerConnection);
    }

    /**
     * Stops tracking a connection. If a close is pending and this was the last open
     * connection, the pending close promise is completed.
     *
     * <p>The decompiler reported this method as package-private in the original class;
     * it is kept {@code public} here so that existing callers are unaffected.
     *
     * @param rpcServerConnection connection to remove
     * @return {@code true} if the connection was tracked and has been removed,
     *         {@code false} if it was not tracked
     */
    public boolean remove(RpcServerConnection rpcServerConnection) {
        if (!connections.remove(rpcServerConnection)) {
            return false;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Client disconnected on {}", rpcServerConnection);
        }
        if (closeCallback != null) {
            logger.info("RpcServer is closing. One more connection was closed. Active connections count: {}", connections.size());
            if (connections.isEmpty()) {
                closeCallback.set(null);
            }
        }
        return true;
    }

    /** Turns monitoring on for the server and for every currently tracked connection. */
    @JmxOperation(description = "enable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, requestHandlingTime stats are collected only when monitoring is enabled) ]")
    public void startMonitoring() {
        monitoring = true;
        for (RpcServerConnection connection : connections) {
            connection.startMonitoring();
        }
    }

    /** Turns monitoring off for the server and for every currently tracked connection. */
    @JmxOperation(description = "disable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, requestHandlingTime stats are collected only when monitoring is enabled) ]")
    public void stopMonitoring() {
        monitoring = false;
        for (RpcServerConnection connection : connections) {
            connection.stopMonitoring();
        }
    }

    /** @return whether monitoring is currently switched on */
    @JmxAttribute(description = "when monitoring is enabled more stats are collected, but it causes more overhead (for example, requestHandlingTime stats are collected only when monitoring is enabled)")
    public boolean isMonitoring() {
        return monitoring;
    }

    /** @return number of currently tracked connections */
    @JmxAttribute(description = "current number of connections", reducer = JmxReducers.JmxReducerSum.class)
    public int getConnectionsCount() {
        return connections.size();
    }

    /** @return stats of all accepted connections */
    @JmxAttribute
    public EventStats getTotalConnects() {
        return totalConnects;
    }

    /** @return the live (not copied) map of connect stats keyed by remote address */
    public Map<InetAddress, EventStats> getConnectsPerAddress() {
        return connectsPerAddress;
    }

    /** Returns the connect stats for the given address, creating them on first use. */
    private EventStats ensureConnectStats(InetAddress inetAddress) {
        return connectsPerAddress.computeIfAbsent(inetAddress, address -> EventStats.create(SMOOTHING_WINDOW));
    }

    /** @return the live (not copied) list of currently tracked connections */
    @JmxOperation(description = "detailed information about connections")
    public List<RpcServerConnection> getConnections() {
        return connections;
    }

    /** @return stats object for successfully processed requests */
    @JmxAttribute(extraSubAttributes = {"totalCount"}, description = "number of requests which were processed correctly")
    public EventStats getSuccessfulRequests() {
        return successfulRequests;
    }

    /** @return stats object for requests that were answered with an error */
    @JmxAttribute(extraSubAttributes = {"totalCount"}, description = "request with error responses (number of requests which were handled with error)")
    public EventStats getFailedRequests() {
        return failedRequests;
    }

    /** @return stats object for request handling time, in milliseconds */
    @JmxAttribute(description = "time for handling one request in milliseconds (both successful and failed)")
    public ValueStats getRequestHandlingTime() {
        return requestHandlingTime;
    }

    /** @return stats object holding the last exception raised while handling a request */
    @JmxAttribute(description = "exception that occurred because of business logic error (in RpcRequestHandler implementation)")
    public ExceptionStats getLastRequestHandlingException() {
        return lastRequestHandlingException;
    }

    /** @return stats object holding the last protocol-level error */
    @JmxAttribute(description = "exception that occurred because of protocol error (serialization, deserialization, compression, decompression, etc)")
    public ExceptionStats getLastProtocolError() {
        return lastProtocolError;
    }
}
