package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.cluster.messaging.impl.InternalMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
/* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.class */
public class NettyMessagingManager implements MessagingService {
    // Reply timeout in milliseconds.
    // NOTE(review): the callbacks cache below hard-codes 250 ms instead of using this constant - confirm they are meant to match.
    private static final int REPLY_TIME_OUT_MILLIS = 250;
    // Minimum accepted length of the key store / trust store passwords (compared in getTlsParameters).
    private static final short MIN_KS_LENGTH = 6;
    // Message type carried by reply messages (set in sendReply, recognised in dispatchLocally).
    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";

    // Clock used to timestamp outgoing messages and to record the time of incoming ones.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected HybridLogicalClockService clockService;
    // Endpoint of the local node; assigned in activate().
    private Endpoint localEp;
    // Hash code of the cluster name; stamped on outgoing messages and compared on incoming ones.
    private int preamble;
    // Event loop groups and channel classes chosen in initEventLoopGroup() (epoll, falling back to nio).
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends Channel> clientChannelClass;
    // NOTE(review): decompiler output; the name suggests a TLS default flag - confirm the intended use against the original source.
    protected static final boolean TLS_DISABLED = false;
    // Key store / trust store locations and passwords, read from system properties in getTlsParameters().
    protected String ksLocation;
    protected String tsLocation;
    protected char[] ksPwd;
    protected char[] tsPwd;

    // Source of the local node and the cluster name (see activate()).
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterMetadataService clusterMetadataService;
    private final Logger log = LoggerFactory.getLogger(getClass());
    // True between a successful activate() and the next deactivate().
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Registered message handlers keyed by message type.
    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap();
    // Generator for the ids of outgoing messages.
    private final AtomicLong messageIdGenerator = new AtomicLong(0);
    // Callbacks of requests awaiting a reply, keyed by message id. Entries expire 250 ms after being
    // written; an evicted callback is completed exceptionally with a TimeoutException.
    private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder().expireAfterWrite(250, TimeUnit.MILLISECONDS).removalListener(new RemovalListener<Long, Callback>() { // from class: org.onosproject.store.cluster.messaging.impl.NettyMessagingManager.1
        public void onRemoval(RemovalNotification<Long, Callback> removalNotification) {
            // Only evictions count as timeouts; explicit invalidation leaves the callback untouched.
            if (removalNotification.wasEvicted()) {
                ((Callback) removalNotification.getValue()).completeExceptionally(new TimeoutException("Timedout waiting for reply"));
            }
        }
    }).build();
    // Pool of outgoing connections keyed by remote endpoint; configured in activate().
    private final GenericKeyedObjectPool<Endpoint, Connection> channels = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
    // Whether TLS is used for cluster messaging; (re)computed by getTlsParameters().
    protected boolean enableNettyTls = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$Callback.class */
    /**
     * Pairs the future of a pending request with the executor on which that
     * future must be completed.
     */
    public final class Callback {
        private final CompletableFuture<byte[]> future;
        private final Executor executor;

        /**
         * Creates a callback.
         *
         * @param completableFuture future to complete once the outcome is known
         * @param executor executor on which the completion is performed
         */
        public Callback(CompletableFuture<byte[]> completableFuture, Executor executor) {
            this.executor = executor;
            this.future = completableFuture;
        }

        /**
         * Schedules normal completion of the future on the executor.
         *
         * @param bArr value to complete the future with
         */
        public void complete(byte[] bArr) {
            executor.execute(() -> future.complete(bArr));
        }

        /**
         * Schedules exceptional completion of the future on the executor.
         *
         * @param th failure to complete the future with
         */
        public void completeExceptionally(Throwable th) {
            executor.execute(() -> future.completeExceptionally(th));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$Connection.class */
    /**
     * Handle on a channel to a remote endpoint whose establishment may still
     * be in progress. All operations are expressed against the future of the
     * channel so that callers never block on the connect.
     */
    public final class Connection {
        private final CompletableFuture<Channel> internalFuture;

        /**
         * Creates a connection backed by the given channel future.
         *
         * @param completableFuture future completed with the channel once the connect finishes
         */
        public Connection(CompletableFuture<Channel> completableFuture) {
            this.internalFuture = completableFuture;
        }

        /**
         * Writes a message once the channel is available.
         *
         * @param obj message to write and flush
         * @param completableFuture completed with {@code null} when the write succeeds, or
         *        exceptionally with the connect/write failure
         */
        public void send(Object obj, CompletableFuture<Void> completableFuture) {
            internalFuture.whenComplete((channel, th) -> {
                if (th != null) {
                    // The connect itself failed (or was cancelled); nothing can be written.
                    completableFuture.completeExceptionally(th);
                    return;
                }
                channel.writeAndFlush(obj).addListener(future -> {
                    if (future.isSuccess()) {
                        completableFuture.complete(null);
                    } else {
                        completableFuture.completeExceptionally(future.cause());
                    }
                });
            });
        }

        /**
         * Closes the channel if it was established and cancels a connect that
         * is still pending.
         */
        public void destroy() {
            // cancel() is a no-op on an already completed future; afterwards the future is
            // guaranteed to be done, so the callback below runs immediately on this thread.
            internalFuture.cancel(false);
            // getNow() would throw CompletionException/CancellationException for a failed or
            // cancelled connect; whenComplete hands us a null channel in those cases instead.
            internalFuture.whenComplete((channel, th) -> {
                if (channel != null) {
                    channel.close();
                }
            });
        }

        /**
         * Tells whether this connection is still usable.
         *
         * @return {@code false} if the connect failed or the channel is no longer active;
         *         {@code true} if the channel is active or the connect is still pending
         */
        public boolean validate() {
            if (!internalFuture.isDone()) {
                // Still connecting: considered valid, as before.
                return true;
            }
            if (internalFuture.isCompletedExceptionally()) {
                return false;
            }
            // The future is done and did not fail, so getNow() cannot throw here.
            Channel channel = internalFuture.getNow(null);
            return channel == null || channel.isActive();
        }
    }

    @ChannelHandler.Sharable
    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$InboundMessageDispatcher.class */
    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
        private InboundMessageDispatcher() {
        }

        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            try {
                NettyMessagingManager.this.dispatchLocally((InternalMessage) obj);
            } catch (RejectedExecutionException e) {
                NettyMessagingManager.this.log.warn("Unable to dispatch message due to {}", e.getMessage());
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyMessagingManager.this.log.error("Exception inside channel handling pipeline.", th);
            channelHandlerContext.close();
        }

        public final boolean acceptInboundMessage(Object obj) {
            return obj instanceof InternalMessage;
        }
    }

    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$OnosCommunicationChannelFactory.class */
    private class OnosCommunicationChannelFactory implements KeyedPoolableObjectFactory<Endpoint, Connection> {
        private OnosCommunicationChannelFactory() {
        }

        public void activateObject(Endpoint endpoint, Connection connection) throws Exception {
        }

        public void destroyObject(Endpoint endpoint, Connection connection) throws Exception {
            NettyMessagingManager.this.log.debug("Closing connection to {}", endpoint);
            connection.destroy();
        }

        public Connection makeObject(Endpoint endpoint) throws Exception {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 655360);
            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 327680);
            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
            bootstrap.group(NettyMessagingManager.this.clientGroup);
            bootstrap.channel(NettyMessagingManager.this.clientChannelClass);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            if (NettyMessagingManager.this.enableNettyTls) {
                bootstrap.handler(new SslClientCommunicationChannelInitializer());
            } else {
                bootstrap.handler(new OnosCommunicationChannelInitializer());
            }
            CompletableFuture completableFuture = new CompletableFuture();
            ChannelFuture connect = bootstrap.connect(endpoint.host().toInetAddress(), endpoint.port());
            connect.addListener(future -> {
                if (future.isSuccess()) {
                    completableFuture.complete(connect.channel());
                } else {
                    completableFuture.completeExceptionally(future.cause());
                }
            });
            NettyMessagingManager.this.log.debug("Established a new connection to {}", endpoint);
            return new Connection(completableFuture);
        }

        public void passivateObject(Endpoint endpoint, Connection connection) throws Exception {
        }

        public boolean validateObject(Endpoint endpoint, Connection connection) {
            return connection.validate();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$OnosCommunicationChannelInitializer.class */
    public class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private OnosCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder(NettyMessagingManager.this.preamble);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast("encoder", this.encoder).addLast("decoder", new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$SslClientCommunicationChannelInitializer.class */
    public class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private SslClientCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder(NettyMessagingManager.this.preamble);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore keyStore = KeyStore.getInstance("JKS");
            keyStore.load(new FileInputStream(NettyMessagingManager.this.tsLocation), NettyMessagingManager.this.tsPwd);
            trustManagerFactory.init(keyStore);
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            KeyStore keyStore2 = KeyStore.getInstance("JKS");
            keyStore2.load(new FileInputStream(NettyMessagingManager.this.ksLocation), NettyMessagingManager.this.ksPwd);
            keyManagerFactory.init(keyStore2, NettyMessagingManager.this.ksPwd);
            SSLContext sSLContext = SSLContext.getInstance("TLS");
            sSLContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
            SSLEngine createSSLEngine = sSLContext.createSSLEngine();
            createSSLEngine.setUseClientMode(true);
            createSSLEngine.setEnabledProtocols(createSSLEngine.getSupportedProtocols());
            createSSLEngine.setEnabledCipherSuites(createSSLEngine.getSupportedCipherSuites());
            createSSLEngine.setEnableSessionCreation(true);
            socketChannel.pipeline().addLast("ssl", new SslHandler(createSSLEngine)).addLast("encoder", this.encoder).addLast("decoder", new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManager$SslServerCommunicationChannelInitializer.class */
    public class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private SslServerCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder(NettyMessagingManager.this.preamble);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore keyStore = KeyStore.getInstance("JKS");
            keyStore.load(new FileInputStream(NettyMessagingManager.this.tsLocation), NettyMessagingManager.this.tsPwd);
            trustManagerFactory.init(keyStore);
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            KeyStore keyStore2 = KeyStore.getInstance("JKS");
            keyStore2.load(new FileInputStream(NettyMessagingManager.this.ksLocation), NettyMessagingManager.this.ksPwd);
            keyManagerFactory.init(keyStore2, NettyMessagingManager.this.ksPwd);
            SSLContext sSLContext = SSLContext.getInstance("TLS");
            sSLContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
            SSLEngine createSSLEngine = sSLContext.createSSLEngine();
            createSSLEngine.setNeedClientAuth(true);
            createSSLEngine.setUseClientMode(false);
            createSSLEngine.setEnabledProtocols(createSSLEngine.getSupportedProtocols());
            createSSLEngine.setEnabledCipherSuites(createSSLEngine.getSupportedCipherSuites());
            createSSLEngine.setEnableSessionCreation(true);
            socketChannel.pipeline().addLast("ssl", new SslHandler(createSSLEngine)).addLast("encoder", this.encoder).addLast("decoder", new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    /**
     * Starts the messaging service: reads the TLS settings, derives the
     * preamble and local endpoint, configures the connection pool, starts the
     * server and schedules periodic clean-up of the callback cache.
     * Does nothing but log a warning when already started.
     *
     * @throws Exception if the server cannot be started
     */
    @Activate
    public void activate() throws Exception {
        ControllerNode localNode = clusterMetadataService.getLocalNode();
        getTlsParameters();
        if (started.get()) {
            log.warn("Already running at local endpoint: {}", localEp);
            return;
        }

        preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
        localEp = new Endpoint(localNode.ip(), localNode.tcpPort());

        // Connection pool settings.
        channels.setLifo(true);
        channels.setTestOnBorrow(true);
        channels.setTestOnReturn(true);
        channels.setMinEvictableIdleTimeMillis(60000L);
        channels.setTimeBetweenEvictionRunsMillis(30000L);

        initEventLoopGroup();
        startAcceptingConnections();
        started.set(true);

        // Run the cache maintenance every 250 ms so that expired callbacks get evicted.
        serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0L, 250L, TimeUnit.MILLISECONDS);
        log.info("Started");
    }

    /**
     * Stops the messaging service: closes the connection pool and shuts down
     * both event loop groups if the service was started.
     *
     * @throws Exception if closing the connection pool fails
     */
    @Deactivate
    public void deactivate() throws Exception {
        boolean wasRunning = started.get();
        if (wasRunning) {
            channels.close();
            serverGroup.shutdownGracefully();
            clientGroup.shutdownGracefully();
            started.set(false);
        }
        log.info("Stopped");
    }

    /**
     * Reads the TLS configuration from system properties.
     *
     * <p>TLS is requested with {@code enableNettyTLS=true}. It is switched off
     * again, with a warning, when the key store or trust store location is
     * missing or when either password is missing or shorter than
     * {@code MIN_KS_LENGTH} characters.
     */
    private void getTlsParameters() {
        // parseBoolean already yields false for a null or empty value.
        enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS"));
        log.info("enableNettyTLS = {}", Boolean.valueOf(enableNettyTls));
        if (!enableNettyTls) {
            return;
        }

        ksLocation = System.getProperty("javax.net.ssl.keyStore");
        if (Strings.isNullOrEmpty(ksLocation)) {
            disableTls("javax.net.ssl.keyStore is not set");
            return;
        }

        tsLocation = System.getProperty("javax.net.ssl.trustStore");
        if (Strings.isNullOrEmpty(tsLocation)) {
            disableTls("javax.net.ssl.trustStore is not set");
            return;
        }

        // System.getProperty returns null for an unset property; guard before toCharArray().
        String ksPassword = System.getProperty("javax.net.ssl.keyStorePassword");
        ksPwd = ksPassword == null ? null : ksPassword.toCharArray();
        if (ksPwd == null || ksPwd.length < MIN_KS_LENGTH) {
            disableTls("javax.net.ssl.keyStorePassword is missing or too short");
            return;
        }

        String tsPassword = System.getProperty("javax.net.ssl.trustStorePassword");
        tsPwd = tsPassword == null ? null : tsPassword.toCharArray();
        if (tsPwd == null || tsPwd.length < MIN_KS_LENGTH) {
            disableTls("javax.net.ssl.trustStorePassword is missing or too short");
        }
    }

    /** Turns TLS off and logs why, so a fallback to plaintext is never silent. */
    private void disableTls(String reason) {
        enableNettyTls = false;
        log.warn("Disabling Netty TLS: {}", reason);
    }

    /**
     * Creates the client and server event loop groups, preferring the native
     * epoll transport and falling back to NIO when epoll cannot be initialized.
     */
    private void initEventLoopGroup() {
        // A thread count of 0 asks Netty to use its default number of event loop threads.
        // (The decompiler had substituted an unrelated boolean constant for this literal.)
        final int defaultThreads = 0;
        try {
            clientGroup = new EpollEventLoopGroup(defaultThreads,
                    Tools.groupedThreads("NettyMessagingEvt", "epollC-%d", log));
            serverGroup = new EpollEventLoopGroup(defaultThreads,
                    Tools.groupedThreads("NettyMessagingEvt", "epollS-%d", log));
            serverChannelClass = EpollServerSocketChannel.class;
            clientChannelClass = EpollSocketChannel.class;
        } catch (Throwable th) {
            // Throwable on purpose: a missing native library surfaces as an Error, not an Exception.
            log.debug("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", th.getMessage());
            clientGroup = new NioEventLoopGroup(defaultThreads,
                    Tools.groupedThreads("NettyMessagingEvt", "nioC-%d", log));
            serverGroup = new NioEventLoopGroup(defaultThreads,
                    Tools.groupedThreads("NettyMessagingEvt", "nioS-%d", log));
            serverChannelClass = NioServerSocketChannel.class;
            clientChannelClass = NioSocketChannel.class;
        }
    }

    /**
     * Sends a one-way message to an endpoint.
     *
     * @param endpoint destination endpoint
     * @param str message type
     * @param bArr message payload
     * @return future completed when the message has been sent, or exceptionally on failure
     */
    public CompletableFuture<Void> sendAsync(Endpoint endpoint, String str, byte[] bArr) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        InternalMessage message = new InternalMessage(
                preamble,
                clockService.timeNow(),
                messageIdGenerator.incrementAndGet(),
                localEp,
                str,
                bArr);
        return sendAsync(endpoint, message);
    }

    /**
     * Sends an already built message to an endpoint. A message addressed to
     * the local endpoint is dispatched directly without touching the network.
     *
     * @param endpoint destination endpoint
     * @param internalMessage message to send
     * @return future completed when the message has been dispatched or written,
     *         or exceptionally with the failure
     */
    protected CompletableFuture<Void> sendAsync(Endpoint endpoint, InternalMessage internalMessage) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        if (endpoint.equals(localEp)) {
            try {
                dispatchLocally(internalMessage);
                return CompletableFuture.completedFuture(null);
            } catch (IOException e) {
                return Tools.exceptionalFuture(e);
            }
        }

        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            Connection connection = null;
            try {
                connection = channels.borrowObject(endpoint);
                connection.send(internalMessage, result);
            } finally {
                // Hand the connection back exactly once, whether or not the send was issued;
                // the write itself completes asynchronously.
                if (connection != null) {
                    channels.returnObject(endpoint, connection);
                }
            }
        } catch (Exception e) {
            // Failures to borrow or return a connection are reported through the future.
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Sends a request and returns a future for the reply, which is completed
     * using a direct executor.
     *
     * @param endpoint destination endpoint
     * @param str message type
     * @param bArr request payload
     * @return future of the reply payload
     */
    public CompletableFuture<byte[]> sendAndReceive(Endpoint endpoint, String str, byte[] bArr) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        Executor direct = MoreExecutors.directExecutor();
        return sendAndReceive(endpoint, str, bArr, direct);
    }

    /**
     * Sends a request and returns a future for the reply.
     *
     * <p>A callback is registered under the message id before the request is
     * sent; it is removed again if sending fails. If no reply arrives before
     * the callback cache evicts the entry, the future fails with a
     * {@code TimeoutException}.
     *
     * @param endpoint destination endpoint
     * @param str message type
     * @param bArr request payload
     * @param executor executor on which the reply future is completed
     * @return future of the reply payload
     */
    public CompletableFuture<byte[]> sendAndReceive(Endpoint endpoint, String str, byte[] bArr, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        Callback callback = new Callback(response, executor);
        final long messageId = messageIdGenerator.incrementAndGet();
        // Register before sending so that a fast reply always finds its callback.
        callbacks.put(messageId, callback);
        InternalMessage request =
                new InternalMessage(preamble, clockService.timeNow(), messageId, localEp, str, bArr);
        return sendAsync(endpoint, request)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        // Nothing was sent, so no reply will come: drop the callback right away.
                        callbacks.invalidate(messageId);
                    }
                })
                .thenComposeAsync(ignored -> response, executor);
    }

    /**
     * Registers a handler for one-way messages of the given type.
     *
     * @param str message type
     * @param biConsumer handler receiving the sender and the payload
     * @param executor executor on which the handler is invoked
     */
    public void registerHandler(String str, BiConsumer<Endpoint, byte[]> biConsumer, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        Consumer<InternalMessage> handler = internalMessage ->
                executor.execute(() -> biConsumer.accept(internalMessage.sender(), internalMessage.payload()));
        handlers.put(str, handler);
    }

    /**
     * Registers a request/reply handler for messages of the given type. The
     * handler's return value is sent back as the reply; if the handler throws,
     * an {@code ERROR_HANDLER_EXCEPTION} reply with an empty payload is sent.
     *
     * @param str message type
     * @param biFunction handler mapping sender and payload to the reply payload
     * @param executor executor on which the handler is invoked
     */
    public void registerHandler(String str, BiFunction<Endpoint, byte[], byte[]> biFunction, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        handlers.put(str, internalMessage -> {
            executor.execute(() -> {
                byte[] reply = null;
                InternalMessage.Status status = InternalMessage.Status.OK;
                try {
                    reply = biFunction.apply(internalMessage.sender(), internalMessage.payload());
                } catch (Exception e) {
                    // The requester only learns the status code; keep the cause in the local log.
                    log.warn("Handler for message type {} failed", str, e);
                    status = InternalMessage.Status.ERROR_HANDLER_EXCEPTION;
                }
                sendReply(internalMessage, status, Optional.ofNullable(reply));
            });
        });
    }

    /**
     * Registers an asynchronous request/reply handler for messages of the
     * given type. The reply is sent when the future returned by the handler
     * completes: {@code OK} with its value, or {@code ERROR_HANDLER_EXCEPTION}
     * with an empty payload if it completed exceptionally.
     *
     * @param str message type
     * @param biFunction handler mapping sender and payload to a future of the reply payload
     */
    public void registerHandler(String str, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> biFunction) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        handlers.put(str, internalMessage -> {
            // No raw cast: keeping the generic type lets the reply payload stay a byte[].
            biFunction.apply(internalMessage.sender(), internalMessage.payload())
                    .whenComplete((reply, error) -> {
                        InternalMessage.Status status = error == null
                                ? InternalMessage.Status.OK
                                : InternalMessage.Status.ERROR_HANDLER_EXCEPTION;
                        sendReply(internalMessage, status, Optional.ofNullable(reply));
                    });
        });
    }

    /**
     * Removes the handler registered for a message type, if any.
     *
     * @param str message type
     */
    public void unregisterHandler(String str) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        handlers.remove(str);
    }

    /**
     * Binds the server socket on the local endpoint's port and blocks until
     * the bind has completed.
     *
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    private void startAcceptingConnections() throws InterruptedException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32768);
        serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8192);
        serverBootstrap.option(ChannelOption.SO_RCVBUF, 1048576);
        // TCP_NODELAY is a property of the accepted connections, not of the listening
        // socket, so it has to be a child option to take effect.
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        serverBootstrap.group(serverGroup, clientGroup);
        serverBootstrap.channel(serverChannelClass);
        if (enableNettyTls) {
            serverBootstrap.childHandler(new SslServerCommunicationChannelInitializer());
        } else {
            serverBootstrap.childHandler(new OnosCommunicationChannelInitializer());
        }
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

        serverBootstrap.bind(localEp.port()).sync().addListener(future -> {
            if (future.isSuccess()) {
                log.info("{} accepting incoming connections on port {}", localEp.host(), Integer.valueOf(localEp.port()));
            } else {
                // The trailing Throwable argument is logged as the exception by SLF4J.
                log.warn("{} failed to bind to port {}", localEp.host(), Integer.valueOf(localEp.port()), future.cause());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a message on this node: a reply completes the callback of the
     * pending request it belongs to, any other message is passed to the
     * handler registered for its type.
     *
     * <p>A request whose preamble does not match this node's is answered with
     * {@code PROTOCOL_EXCEPTION} and is not passed to a handler. A reply with
     * a foreign preamble still completes its pending callback (this is how the
     * requester learns about the protocol error) but is never answered itself.
     *
     * @param internalMessage message to process
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void dispatchLocally(InternalMessage internalMessage) throws IOException {
        String type = internalMessage.type();
        boolean isReply = REPLY_MESSAGE_TYPE.equals(type);

        if (internalMessage.preamble() != preamble) {
            log.debug("Received {} with invalid preamble from {}", type, internalMessage.sender());
            if (!isReply) {
                // Reject the request instead of letting a foreign sender reach a handler.
                sendReply(internalMessage, InternalMessage.Status.PROTOCOL_EXCEPTION, Optional.empty());
                return;
            }
            // Never answer a reply: the answer would carry this node's preamble, be
            // rejected by the peer in turn, and bounce back and forth indefinitely.
        }

        clockService.recordEventTime(internalMessage.time());

        if (isReply) {
            completeCallback(internalMessage);
            return;
        }

        Consumer<InternalMessage> handler = handlers.get(type);
        if (handler != null) {
            handler.accept(internalMessage);
        } else {
            log.debug("No handler for message type {} from {}", type, internalMessage.sender());
            sendReply(internalMessage, InternalMessage.Status.ERROR_NO_HANDLER, Optional.empty());
        }
    }

    /** Completes, then forgets, the pending callback that a reply message belongs to. */
    private void completeCallback(InternalMessage reply) {
        Long id = Long.valueOf(reply.id());
        try {
            Callback callback = callbacks.getIfPresent(id);
            if (callback == null) {
                log.debug("Received a reply for message id:[{}].  from {}. But was unable to locate the request handle", id, reply.sender());
            } else if (reply.status() == InternalMessage.Status.OK) {
                callback.complete(reply.payload());
            } else if (reply.status() == InternalMessage.Status.ERROR_NO_HANDLER) {
                callback.completeExceptionally(new MessagingException.NoRemoteHandler());
            } else if (reply.status() == InternalMessage.Status.ERROR_HANDLER_EXCEPTION) {
                callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
            } else if (reply.status() == InternalMessage.Status.PROTOCOL_EXCEPTION) {
                callback.completeExceptionally(new MessagingException.ProtocolException());
            }
        } finally {
            // Explicit invalidation is not an eviction, so the removal listener does not
            // turn it into a timeout.
            callbacks.invalidate(id);
        }
    }

    /**
     * Sends a reply to the sender of a message, reusing the id of that message
     * so the requester can match it to its pending request. A failure to send
     * is only logged.
     *
     * @param internalMessage message being answered
     * @param status outcome to report
     * @param optional reply payload; an empty array is sent when absent
     */
    private void sendReply(InternalMessage internalMessage, InternalMessage.Status status, Optional<byte[]> optional) {
        byte[] payload = optional.orElse(new byte[0]);
        InternalMessage reply = new InternalMessage(
                preamble, clockService.timeNow(), internalMessage.id(), localEp, REPLY_MESSAGE_TYPE, payload, status);
        sendAsync(internalMessage.sender(), reply).whenComplete((ignored, error) -> {
            if (error != null) {
                log.debug("Failed to respond", error);
            }
        });
    }

    /**
     * Stores the clock service reference.
     *
     * @param hybridLogicalClockService clock service to use
     */
    protected void bindClockService(HybridLogicalClockService hybridLogicalClockService) {
        clockService = hybridLogicalClockService;
    }

    /**
     * Clears the clock service reference, but only if it is the given instance.
     *
     * @param hybridLogicalClockService clock service being withdrawn
     */
    protected void unbindClockService(HybridLogicalClockService hybridLogicalClockService) {
        if (clockService != hybridLogicalClockService) {
            return;
        }
        clockService = null;
    }

    /**
     * Stores the cluster metadata service reference.
     *
     * @param clusterMetadataService cluster metadata service to use
     */
    protected void bindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        // The parameter shadows the field, so the qualifier is required here.
        this.clusterMetadataService = clusterMetadataService;
    }

    /**
     * Clears the cluster metadata service reference, but only if it is the
     * given instance.
     *
     * @param clusterMetadataService cluster metadata service being withdrawn
     */
    protected void unbindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        if (this.clusterMetadataService != clusterMetadataService) {
            return;
        }
        this.clusterMetadataService = null;
    }
}
