package org.neo4j.cluster.com;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.neo4j.cluster.com.NetworkReceiver;
import org.neo4j.cluster.com.message.Message;
import org.neo4j.cluster.com.message.MessageSender;
import org.neo4j.cluster.com.message.MessageType;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Listeners;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/cluster/com/NetworkSender.class */
public class NetworkSender implements MessageSender, Lifecycle {
    private ChannelGroup channels;
    private ClientBootstrap clientBootstrap;
    private final Monitor monitor;
    private Configuration config;
    private final NetworkReceiver receiver;
    private Log msgLog;
    private URI me;
    private Map<URI, ExecutorService> senderExecutors = new HashMap();
    private Set<URI> failedInstances = new HashSet();
    private Map<URI, Channel> connections = new ConcurrentHashMap();
    private Iterable<NetworkChannelsListener> listeners = Listeners.newListeners();

    /* loaded from: input_file:org/neo4j/cluster/com/NetworkSender$Configuration.class */
    public interface Configuration {
        int defaultPort();

        int port();
    }

    /* loaded from: input_file:org/neo4j/cluster/com/NetworkSender$Monitor.class */
    public interface Monitor extends NamedThreadFactory.Monitor {
        void queuedMessage(Message message);

        void sentMessage(Message message);
    }

    /* loaded from: input_file:org/neo4j/cluster/com/NetworkSender$NetworkChannelsListener.class */
    public interface NetworkChannelsListener {
        void channelOpened(URI uri);

        void channelClosed(URI uri);
    }

    /* loaded from: input_file:org/neo4j/cluster/com/NetworkSender$NetworkMessageSender.class */
    private class NetworkMessageSender extends SimpleChannelHandler {
        private NetworkMessageSender() {
        }

        public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            Channel channel = channelHandlerContext.getChannel();
            NetworkSender.this.openedChannel(NetworkSender.this.getURI((InetSocketAddress) channel.getRemoteAddress()), channel);
            NetworkSender.this.channels.add(channel);
        }

        public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            NetworkSender.this.closedChannel(channelHandlerContext.getChannel());
            NetworkSender.this.channels.remove(channelHandlerContext.getChannel());
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            Throwable cause = exceptionEvent.getCause();
            if ((cause instanceof ConnectException) || (cause instanceof RejectedExecutionException)) {
                return;
            }
            NetworkSender.this.msgLog.error("Receive exception:", cause);
        }
    }

    /* loaded from: input_file:org/neo4j/cluster/com/NetworkSender$NetworkNodePipelineFactory.class */
    private class NetworkNodePipelineFactory implements ChannelPipelineFactory {
        private NetworkNodePipelineFactory() {
        }

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("frameEncoder", new ObjectEncoder(2048));
            pipeline.addLast("sender", new NetworkMessageSender());
            return pipeline;
        }
    }

    public NetworkSender(Monitor monitor, Configuration configuration, NetworkReceiver networkReceiver, LogProvider logProvider) {
        this.monitor = monitor;
        this.config = configuration;
        this.receiver = networkReceiver;
        this.msgLog = logProvider.getLog(getClass());
        this.me = URI.create("cluster://0.0.0.0:" + configuration.port());
        networkReceiver.addNetworkChannelsListener(new NetworkReceiver.NetworkChannelsListener() { // from class: org.neo4j.cluster.com.NetworkSender.1
            @Override // org.neo4j.cluster.com.NetworkReceiver.NetworkChannelsListener
            public void listeningAt(URI uri) {
                NetworkSender.this.me = uri;
            }

            @Override // org.neo4j.cluster.com.NetworkReceiver.NetworkChannelsListener
            public void channelOpened(URI uri) {
            }

            @Override // org.neo4j.cluster.com.NetworkReceiver.NetworkChannelsListener
            public void channelClosed(URI uri) {
            }
        });
    }

    public void init() throws Throwable {
        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
    }

    public void start() throws Throwable {
        this.channels = new DefaultChannelGroup();
        this.clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(NamedThreadFactory.daemon("Cluster client boss", this.monitor)), Executors.newFixedThreadPool(2, NamedThreadFactory.daemon("Cluster client worker", this.monitor)), 2));
        this.clientBootstrap.setOption("tcpNoDelay", true);
        this.clientBootstrap.setPipelineFactory(new NetworkNodePipelineFactory());
    }

    public void stop() throws Throwable {
        this.msgLog.debug("Shutting down NetworkSender");
        Iterator<ExecutorService> it = this.senderExecutors.values().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        for (Map.Entry<URI, ExecutorService> entry : this.senderExecutors.entrySet()) {
            URI key = entry.getKey();
            if (!entry.getValue().awaitTermination(50L, TimeUnit.SECONDS)) {
                this.msgLog.warn("Could not shut down send executor towards: " + key);
            }
        }
        this.senderExecutors.clear();
        this.channels.close().awaitUninterruptibly();
        this.clientBootstrap.releaseExternalResources();
        this.msgLog.debug("Shutting down NetworkSender complete");
    }

    public void shutdown() throws Throwable {
    }

    @Override // org.neo4j.cluster.com.message.MessageSender
    public void process(List<Message<? extends MessageType>> list) {
        for (Message<? extends MessageType> message : list) {
            try {
                process(message);
            } catch (Exception e) {
                this.msgLog.warn("Error sending message " + message + "(" + e.getMessage() + ")");
            }
        }
    }

    @Override // org.neo4j.cluster.com.message.MessageProcessor
    public boolean process(Message<? extends MessageType> message) {
        if (message.hasHeader(Message.TO)) {
            send(message);
            return true;
        }
        this.receiver.receive(message);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public URI getURI(InetSocketAddress inetSocketAddress) throws URISyntaxException {
        return new URI("cluster:/" + inetSocketAddress);
    }

    private synchronized void send(final Message message) {
        this.monitor.queuedMessage(message);
        final URI create = URI.create(message.getHeader(Message.TO));
        ExecutorService executorService = this.senderExecutors.get(create);
        if (executorService == null) {
            executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Cluster Sender " + create.toASCIIString(), this.monitor));
            this.senderExecutors.put(create, executorService);
        }
        executorService.submit(new Runnable() { // from class: org.neo4j.cluster.com.NetworkSender.2
            @Override // java.lang.Runnable
            public void run() {
                Channel channel = NetworkSender.this.getChannel(create);
                if (channel == null) {
                    try {
                        channel = NetworkSender.this.openChannel(create);
                        NetworkSender.this.openedChannel(create, channel);
                        NetworkSender.this.failedInstances.remove(create);
                    } catch (Exception e) {
                        if (NetworkSender.this.failedInstances.contains(create)) {
                            return;
                        }
                        NetworkSender.this.msgLog.warn(e.getMessage());
                        NetworkSender.this.failedInstances.add(create);
                        return;
                    }
                }
                try {
                    message.setHeader(Message.FROM, NetworkSender.this.me.toASCIIString());
                    NetworkSender.this.msgLog.debug("Sending to " + create + ": " + message);
                    channel.write(message).addListener(new ChannelFutureListener() { // from class: org.neo4j.cluster.com.NetworkSender.2.1
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            NetworkSender.this.monitor.sentMessage(message);
                            if (channelFuture.isSuccess()) {
                                return;
                            }
                            NetworkSender.this.msgLog.debug("Unable to write " + message + " to " + channelFuture.getChannel(), channelFuture.getCause());
                        }
                    });
                } catch (Exception e2) {
                    if (Exceptions.contains(e2, new Class[]{ClosedChannelException.class})) {
                        NetworkSender.this.msgLog.warn("Could not send message, because the connection has been closed.");
                    } else {
                        NetworkSender.this.msgLog.warn("Could not send message", e2);
                    }
                    channel.close();
                }
            }
        });
    }

    protected void openedChannel(final URI uri, Channel channel) {
        this.connections.put(uri, channel);
        Listeners.notifyListeners(this.listeners, new Listeners.Notification<NetworkChannelsListener>() { // from class: org.neo4j.cluster.com.NetworkSender.3
            public void notify(NetworkChannelsListener networkChannelsListener) {
                networkChannelsListener.channelOpened(uri);
            }
        });
    }

    protected void closedChannel(Channel channel) {
        URI uri = null;
        Iterator<Map.Entry<URI, Channel>> it = this.connections.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<URI, Channel> next = it.next();
            if (next.getValue().equals(channel)) {
                uri = next.getKey();
                break;
            }
        }
        if (uri == null) {
            return;
        }
        this.connections.remove(uri);
        final URI uri2 = uri;
        Listeners.notifyListeners(this.listeners, new Listeners.Notification<NetworkChannelsListener>() { // from class: org.neo4j.cluster.com.NetworkSender.4
            public void notify(NetworkChannelsListener networkChannelsListener) {
                networkChannelsListener.channelClosed(uri2);
            }
        });
    }

    public Channel getChannel(URI uri) {
        return this.connections.get(uri);
    }

    public void addNetworkChannelsListener(NetworkChannelsListener networkChannelsListener) {
        this.listeners = Listeners.addListener(networkChannelsListener, this.listeners);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Channel openChannel(URI uri) {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(uri.getHost(), uri.getPort() == -1 ? this.config.defaultPort() : uri.getPort());
        ChannelFuture connect = this.clientBootstrap.connect(inetSocketAddress);
        try {
            if (!connect.await(5L, TimeUnit.SECONDS) || !connect.getChannel().isConnected()) {
                throw new ChannelOpenFailedException("Client could not connect to " + inetSocketAddress);
            }
            this.msgLog.info(this.me + " opened a new channel to " + inetSocketAddress);
            return connect.getChannel();
        } catch (InterruptedException e) {
            this.msgLog.warn("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new ChannelOpenFailedException(e);
        }
    }
}
