package io.datakernel.rpc.client;

import io.datakernel.async.callback.Callback;
import io.datakernel.async.service.EventloopService;
import io.datakernel.common.Initializable;
import io.datakernel.common.MemSize;
import io.datakernel.common.Preconditions;
import io.datakernel.common.Utils;
import io.datakernel.common.exception.StacklessException;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.eventloop.jmx.ExceptionStats;
import io.datakernel.eventloop.net.SocketSettings;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.jmx.api.JmxOperation;
import io.datakernel.jmx.api.JmxReducers;
import io.datakernel.net.AsyncSslSocket;
import io.datakernel.net.AsyncTcpSocketImpl;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import io.datakernel.promise.SettablePromise;
import io.datakernel.rpc.client.jmx.RpcConnectStats;
import io.datakernel.rpc.client.jmx.RpcRequestStats;
import io.datakernel.rpc.client.sender.RpcSender;
import io.datakernel.rpc.client.sender.RpcStrategy;
import io.datakernel.rpc.protocol.RpcMessage;
import io.datakernel.rpc.protocol.RpcStream;
import io.datakernel.serializer.BinarySerializer;
import io.datakernel.serializer.SerializerBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/rpc/client/RpcClient.class */
/**
 * RPC client that lives on a single {@link Eventloop} and keeps one
 * {@link RpcClientConnection} per server address supplied by the configured
 * {@link RpcStrategy}.
 *
 * <p>Instances are created with {@link #create(Eventloop)} and configured through the
 * fluent {@code with*} methods before {@link #start()} is called. Requests are routed
 * through the {@link RpcSender} that the strategy builds from the currently open
 * connections; while no sender can be built, requests are answered with
 * {@code NO_SENDER_AVAILABLE_EXCEPTION}.
 *
 * <p>{@link #start()} and {@link #stop()} verify that they are invoked from the eventloop
 * thread. To send requests from a different eventloop use
 * {@link #adaptToAnotherEventloop(Eventloop)}.
 */
public final class RpcClient implements IRpcClient, EventloopService, Initializable<RpcClient>, EventloopJmxMBeanEx {
    public static final SocketSettings DEFAULT_SOCKET_SETTINGS = SocketSettings.create().withTcpNoDelay(true);
    public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
    public static final Duration DEFAULT_RECONNECT_INTERVAL = Duration.ofSeconds(1);
    public static final MemSize DEFAULT_PACKET_SIZE = ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE;
    public static final MemSize MAX_PACKET_SIZE = ChannelSerializer.MAX_SIZE;
    /** Failure of {@link #start()} when no sender could be built and forced start is off. */
    public static final StacklessException START_EXCEPTION = new StacklessException("Could not establish initial connection");
    /** Smoothing window used for every {@link RpcRequestStats} created by this client. */
    static final Duration SMOOTHING_WINDOW = Duration.ofMinutes(1);

    // ---- configuration (set through the with* methods) ----
    private Logger logger = LoggerFactory.getLogger(getClass());
    private final Eventloop eventloop;
    private SocketSettings socketSettings = DEFAULT_SOCKET_SETTINGS;
    private RpcStrategy strategy = new NoServersStrategy();
    private List<InetSocketAddress> addresses = new ArrayList<>();
    private List<Class<?>> messageTypes;
    private MemSize defaultPacketSize = DEFAULT_PACKET_SIZE;
    private MemSize maxPacketSize = MAX_PACKET_SIZE;
    private boolean compression = false;
    private Duration autoFlushInterval = Duration.ZERO;
    private long connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT.toMillis();
    private long reconnectIntervalMillis = DEFAULT_RECONNECT_INTERVAL.toMillis();
    private boolean forcedStart;
    // SSL is enabled only when sslContext is non-null (see connect()).
    @Nullable
    private SSLContext sslContext;
    @Nullable
    private Executor sslExecutor;
    private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    private SerializerBuilder serializerBuilder = SerializerBuilder.create(this.classLoader);

    // ---- runtime state ----
    /** Built in {@link #start()} from the serializer builder and the message types. */
    private BinarySerializer<RpcMessage> serializer;
    private final Map<InetSocketAddress, RpcClientConnection> connections = new HashMap<>();
    /** Live view over {@link #connections}, handed to the strategy when a sender is built. */
    private final RpcClientConnectionPool pool;
    private RpcSender requestSender = new NoSenderAvailable();
    /** Non-null once {@link #stop()} has been requested. */
    @Nullable
    private SettablePromise<Void> stopPromise;

    // ---- JMX / statistics ----
    private boolean monitoring;
    private final RpcRequestStats generalRequestsStats;
    private final RpcConnectStats generalConnectsStats;
    private final Map<Class<?>, RpcRequestStats> requestStatsPerClass;
    private final Map<InetSocketAddress, RpcConnectStats> connectsStatsPerAddress;
    private final ExceptionStats lastProtocolError;
    private final AsyncTcpSocketImpl.JmxInspector statsSocket;

    /** Sender used while no real sender exists: fails every request immediately. */
    private static final class NoSenderAvailable implements RpcSender {
        private NoSenderAvailable() {
        }

        @Override
        public <I, O> void sendRequest(I request, int timeout, @NotNull Callback<O> cb) {
            cb.accept(null, NO_SENDER_AVAILABLE_EXCEPTION);
        }
    }

    /** Default strategy: knows no server addresses and never produces a sender. */
    private static final class NoServersStrategy implements RpcStrategy {
        private NoServersStrategy() {
        }

        @Override
        public Set<InetSocketAddress> getAddresses() {
            return Collections.emptySet();
        }

        @Override
        public RpcSender createSender(RpcClientConnectionPool connectionPool) {
            // null means "no sender"; the client substitutes NoSenderAvailable.
            return null;
        }
    }

    private RpcClient(Eventloop eventloop) {
        // 'connections' is already initialised by its field initializer at this point.
        this.pool = this.connections::get;
        this.monitoring = false;
        this.generalRequestsStats = RpcRequestStats.create(SMOOTHING_WINDOW);
        this.generalConnectsStats = new RpcConnectStats();
        this.requestStatsPerClass = new HashMap<>();
        this.connectsStatsPerAddress = new HashMap<>();
        this.lastProtocolError = ExceptionStats.create();
        this.statsSocket = new AsyncTcpSocketImpl.JmxInspector();
        this.eventloop = eventloop;
    }

    /**
     * Creates a client bound to the given eventloop, with default settings and no servers.
     *
     * @param eventloop eventloop the client operates on
     * @return a new, not yet started client
     */
    public static RpcClient create(Eventloop eventloop) {
        return new RpcClient(eventloop);
    }

    /**
     * Sets the class loader and replaces the serializer builder with a fresh one created
     * for that class loader (a builder set earlier via
     * {@link #withSerializerBuilder(SerializerBuilder)} is discarded).
     */
    public RpcClient withClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
        this.serializerBuilder = SerializerBuilder.create(classLoader);
        return this;
    }

    /** Sets the socket settings used for outgoing connections. */
    public RpcClient withSocketSettings(SocketSettings socketSettings) {
        this.socketSettings = socketSettings;
        return this;
    }

    /**
     * Varargs variant of {@link #withMessageTypes(List)}.
     *
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument},
     *                                  when the types are not unique
     */
    public RpcClient withMessageTypes(@NotNull Class<?>... messageTypes) {
        return withMessageTypes(Arrays.asList(messageTypes));
    }

    /**
     * Sets the message classes registered as {@link RpcMessage} subclasses when the
     * serializer is built in {@link #start()}. The list must not contain duplicates.
     *
     * @param messageTypes message classes, in registration order
     */
    public RpcClient withMessageTypes(List<Class<?>> messageTypes) {
        Preconditions.checkArgument(new HashSet<>(messageTypes).size() == messageTypes.size(), "Message types must be unique");
        // Defensive copy: later changes to the caller's list must not bypass the check above.
        this.messageTypes = new ArrayList<>(messageTypes);
        return this;
    }

    /** Replaces the serializer builder used to build the message serializer in {@link #start()}. */
    public RpcClient withSerializerBuilder(SerializerBuilder serializerBuilder) {
        this.serializerBuilder = serializerBuilder;
        return this;
    }

    /**
     * Sets the routing strategy. The strategy's addresses become the set of servers the
     * client connects to, and a connect-stats entry is registered for each new address.
     */
    public RpcClient withStrategy(RpcStrategy strategy) {
        this.strategy = strategy;
        this.addresses = new ArrayList<>(strategy.getAddresses());
        for (InetSocketAddress address : this.addresses) {
            this.connectsStatsPerAddress.computeIfAbsent(address, key -> new RpcConnectStats());
        }
        return this;
    }

    /**
     * Configures the stream protocol parameters passed to every {@link RpcStream}.
     *
     * @param defaultPacketSize default packet size
     * @param maxPacketSize     maximum packet size
     * @param compression       whether compression is enabled
     */
    public RpcClient withStreamProtocol(MemSize defaultPacketSize, MemSize maxPacketSize, boolean compression) {
        this.defaultPacketSize = defaultPacketSize;
        this.maxPacketSize = maxPacketSize;
        this.compression = compression;
        return this;
    }

    /** Sets the auto-flush interval passed to every {@link RpcStream}. */
    public RpcClient withAutoFlushInterval(Duration autoFlushInterval) {
        this.autoFlushInterval = autoFlushInterval;
        return this;
    }

    /** Sets the connect timeout (stored with millisecond precision). */
    public RpcClient withConnectTimeout(Duration connectTimeout) {
        this.connectTimeoutMillis = connectTimeout.toMillis();
        return this;
    }

    /** Sets the delay before a reconnect attempt (stored with millisecond precision). */
    public RpcClient withReconnectInterval(Duration reconnectInterval) {
        this.reconnectIntervalMillis = reconnectInterval.toMillis();
        return this;
    }

    /**
     * Enables SSL: new connections are wrapped with
     * {@link AsyncSslSocket#wrapClientSocket} using the given context and executor.
     */
    public RpcClient withSslEnabled(SSLContext sslContext, Executor sslExecutor) {
        this.sslContext = sslContext;
        this.sslExecutor = sslExecutor;
        return this;
    }

    /** Replaces the logger used by this client. */
    public RpcClient withLogger(Logger logger) {
        this.logger = logger;
        return this;
    }

    /** No-op, kept only for backward compatibility. */
    @Deprecated
    public RpcClient withImmediateStart() {
        return this;
    }

    /**
     * Makes {@link #start()} complete successfully even if no sender could be built
     * from the initial connection attempts.
     */
    public RpcClient withForcedStart() {
        this.forcedStart = true;
        return this;
    }

    public SocketSettings getSocketSettings() {
        return this.socketSettings;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /**
     * Builds the message serializer and attempts to connect to every configured address.
     *
     * <p>Individual connect failures are swallowed here (they are logged and retried by
     * {@link #connect}). After all attempts have finished the returned promise completes
     * successfully if forced start is enabled or a real sender is available, and fails
     * with {@link #START_EXCEPTION} otherwise.
     *
     * <p>Must be called from the eventloop thread, after message types have been set and
     * before {@link #stop()}.
     *
     * @return promise of the start result
     */
    @NotNull
    public Promise<Void> start() {
        Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        Preconditions.checkNotNull(this.messageTypes, "Message types must be specified");
        Preconditions.checkState(this.stopPromise == null);
        this.serializer = this.serializerBuilder.withSubclasses(RpcMessage.MESSAGE_TYPES, this.messageTypes).build(RpcMessage.class);
        return Promises.all(this.addresses.stream().map(address -> {
            this.logger.info("Connecting: {}", address);
            // Map both outcomes to success so that one failed address does not fail the whole start.
            return connect(address).thenEx((result, e) -> Promise.complete());
        })).then(ignored -> {
            if (this.forcedStart || !(this.requestSender instanceof NoSenderAvailable)) {
                return Promise.complete();
            }
            return Promise.<Void>ofException(START_EXCEPTION);
        });
    }

    /**
     * Requests shutdown of all connections. Repeated calls return the same promise.
     *
     * <p>With no open connections the promise is completed immediately; otherwise it is
     * completed by {@link #processClosedConnection} once the last connection has been
     * removed. Must be called from the eventloop thread.
     *
     * @return promise completed when the client has stopped
     */
    @NotNull
    public Promise<Void> stop() {
        Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        if (this.stopPromise != null) {
            return this.stopPromise;
        }
        SettablePromise<Void> promise = new SettablePromise<>();
        this.stopPromise = promise;
        if (this.connections.isEmpty()) {
            promise.set(null);
            return promise;
        }
        for (RpcClientConnection connection : this.connections.values()) {
            connection.shutdown();
        }
        return promise;
    }

    /**
     * Opens a connection to the given address and, on success, registers it and rebuilds
     * the request sender. On failure the attempt is logged and, unless the client is
     * stopping, handed to {@link #processClosedConnection} which schedules a reconnect.
     *
     * @param address server address; expected to have been registered by
     *                {@link #withStrategy} so that its connect stats exist
     * @return promise of the connect attempt
     */
    private Promise<Void> connect(InetSocketAddress address) {
        return AsyncTcpSocketImpl.connect(address, this.connectTimeoutMillis, this.socketSettings)
                .whenResult(socket -> {
                    if (this.stopPromise != null) {
                        // stop() was requested while this connect was in flight.
                        socket.close();
                        return;
                    }
                    socket.withInspector(this.statsSocket);
                    RpcStream stream = new RpcStream(
                            this.sslContext == null ? socket : AsyncSslSocket.wrapClientSocket(socket, this.sslContext, this.sslExecutor),
                            this.serializer, this.defaultPacketSize, this.maxPacketSize, this.autoFlushInterval, this.compression, false);
                    RpcClientConnection connection = new RpcClientConnection(this.eventloop, this, address, stream);
                    stream.setListener(connection);
                    if (isMonitoring()) {
                        connection.startMonitoring();
                    }
                    this.connections.put(address, connection);
                    refreshRequestSender();
                    this.generalConnectsStats.successfulConnects++;
                    this.connectsStatsPerAddress.get(address).successfulConnects++;
                    this.logger.info("Connection to {} established", address);
                })
                .whenException(e -> {
                    this.logger.warn("Connection {} failed: {}", address, e);
                    if (this.stopPromise == null) {
                        processClosedConnection(address);
                    }
                })
                .toVoid();
    }

    /**
     * Unregisters the connection for the given address, rebuilds the request sender and
     * runs the closed-connection handling. Does nothing if no connection is registered
     * for that address.
     */
    public void removeConnection(InetSocketAddress address) {
        if (this.connections.remove(address) == null) {
            return;
        }
        refreshRequestSender();
        this.logger.info("Connection closed: {}", address);
        processClosedConnection(address);
    }

    /** Rebuilds the request sender from the current pool, falling back to {@link NoSenderAvailable}. */
    private void refreshRequestSender() {
        this.requestSender = Utils.nullToSupplier(this.strategy.createSender(this.pool), NoSenderAvailable::new);
    }

    /**
     * Records a failed connect for the address and then either schedules a reconnect
     * (client running) or completes the stop promise once no connections remain
     * (client stopping).
     */
    private void processClosedConnection(InetSocketAddress address) {
        this.generalConnectsStats.failedConnects++;
        this.connectsStatsPerAddress.get(address).failedConnects++;
        if (this.stopPromise == null) {
            this.eventloop.delayBackground(this.reconnectIntervalMillis, () -> {
                // Re-check: stop() may have been called while the reconnect was pending.
                if (this.stopPromise == null) {
                    this.logger.info("Reconnecting: {}", address);
                    connect(address);
                }
            });
        } else if (this.connections.isEmpty()) {
            this.stopPromise.set(null);
        }
    }

    /**
     * Sends a request through the current sender. A non-positive timeout is rejected
     * immediately with {@code RPC_TIMEOUT_EXCEPTION} without contacting any server.
     *
     * @param request  request object
     * @param timeout  timeout forwarded to the sender (unit is defined by the sender
     *                 contract; presumably milliseconds, confirm against IRpcClient)
     * @param callback receives the response or the failure
     */
    @Override
    public <I, O> void sendRequest(I request, int timeout, Callback<O> callback) {
        if (timeout > 0) {
            this.requestSender.sendRequest(request, timeout, callback);
        } else {
            callback.accept(null, RPC_TIMEOUT_EXCEPTION);
        }
    }

    /** Sends a request through the current sender using the sender's overload without a timeout. */
    @Override
    public <I, O> void sendRequest(I request, Callback<O> callback) {
        this.requestSender.sendRequest(request, callback);
    }

    /**
     * Returns a view of this client usable from another eventloop: each request is
     * executed on this client's eventloop and its callback is wrapped with
     * {@link Callback#toAnotherEventloop} for the given eventloop.
     *
     * @param anotherEventloop eventloop of the caller
     * @return this client if {@code anotherEventloop} is this client's own eventloop,
     *         otherwise an adapter
     */
    public IRpcClient adaptToAnotherEventloop(final Eventloop anotherEventloop) {
        if (anotherEventloop == this.eventloop) {
            return this;
        }
        return new IRpcClient() {
            @Override
            public <I, O> void sendRequest(I request, int timeout, Callback<O> callback) {
                if (timeout <= 0) {
                    // Rejected on the calling thread, without switching eventloops.
                    callback.accept(null, RPC_TIMEOUT_EXCEPTION);
                    return;
                }
                RpcClient.this.eventloop.execute(() ->
                        RpcClient.this.requestSender.sendRequest(request, timeout, Callback.toAnotherEventloop(anotherEventloop, callback)));
            }
        };
    }

    /** Returns the sender currently used to route requests. */
    public RpcSender getRequestSender() {
        return this.requestSender;
    }

    @Override
    public String toString() {
        return "RpcClient{" + this.connections + '}';
    }

    /** Turns monitoring on for this client and for every currently open connection. */
    @JmxOperation(description = "enable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled) ]")
    public void startMonitoring() {
        this.monitoring = true;
        for (InetSocketAddress address : this.addresses) {
            RpcClientConnection connection = this.connections.get(address);
            if (connection != null) {
                connection.startMonitoring();
            }
        }
    }

    /** Turns monitoring off for this client and for every currently open connection. */
    @JmxOperation(description = "disable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled) ]")
    public void stopMonitoring() {
        this.monitoring = false;
        for (InetSocketAddress address : this.addresses) {
            RpcClientConnection connection = this.connections.get(address);
            if (connection != null) {
                connection.stopMonitoring();
            }
        }
    }

    @JmxAttribute(description = "when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled)")
    private boolean isMonitoring() {
        return this.monitoring;
    }

    @JmxAttribute(name = "requests", extraSubAttributes = {"totalRequests"})
    public RpcRequestStats getGeneralRequestsStats() {
        return this.generalRequestsStats;
    }

    @JmxAttribute(name = "connects")
    public RpcConnectStats getGeneralConnectsStats() {
        return this.generalConnectsStats;
    }

    @JmxAttribute(description = "request stats distributed by request class")
    public Map<Class<?>, RpcRequestStats> getRequestsStatsPerClass() {
        return this.requestStatsPerClass;
    }

    @JmxAttribute
    public Map<InetSocketAddress, RpcConnectStats> getConnectsStatsPerAddress() {
        return this.connectsStatsPerAddress;
    }

    @JmxAttribute(description = "request stats for current connections (when connection is closed stats are removed)")
    public Map<InetSocketAddress, RpcClientConnection> getRequestStatsPerConnection() {
        return this.connections;
    }

    @JmxAttribute(reducer = JmxReducers.JmxReducerSum.class)
    public int getActiveConnections() {
        return this.connections.size();
    }

    /** Returns the sum of active requests over all open connections. */
    @JmxAttribute(reducer = JmxReducers.JmxReducerSum.class)
    public int getActiveRequests() {
        int total = 0;
        for (RpcClientConnection connection : this.connections.values()) {
            total += connection.getActiveRequests();
        }
        return total;
    }

    @JmxAttribute(description = "exception that occurred because of protocol error (serialization, deserialization, compression, decompression, etc)")
    public ExceptionStats getLastProtocolError() {
        return this.lastProtocolError;
    }

    @JmxAttribute
    public AsyncTcpSocketImpl.JmxInspector getStatsSocket() {
        return this.statsSocket;
    }

    /**
     * Returns the request stats for the given request class, creating and registering
     * them on first use.
     *
     * @param requestClass request class used as the stats key
     * @return the stats instance registered for {@code requestClass}
     */
    public RpcRequestStats ensureRequestStatsPerClass(Class<?> requestClass) {
        return this.requestStatsPerClass.computeIfAbsent(requestClass, key -> RpcRequestStats.create(SMOOTHING_WINDOW));
    }
}
