package org.vertx.java.core.eventbus.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.SimpleHandler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.impl.hazelcast.HazelcastClusterManager;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import org.vertx.java.core.net.NetClient;
import org.vertx.java.core.net.NetServer;
import org.vertx.java.core.net.NetSocket;
import org.vertx.java.core.net.impl.ServerID;
import org.vertx.java.core.parsetools.RecordParser;

/* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus.class */
public class DefaultEventBus implements EventBus {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultEventBus.class);
    private static final Buffer PONG = new Buffer(new byte[]{1});
    private static final long PING_INTERVAL = 20000;
    private static final long PING_REPLY_INTERVAL = 20000;
    public static final int DEFAULT_CLUSTER_PORT = 2550;
    private final VertxInternal vertx;
    private final ServerID serverID;
    private NetServer server;
    private SubsMap subs;
    private final ConcurrentMap<ServerID, ConnectionHolder> connections;
    private final ConcurrentMap<String, Handlers> handlerMap;
    private final Map<String, HandlerInfo> handlersByID;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$ConnectionHolder.class */
    public class ConnectionHolder {
        final NetClient client;
        volatile NetSocket socket;
        final Queue<BaseMessage> pending;
        volatile boolean connected;
        long timeoutID;
        long pingTimeoutID;
        ServerID theServerID;

        private ConnectionHolder(NetClient netClient) {
            this.pending = new ConcurrentLinkedQueue();
            this.timeoutID = -1L;
            this.pingTimeoutID = -1L;
            this.client = netClient;
        }

        void writeMessage(BaseMessage baseMessage) {
            if (this.connected) {
                baseMessage.write(this.socket);
                return;
            }
            synchronized (this) {
                if (this.connected) {
                    baseMessage.write(this.socket);
                } else {
                    this.pending.add(baseMessage);
                }
            }
        }

        synchronized void connected(final ServerID serverID, NetSocket netSocket) {
            this.socket = netSocket;
            this.theServerID = serverID;
            this.connected = true;
            netSocket.exceptionHandler(new Handler<Exception>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.1
                @Override // org.vertx.java.core.Handler
                public void handle(Exception exc) {
                    DefaultEventBus.this.cleanupConnection(serverID, ConnectionHolder.this, true);
                }
            });
            netSocket.closedHandler(new SimpleHandler() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.2
                @Override // org.vertx.java.core.SimpleHandler
                public void handle() {
                    DefaultEventBus.this.cleanupConnection(serverID, ConnectionHolder.this, false);
                }
            });
            netSocket.dataHandler(new Handler<Buffer>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.3
                @Override // org.vertx.java.core.Handler
                public void handle(Buffer buffer) {
                    DefaultEventBus.this.vertx.cancelTimer(ConnectionHolder.this.timeoutID);
                    DefaultEventBus.this.schedulePing(ConnectionHolder.this);
                }
            });
            DefaultEventBus.this.schedulePing(this);
            Iterator<BaseMessage> it = this.pending.iterator();
            while (it.hasNext()) {
                it.next().write(netSocket);
            }
            this.pending.clear();
        }

        void connect(NetClient netClient, final ServerID serverID) {
            netClient.connect(serverID.port, serverID.host, new Handler<NetSocket>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.4
                @Override // org.vertx.java.core.Handler
                public void handle(NetSocket netSocket) {
                    ConnectionHolder.this.connected(serverID, netSocket);
                }
            });
            netClient.exceptionHandler(new Handler<Exception>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.ConnectionHolder.5
                @Override // org.vertx.java.core.Handler
                public void handle(Exception exc) {
                    DefaultEventBus.this.cleanupConnection(serverID, ConnectionHolder.this, true);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$HandlerCloseHook.class */
    public class HandlerCloseHook implements Runnable {
        final List<String> ids;

        private HandlerCloseHook() {
            this.ids = new ArrayList();
        }

        @Override // java.lang.Runnable
        public void run() {
            Iterator it = new ArrayList(this.ids).iterator();
            while (it.hasNext()) {
                DefaultEventBus.this.unregisterHandler((String) it.next());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$HandlerHolder.class */
    public static class HandlerHolder {
        final Context context;
        final Handler handler;
        final boolean replyHandler;

        HandlerHolder(Handler handler, boolean z, Context context) {
            this.context = context;
            this.handler = handler;
            this.replyHandler = z;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return this.handler.equals(((HandlerHolder) obj).handler);
        }

        public int hashCode() {
            return this.handler.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$HandlerInfo.class */
    public static class HandlerInfo {
        final String address;
        final Handler<? extends Message> handler;

        private HandlerInfo(String str, Handler<? extends Message> handler) {
            this.address = str;
            this.handler = handler;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/vertx/java/core/eventbus/impl/DefaultEventBus$Handlers.class */
    public static class Handlers {
        final Map<HandlerHolder, String> map;
        private Iterator<HandlerHolder> iter;

        private Handlers() {
            this.map = new ConcurrentHashMap();
        }

        synchronized HandlerHolder choose() {
            if (this.map.isEmpty()) {
                return null;
            }
            if (this.iter == null || !this.iter.hasNext()) {
                this.iter = this.map.keySet().iterator();
            }
            try {
                return this.iter.next();
            } catch (NoSuchElementException e) {
                return null;
            }
        }
    }

    public DefaultEventBus(VertxInternal vertxInternal) {
        this.connections = new ConcurrentHashMap();
        this.handlerMap = new ConcurrentHashMap();
        this.handlersByID = new ConcurrentHashMap();
        this.vertx = vertxInternal;
        this.serverID = new ServerID(DEFAULT_CLUSTER_PORT, "localhost");
        this.server = null;
        this.subs = null;
    }

    public DefaultEventBus(VertxInternal vertxInternal, String str) {
        this(vertxInternal, DEFAULT_CLUSTER_PORT, str);
    }

    public DefaultEventBus(VertxInternal vertxInternal, int i, String str) {
        this.connections = new ConcurrentHashMap();
        this.handlerMap = new ConcurrentHashMap();
        this.handlersByID = new ConcurrentHashMap();
        this.vertx = vertxInternal;
        this.serverID = new ServerID(i, str);
        this.subs = new HazelcastClusterManager(vertxInternal).getSubsMap("subs");
        this.server = setServer();
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, JsonObject jsonObject, Handler<Message<JsonObject>> handler) {
        sendOrPub(new JsonObjectMessage(true, str, jsonObject), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, JsonObject jsonObject) {
        send(str, jsonObject, (Handler<Message<JsonObject>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, JsonArray jsonArray, Handler<Message<JsonArray>> handler) {
        sendOrPub(new JsonArrayMessage(true, str, jsonArray), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, JsonArray jsonArray) {
        send(str, jsonArray, (Handler<Message<JsonArray>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Buffer buffer, Handler<Message<Buffer>> handler) {
        sendOrPub(new BufferMessage(true, str, buffer), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Buffer buffer) {
        send(str, buffer, (Handler<Message<Buffer>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, byte[] bArr, Handler<Message<byte[]>> handler) {
        sendOrPub(new ByteArrayMessage(true, str, bArr), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, byte[] bArr) {
        send(str, bArr, (Handler<Message<byte[]>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, String str2, Handler<Message<String>> handler) {
        sendOrPub(new StringMessage(true, str, str2), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, String str2) {
        send(str, str2, (Handler<Message<String>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Integer num, Handler<Message<Integer>> handler) {
        sendOrPub(new IntMessage(true, str, num), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Integer num) {
        send(str, num, (Handler<Message<Integer>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Long l, Handler<Message<Long>> handler) {
        sendOrPub(new LongMessage(true, str, l), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Long l) {
        send(str, l, (Handler<Message<Long>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Float f, Handler<Message<Float>> handler) {
        sendOrPub(new FloatMessage(true, str, f), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Float f) {
        send(str, f, (Handler<Message<Float>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Double d, Handler<Message<Double>> handler) {
        sendOrPub(new DoubleMessage(true, str, d), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Double d) {
        send(str, d, (Handler<Message<Double>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Boolean bool, Handler<Message<Boolean>> handler) {
        sendOrPub(new BooleanMessage(true, str, bool), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Boolean bool) {
        send(str, bool, (Handler<Message<Boolean>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Short sh, Handler<Message<Short>> handler) {
        sendOrPub(new ShortMessage(true, str, sh), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Short sh) {
        send(str, sh, (Handler<Message<Short>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Character ch, Handler<Message<Character>> handler) {
        sendOrPub(new CharacterMessage(true, str, ch), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Character ch) {
        send(str, ch, (Handler<Message<Character>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Byte b, Handler<Message<Byte>> handler) {
        sendOrPub(new ByteMessage(true, str, b), handler);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void send(String str, Byte b) {
        send(str, b, (Handler<Message<Byte>>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, JsonObject jsonObject) {
        sendOrPub(new JsonObjectMessage(false, str, jsonObject), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, JsonArray jsonArray) {
        sendOrPub(new JsonArrayMessage(false, str, jsonArray), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Buffer buffer) {
        sendOrPub(new BufferMessage(false, str, buffer), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, byte[] bArr) {
        sendOrPub(new ByteArrayMessage(false, str, bArr), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, String str2) {
        sendOrPub(new StringMessage(false, str, str2), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Integer num) {
        sendOrPub(new IntMessage(false, str, num), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Long l) {
        sendOrPub(new LongMessage(false, str, l), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Float f) {
        sendOrPub(new FloatMessage(false, str, f), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Double d) {
        sendOrPub(new DoubleMessage(false, str, d), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Boolean bool) {
        sendOrPub(new BooleanMessage(false, str, bool), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Short sh) {
        sendOrPub(new ShortMessage(false, str, sh), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Character ch) {
        sendOrPub(new CharacterMessage(false, str, ch), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void publish(String str, Byte b) {
        sendOrPub(new ByteMessage(false, str, b), null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void unregisterHandler(String str, Handler<? extends Message> handler, AsyncResultHandler<Void> asyncResultHandler) {
        Context orAssignContext = this.vertx.getOrAssignContext();
        Handlers handlers = this.handlerMap.get(str);
        if (handlers != null) {
            String remove = handlers.map.remove(new HandlerHolder(handler, false, orAssignContext));
            if (remove != null) {
                this.handlersByID.remove(remove);
                getHandlerCloseHook(orAssignContext).ids.remove(remove);
            }
            if (!handlers.map.isEmpty()) {
                if (asyncResultHandler != null) {
                    callCompletionHandler(asyncResultHandler);
                    return;
                }
                return;
            }
            this.handlerMap.remove(str);
            if (this.subs != null) {
                removeSub(str, this.serverID, asyncResultHandler);
            } else if (asyncResultHandler != null) {
                callCompletionHandler(asyncResultHandler);
            }
        }
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void unregisterHandler(String str, Handler<? extends Message> handler) {
        unregisterHandler(str, handler, null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void unregisterHandler(String str) {
        unregisterHandler(str, (AsyncResultHandler<Void>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public void unregisterHandler(String str, AsyncResultHandler<Void> asyncResultHandler) {
        HandlerInfo handlerInfo = this.handlersByID.get(str);
        if (handlerInfo != null) {
            unregisterHandler(handlerInfo.address, handlerInfo.handler, asyncResultHandler);
        } else if (asyncResultHandler != null) {
            callCompletionHandler(asyncResultHandler);
        }
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public String registerHandler(Handler<? extends Message> handler) {
        return registerHandler(handler, (AsyncResultHandler<Void>) null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public String registerHandler(Handler<? extends Message> handler, AsyncResultHandler<Void> asyncResultHandler) {
        return registerHandler(null, handler, asyncResultHandler, false, false);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public String registerHandler(String str, Handler<? extends Message> handler, AsyncResultHandler<Void> asyncResultHandler) {
        return registerHandler(str, handler, asyncResultHandler, false, false);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public String registerHandler(String str, Handler<? extends Message> handler) {
        return registerHandler(str, handler, null);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public String registerLocalHandler(String str, Handler<? extends Message> handler) {
        return registerHandler(str, handler, null, false, true);
    }

    @Override // org.vertx.java.core.eventbus.EventBus
    public String registerLocalHandler(Handler<? extends Message> handler) {
        return registerHandler(null, handler, null, false, true);
    }

    public void close(Handler<Void> handler) {
        this.server.close(handler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendReply(ServerID serverID, BaseMessage baseMessage, Handler handler) {
        sendOrPub(serverID, baseMessage, handler);
    }

    private NetServer setServer() {
        return this.vertx.createNetServer().connectHandler(new Handler<NetSocket>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.1
            @Override // org.vertx.java.core.Handler
            public void handle(final NetSocket netSocket) {
                final RecordParser newFixed = RecordParser.newFixed(4, null);
                newFixed.setOutput(new Handler<Buffer>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.1.1
                    int size = -1;

                    @Override // org.vertx.java.core.Handler
                    public void handle(Buffer buffer) {
                        if (this.size == -1) {
                            this.size = buffer.getInt(0);
                            newFixed.fixedSizeMode(this.size);
                            return;
                        }
                        BaseMessage read = MessageFactory.read(buffer);
                        if (read.type() == 0) {
                            netSocket.write(DefaultEventBus.PONG);
                        } else {
                            DefaultEventBus.this.receiveMessage(read);
                        }
                        newFixed.fixedSizeMode(4);
                        this.size = -1;
                    }
                });
                netSocket.dataHandler(newFixed);
            }
        }).listen(this.serverID.port, this.serverID.host);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendToSubs(ServerIDs serverIDs, BaseMessage baseMessage) {
        if (baseMessage.send) {
            ServerID choose = serverIDs.choose();
            if (choose.equals(this.serverID)) {
                receiveMessage(baseMessage);
                return;
            } else {
                sendRemote(choose, baseMessage);
                return;
            }
        }
        Iterator<ServerID> it = serverIDs.iterator();
        while (it.hasNext()) {
            ServerID next = it.next();
            if (next.equals(this.serverID)) {
                receiveMessage(baseMessage);
            } else {
                sendRemote(next, baseMessage);
            }
        }
    }

    private void sendOrPub(BaseMessage baseMessage, Handler handler) {
        sendOrPub(null, baseMessage, handler);
    }

    private void sendOrPub(ServerID serverID, final BaseMessage baseMessage, Handler handler) {
        Context orAssignContext = this.vertx.getOrAssignContext();
        try {
            baseMessage.sender = this.serverID;
            if (handler != null) {
                baseMessage.replyAddress = UUID.randomUUID().toString();
                registerHandler(baseMessage.replyAddress, handler, null, true, false);
            }
            if (serverID != null) {
                if (serverID.equals(this.serverID)) {
                    receiveMessage(baseMessage);
                } else {
                    sendRemote(serverID, baseMessage);
                }
            } else if (this.subs != null) {
                this.subs.get(baseMessage.address, new AsyncResultHandler<ServerIDs>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.2
                    @Override // org.vertx.java.core.Handler
                    public void handle(AsyncResult<ServerIDs> asyncResult) {
                        if (asyncResult.exception != null) {
                            DefaultEventBus.log.error("Failed to send message", asyncResult.exception);
                            return;
                        }
                        ServerIDs serverIDs = asyncResult.result;
                        if (serverIDs.isEmpty()) {
                            DefaultEventBus.this.receiveMessage(baseMessage);
                        } else {
                            DefaultEventBus.this.sendToSubs(serverIDs, baseMessage);
                        }
                    }
                });
            } else {
                receiveMessage(baseMessage);
            }
        } finally {
            if (orAssignContext != null) {
                Context.setContext(orAssignContext);
            }
        }
    }

    private String registerHandler(String str, Handler<? extends Message> handler, AsyncResultHandler<Void> asyncResultHandler, boolean z, boolean z2) {
        Context orAssignContext = this.vertx.getOrAssignContext();
        String uuid = UUID.randomUUID().toString();
        if (str == null) {
            str = uuid;
        }
        this.handlersByID.put(uuid, new HandlerInfo(str, handler));
        Handlers handlers = this.handlerMap.get(str);
        if (handlers == null) {
            Handlers handlers2 = new Handlers();
            Handlers putIfAbsent = this.handlerMap.putIfAbsent(str, handlers2);
            if (putIfAbsent != null) {
                handlers2 = putIfAbsent;
            }
            if (asyncResultHandler == null) {
                asyncResultHandler = new AsyncResultHandler<Void>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.3
                    @Override // org.vertx.java.core.Handler
                    public void handle(AsyncResult<Void> asyncResult) {
                        if (asyncResult.exception != null) {
                            DefaultEventBus.log.error("Failed to remove entry", asyncResult.exception);
                        }
                    }
                };
            }
            handlers2.map.put(new HandlerHolder(handler, z, orAssignContext), uuid);
            if (this.subs == null || z || z2) {
                callCompletionHandler(asyncResultHandler);
            } else {
                this.subs.put(str, this.serverID, asyncResultHandler);
            }
        } else {
            handlers.map.put(new HandlerHolder(handler, z, orAssignContext), uuid);
            if (asyncResultHandler != null) {
                callCompletionHandler(asyncResultHandler);
            }
        }
        getHandlerCloseHook(orAssignContext).ids.add(uuid);
        return uuid;
    }

    private HandlerCloseHook getHandlerCloseHook(Context context) {
        HandlerCloseHook handlerCloseHook = (HandlerCloseHook) context.getCloseHook(this);
        if (handlerCloseHook == null) {
            handlerCloseHook = new HandlerCloseHook();
            context.putCloseHook(this, handlerCloseHook);
        }
        return handlerCloseHook;
    }

    private void callCompletionHandler(AsyncResultHandler<Void> asyncResultHandler) {
        asyncResultHandler.handle(new AsyncResult((Void) null));
    }

    private void cleanSubsForServerID(ServerID serverID) {
        if (this.subs != null) {
            this.subs.removeAllForServerID(serverID, new AsyncResultHandler<Void>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.4
                @Override // org.vertx.java.core.Handler
                public void handle(AsyncResult<Void> asyncResult) {
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupConnection(ServerID serverID, ConnectionHolder connectionHolder, boolean z) {
        if (connectionHolder.timeoutID != -1) {
            this.vertx.cancelTimer(connectionHolder.timeoutID);
        }
        if (connectionHolder.pingTimeoutID != -1) {
            this.vertx.cancelTimer(connectionHolder.pingTimeoutID);
        }
        try {
            connectionHolder.socket.close();
        } catch (Exception e) {
        }
        if (this.connections.remove(serverID, connectionHolder)) {
            log.debug("Cluster connection closed: " + serverID + " holder " + connectionHolder);
            if (z) {
                cleanSubsForServerID(serverID);
            }
        }
    }

    private void sendRemote(ServerID serverID, BaseMessage baseMessage) {
        ConnectionHolder connectionHolder = this.connections.get(serverID);
        if (connectionHolder == null) {
            NetClient createNetClient = this.vertx.createNetClient();
            createNetClient.setConnectTimeout(60000L);
            connectionHolder = new ConnectionHolder(createNetClient);
            ConnectionHolder putIfAbsent = this.connections.putIfAbsent(serverID, connectionHolder);
            if (putIfAbsent != null) {
                connectionHolder = putIfAbsent;
            } else {
                connectionHolder.connect(createNetClient, serverID);
            }
        }
        connectionHolder.writeMessage(baseMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedulePing(final ConnectionHolder connectionHolder) {
        connectionHolder.pingTimeoutID = this.vertx.setTimer(20000L, new Handler<Long>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.5
            @Override // org.vertx.java.core.Handler
            public void handle(Long l) {
                connectionHolder.timeoutID = DefaultEventBus.this.vertx.setTimer(20000L, new Handler<Long>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.5.1
                    @Override // org.vertx.java.core.Handler
                    public void handle(Long l2) {
                        DefaultEventBus.log.info("No pong from server " + DefaultEventBus.this.serverID + " - will consider it dead, timerID: " + l2 + " holder " + connectionHolder);
                        DefaultEventBus.this.cleanupConnection(connectionHolder.theServerID, connectionHolder, true);
                    }
                });
                new PingMessage(DefaultEventBus.this.serverID).write(connectionHolder.socket);
            }
        });
    }

    private void removeSub(String str, ServerID serverID, final AsyncResultHandler<Void> asyncResultHandler) {
        this.subs.remove(str, serverID, new AsyncResultHandler<Boolean>() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.6
            @Override // org.vertx.java.core.Handler
            public void handle(AsyncResult<Boolean> asyncResult) {
                if (asyncResultHandler != null) {
                    asyncResultHandler.handle(asyncResult.exception != null ? new AsyncResult(asyncResult.exception) : new AsyncResult((Void) null));
                } else if (asyncResult.exception != null) {
                    DefaultEventBus.log.error("Failed to remove subscription", asyncResult.exception);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void receiveMessage(BaseMessage baseMessage) {
        baseMessage.bus = this;
        Handlers handlers = this.handlerMap.get(baseMessage.address);
        if (handlers != null) {
            if (!baseMessage.send) {
                Iterator<HandlerHolder> it = handlers.map.keySet().iterator();
                while (it.hasNext()) {
                    doReceive(handlers, baseMessage, it.next());
                }
            } else {
                HandlerHolder choose = handlers.choose();
                if (choose != null) {
                    doReceive(handlers, baseMessage, choose);
                }
            }
        }
    }

    private void doReceive(final Handlers handlers, final BaseMessage baseMessage, final HandlerHolder handlerHolder) {
        final Message copy = baseMessage.copy();
        handlerHolder.context.execute(new Runnable() { // from class: org.vertx.java.core.eventbus.impl.DefaultEventBus.7
            @Override // java.lang.Runnable
            public void run() {
                if (handlers.map.containsKey(handlerHolder)) {
                    try {
                        handlerHolder.handler.handle(copy);
                        if (handlerHolder.replyHandler) {
                            DefaultEventBus.this.unregisterHandler(baseMessage.address, handlerHolder.handler);
                        }
                    } catch (Throwable th) {
                        if (handlerHolder.replyHandler) {
                            DefaultEventBus.this.unregisterHandler(baseMessage.address, handlerHolder.handler);
                        }
                        throw th;
                    }
                }
            }
        });
    }
}
