package io.hekate.network.netty;

import io.hekate.codec.Codec;
import io.hekate.codec.CodecException;
import io.hekate.core.internal.util.Utils;
import io.hekate.network.NetworkEndpoint;
import io.hekate.network.NetworkFuture;
import io.hekate.network.NetworkSendCallback;
import io.hekate.network.NetworkServerHandler;
import io.hekate.network.netty.NettyServer;
import io.hekate.network.netty.NetworkProtocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.TrafficCounter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.ThrowableUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/hekate/network/netty/NettyServerClient.class */
public class NettyServerClient extends ChannelInboundHandlerAdapter implements NetworkEndpoint<Object>, NettyChannelSupport {
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION;
    private final Map<String, NettyServer.HandlerRegistration> handlers;
    private final InetSocketAddress remoteAddress;
    private final InetSocketAddress localAddress;
    private final EventLoopGroup coreEventLoopGroup;
    private final boolean ssl;
    private final int hbInterval;
    private final int hbLossThreshold;
    private final boolean hbDisabled;
    private int ignoreReadTimeouts;
    private NetworkServerHandler<Object> serverHandler;
    private NettyServer.HandlerRegistration handlerReg;
    private NettyMetricsSink metrics;
    private String protocol;
    private Codec<Object> codec;
    private EventLoop eventLoop;
    private boolean connectNotified;
    private volatile Object userContext;
    private volatile ChannelHandlerContext handlerCtx;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final NettyWriteQueue writeQueue = new NettyWriteQueue();
    private boolean hbFlushed = true;
    private Logger log = LoggerFactory.getLogger(NettyServerClient.class);
    private boolean debug = this.log.isDebugEnabled();
    private boolean trace = this.log.isTraceEnabled();
    private final ChannelFutureListener writeListener = channelFuture -> {
        if (channelFuture.isSuccess()) {
            if (this.metrics != null) {
                this.metrics.onMessageDequeue();
                this.metrics.onMessageSent();
                return;
            }
            return;
        }
        if (channelFuture.channel().pipeline().last() != null) {
            channelFuture.channel().pipeline().fireExceptionCaught(channelFuture.cause());
        }
        if (this.metrics != null) {
            this.metrics.onMessageDequeue();
            this.metrics.onMessageSendError();
        }
    };
    private final GenericFutureListener<Future<? super Void>> hbFlushListener = future -> {
        this.hbFlushed = true;
    };

    public NettyServerClient(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, boolean z, int i, int i2, boolean z2, Map<String, NettyServer.HandlerRegistration> map, EventLoopGroup eventLoopGroup) {
        this.remoteAddress = inetSocketAddress;
        this.localAddress = inetSocketAddress2;
        this.ssl = z;
        this.hbInterval = i;
        this.hbLossThreshold = i2;
        this.hbDisabled = z2;
        this.handlers = map;
        this.coreEventLoopGroup = eventLoopGroup;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public String protocol() {
        return this.protocol;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public InetSocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public InetSocketAddress localAddress() {
        return this.localAddress;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public boolean isSecure() {
        return this.ssl;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public <C> C getContext() {
        return (C) this.userContext;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public void setContext(Object obj) {
        this.userContext = obj;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        if (this.debug) {
            this.log.debug("Got connection [from={}]", channel.remoteAddress());
        }
        this.handlerCtx = channelHandlerContext;
        mayBeCreateIdleStateHandler().ifPresent(idleStateHandler -> {
            channelHandlerContext.pipeline().addFirst(IdleStateHandler.class.getName(), idleStateHandler);
        });
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.handlerReg != null) {
            this.handlerReg.remove(this);
        }
        if (this.serverHandler != null && this.connectNotified) {
            try {
                this.serverHandler.onDisconnect(this);
            } finally {
                this.serverHandler = null;
            }
        }
        if (this.metrics != null) {
            this.metrics.onDisconnect();
        }
        this.handlerCtx = null;
        if (this.debug) {
            this.log.debug("Closed connection [from={}]", address());
        }
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        String str;
        NettyServer.HandlerRegistration handlerRegistration;
        if (isHandshakeDone()) {
            if (obj instanceof NetworkProtocol.Heartbeat) {
                if (this.trace) {
                    this.log.trace("Received network heartbeat from client [from={}]", address());
                    return;
                }
                return;
            }
            NettyMessage nettyMessage = (NettyMessage) obj;
            nettyMessage.prepare(this.log);
            if (this.trace) {
                this.log.trace("Message buffer prepared [from={}, message={}]", address(), nettyMessage);
            }
            if (this.metrics != null) {
                this.metrics.onMessageReceived();
            }
            try {
                this.serverHandler.onMessage(nettyMessage, this);
                nettyMessage.release();
                return;
            } catch (Throwable th) {
                nettyMessage.release();
                throw th;
            }
        }
        if (this.debug) {
            this.log.debug("Received network handshake request [from={}, message={}]", address(), obj);
        }
        NetworkProtocol.HandshakeRequest handshakeRequest = (NetworkProtocol.HandshakeRequest) obj;
        if (handshakeRequest == null) {
            str = null;
            handlerRegistration = null;
        } else {
            String protocol = handshakeRequest.protocol();
            str = protocol;
            this.protocol = protocol;
            handlerRegistration = this.handlers.get(str);
        }
        if (handlerRegistration == null) {
            if (this.debug) {
                this.log.debug("Closing connection with unsupported protocol [from={}, protocol={}]", address(), str);
            }
            channelHandlerContext.writeAndFlush(new NetworkProtocol.HandshakeReject("Unsupported protocol [protocol=" + str + ']')).addListener(ChannelFutureListener.CLOSE);
            return;
        }
        EventLoop mapToThread = mapToThread(handshakeRequest.threadAffinity(), handlerRegistration);
        if (mapToThread.inEventLoop()) {
            init(channelHandlerContext.channel(), handshakeRequest, handlerRegistration);
            return;
        }
        if (this.debug) {
            this.log.debug("Registering channel to a custom NIO thread [from={}, protocol={}]", address(), str);
        }
        channelHandlerContext.pipeline().remove(IdleStateHandler.class.getName());
        Channel channel = channelHandlerContext.channel();
        String str2 = str;
        NettyServer.HandlerRegistration handlerRegistration2 = handlerRegistration;
        channel.deregister().addListener(future -> {
            if (future.isSuccess() && !mapToThread.isShutdown() && channel.isOpen()) {
                mapToThread.register(channel).addListener(future -> {
                    if (future.isSuccess() && channel.isOpen()) {
                        if (this.debug) {
                            this.log.debug("Registered channel to a custom NIO thread [from={}, protocol={}]", address(), str2);
                        }
                        mayBeCreateIdleStateHandler().ifPresent(idleStateHandler -> {
                            channelHandlerContext.pipeline().addFirst(IdleStateHandler.class.getName(), idleStateHandler);
                        });
                        init(channel, handshakeRequest, handlerRegistration2);
                    }
                });
            }
        });
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        Throwable unwrap = NettyErrorUtils.unwrap(th);
        boolean z = true;
        if (unwrap instanceof CodecException) {
            z = false;
        } else if (unwrap instanceof IOException) {
            if (this.debug) {
                this.log.debug("Closing inbound network connection due to I/O error [protocol={}, from={}, reason={}]", new Object[]{this.protocol, address(), unwrap.toString()});
            }
        } else if (this.log.isErrorEnabled()) {
            this.log.error("Inbound network connection failure [protocol={}, from={}]", new Object[]{this.protocol, address(), unwrap});
        }
        if (z) {
            if (this.serverHandler != null) {
                this.serverHandler.onFailure(this, unwrap);
            }
            channelHandlerContext.close();
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof AutoReadChangeEvent) {
            if (obj == AutoReadChangeEvent.PAUSE) {
                this.ignoreReadTimeouts = -1;
                return;
            } else {
                this.ignoreReadTimeouts = 1;
                return;
            }
        }
        if (!(obj instanceof IdleStateEvent)) {
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        if (((IdleStateEvent) obj).state() == IdleState.WRITER_IDLE) {
            if (this.hbFlushed && isHandshakeDone()) {
                this.hbFlushed = false;
                channelHandlerContext.writeAndFlush(NetworkProtocol.Heartbeat.INSTANCE).addListener(this.hbFlushListener);
                return;
            }
            return;
        }
        if (this.ignoreReadTimeouts == -1 || !channelHandlerContext.channel().config().isAutoRead()) {
            return;
        }
        if (this.ignoreReadTimeouts <= 0) {
            throw new SocketTimeoutException();
        }
        this.ignoreReadTimeouts--;
    }

    @Override // io.hekate.network.NetworkEndpoint
    public void send(Object obj) {
        doSend(obj, null);
    }

    @Override // io.hekate.network.NetworkEndpoint
    public void send(Object obj, NetworkSendCallback<Object> networkSendCallback) {
        doSend(obj, networkSendCallback);
    }

    @Override // io.hekate.network.NetworkEndpoint
    public void pauseReceiving(Consumer<NetworkEndpoint<Object>> consumer) {
        pauseReceiver(true, consumer);
    }

    @Override // io.hekate.network.NetworkEndpoint
    public void resumeReceiving(Consumer<NetworkEndpoint<Object>> consumer) {
        pauseReceiver(false, consumer);
    }

    @Override // io.hekate.network.NetworkEndpoint
    public boolean isReceiving() {
        ChannelHandlerContext channelHandlerContext = this.handlerCtx;
        return channelHandlerContext != null && channelHandlerContext.channel().config().isAutoRead();
    }

    @Override // io.hekate.network.NetworkEndpoint
    public NetworkFuture<Object> disconnect() {
        NetworkFuture<Object> networkFuture = new NetworkFuture<>();
        ChannelHandlerContext channelHandlerContext = this.handlerCtx;
        if (channelHandlerContext == null) {
            networkFuture.complete(this);
        } else {
            this.handlerCtx = null;
            channelHandlerContext.close().addListener(future -> {
                networkFuture.complete(this);
            });
        }
        return networkFuture;
    }

    @Override // io.hekate.network.netty.NettyChannelSupport
    public Optional<Channel> nettyChannel() {
        ChannelHandlerContext channelHandlerContext = this.handlerCtx;
        return channelHandlerContext != null ? Optional.of(channelHandlerContext.channel()) : Optional.empty();
    }

    private void pauseReceiver(boolean z, Consumer<NetworkEndpoint<Object>> consumer) {
        ChannelHandlerContext channelHandlerContext = this.handlerCtx;
        if (channelHandlerContext == null) {
            if (consumer != null) {
                consumer.accept(this);
                return;
            }
            return;
        }
        if (this.debug) {
            if (z) {
                this.log.debug("Pausing inbound receiver [from={}, protocol={}]", address(), this.protocol);
            } else {
                this.log.debug("Resuming Pausing inbound receiver [from={}, protocol={}]", address(), this.protocol);
            }
        }
        Channel channel = channelHandlerContext.channel();
        EventLoop eventLoop = channel.eventLoop();
        if (!eventLoop.inEventLoop()) {
            eventLoop.execute(() -> {
                channel.config().setAutoRead(!z);
                notifyOnReceivePause(z, consumer, channel);
            });
        } else {
            channel.config().setAutoRead(!z);
            notifyOnReceivePause(z, consumer, channel);
        }
    }

    private void notifyOnReceivePause(boolean z, Consumer<NetworkEndpoint<Object>> consumer, Channel channel) {
        if (!$assertionsDisabled && !channel.eventLoop().inEventLoop()) {
            throw new AssertionError("Must be on event loop thread.");
        }
        channel.pipeline().fireUserEventTriggered(z ? AutoReadChangeEvent.PAUSE : AutoReadChangeEvent.RESUME);
        if (consumer != null) {
            try {
                consumer.accept(this);
            } catch (Error | RuntimeException e) {
                this.log.error("Got an unexpected runtime error while notifying callback on network inbound receive status change [pause={}, from={}, protocol={}]", new Object[]{Boolean.valueOf(z), address(), this.protocol, e});
            }
        }
    }

    private Optional<IdleStateHandler> mayBeCreateIdleStateHandler() {
        if (this.hbInterval <= 0 || this.hbLossThreshold <= 0) {
            return Optional.empty();
        }
        int i = this.hbInterval;
        int i2 = this.hbInterval * this.hbLossThreshold;
        if (this.hbDisabled) {
            i = 0;
            if (this.debug) {
                this.log.debug("Registering heartbeatless timeout handler [from={}, read-timeout={}]", address(), Integer.valueOf(i2));
            }
        } else if (this.debug) {
            this.log.debug("Registering heartbeat handler [from={}, interval={}, loss-threshold={}, read-timeout={}]", new Object[]{address(), Integer.valueOf(i), Integer.valueOf(this.hbLossThreshold), Integer.valueOf(i2)});
        }
        return Optional.of(new IdleStateHandler(i2, i, 0L, TimeUnit.MILLISECONDS));
    }

    private void init(Channel channel, NetworkProtocol.HandshakeRequest handshakeRequest, NettyServer.HandlerRegistration handlerRegistration) {
        NettyServerHandlerConfig<Object> config = handlerRegistration.config();
        if (config.getLoggerCategory() != null) {
            this.log = LoggerFactory.getLogger(config.getLoggerCategory());
            this.debug = this.log.isDebugEnabled();
            this.trace = this.log.isTraceEnabled();
        }
        if (this.debug) {
            this.log.debug("Initialized connection [from={}, protocol={}]", address(), config.getProtocol());
        }
        this.eventLoop = channel.eventLoop();
        this.serverHandler = config.getHandler();
        this.handlerReg = handlerRegistration;
        this.metrics = handlerRegistration.metrics();
        this.codec = handshakeRequest.codec();
        handlerRegistration.add(this);
        if (this.metrics != null) {
            channel.pipeline().addFirst(new ChannelHandler[]{new ChannelTrafficShapingHandler(0L, 0L, 1000L) { // from class: io.hekate.network.netty.NettyServerClient.1
                protected void doAccounting(TrafficCounter trafficCounter) {
                    NettyServerClient.this.metrics.onBytesReceived(trafficCounter.lastReadBytes());
                    NettyServerClient.this.metrics.onBytesSent(trafficCounter.lastWrittenBytes());
                }
            }});
            this.metrics.onConnect();
        }
        channel.writeAndFlush(new NetworkProtocol.HandshakeAccept(this.hbInterval, this.hbLossThreshold, this.hbDisabled)).addListener(future -> {
            if (channel.isOpen()) {
                this.connectNotified = true;
                this.serverHandler.onConnect(handshakeRequest.payload(), this);
            }
        });
    }

    private void doSend(Object obj, NetworkSendCallback<Object> networkSendCallback) {
        ChannelHandlerContext channelHandlerContext = this.handlerCtx;
        if (channelHandlerContext != null) {
            write(obj, networkSendCallback, channelHandlerContext);
            return;
        }
        if (this.metrics != null) {
            this.metrics.onMessageSendError();
        }
        if (networkSendCallback != null) {
            NettyUtils.runAtAllCost(this.eventLoop, () -> {
                notifyOnError(obj, networkSendCallback, WRITE_CLOSED_CHANNEL_EXCEPTION);
            });
        }
    }

    private void write(Object obj, NetworkSendCallback<Object> networkSendCallback, ChannelHandlerContext channelHandlerContext) {
        DeferredMessage fail;
        if (!this.codec.baseType().isInstance(obj)) {
            notifyOnError(obj, networkSendCallback, new CodecException("Unsupported message type [expected=" + this.codec.baseType().getName() + ", real=" + obj.getClass().getName() + ']'));
        }
        if (this.debug) {
            this.log.debug("Sending to a client [to={}, message={}]", address(), obj);
        }
        Channel channel = channelHandlerContext.channel();
        if (this.metrics != null) {
            this.metrics.onMessageEnqueue();
        }
        boolean z = false;
        if (this.codec.isStateful()) {
            fail = new DeferredMessage(obj, obj, channel);
        } else {
            if (this.trace) {
                this.log.trace("Pre-encoding message [to={}, message={}]", address(), obj);
            }
            try {
                fail = new DeferredMessage(NetworkProtocolCodec.preEncode(obj, this.codec, channelHandlerContext.alloc()), obj, channel);
            } catch (CodecException e) {
                fail = fail(obj, channel, e);
                z = true;
            }
        }
        fail.addListener(channelFuture -> {
            if (this.debug) {
                if (channelFuture.isSuccess()) {
                    this.log.debug("Done sending message to a client [to={}, message={}]", address(), obj);
                } else {
                    this.log.debug("Failed to send message to a client [to={}, message={}]", new Object[]{address(), obj, channelFuture.cause()});
                }
            }
            this.writeListener.operationComplete(channelFuture);
            if (networkSendCallback != null) {
                try {
                    networkSendCallback.onComplete(obj, Optional.ofNullable(channelFuture.cause()), this);
                } catch (Throwable th) {
                    if (this.log.isErrorEnabled()) {
                        this.log.error("Failed to notify network message callback [message={}]", obj, th);
                    }
                }
            }
        });
        if (z) {
            return;
        }
        this.writeQueue.enqueue(fail, channelHandlerContext.executor());
    }

    private EventLoop mapToThread(int i, NettyServer.HandlerRegistration handlerRegistration) {
        EventLoopGroup eventLoop = handlerRegistration.config().getEventLoop() == null ? this.coreEventLoopGroup : handlerRegistration.config().getEventLoop();
        ArrayList arrayList = new ArrayList();
        Iterator it = eventLoop.iterator();
        while (it.hasNext()) {
            arrayList.add((EventLoop) it.next());
        }
        return (EventLoop) arrayList.get(Utils.mod(i, arrayList.size()));
    }

    private DeferredMessage fail(Object obj, Channel channel, Throwable th) {
        DeferredMessage deferredMessage = new DeferredMessage(obj, obj, channel);
        deferredMessage.setFailure(th);
        return deferredMessage;
    }

    private void notifyOnError(Object obj, NetworkSendCallback<Object> networkSendCallback, Throwable th) {
        try {
            networkSendCallback.onComplete(obj, Optional.of(th), this);
        } catch (Error | RuntimeException e) {
            this.log.error("Failed to notify callback on network operation failure [protocol={}, from={}, message={}]", new Object[]{this.protocol, address(), obj, e});
        }
    }

    private boolean isHandshakeDone() {
        return this.serverHandler != null;
    }

    private InetSocketAddress address() {
        return this.remoteAddress;
    }

    public String toString() {
        return getClass().getSimpleName() + "[from=" + address() + ", protocol=" + this.protocol + ']';
    }

    static {
        $assertionsDisabled = !NettyServerClient.class.desiredAssertionStatus();
        WRITE_CLOSED_CHANNEL_EXCEPTION = (ClosedChannelException) ThrowableUtil.unknownStackTrace(new ClosedChannelException(), NettyServerClient.class, "doSend()");
    }
}
