/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.queue;

import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Queue;
import org.enodeframework.common.exception.ReplyAddressException;
import org.enodeframework.common.utils.ReplyUtil;
import org.enodeframework.queue.OutboundDeliveryContext;
import org.enodeframework.queue.PointToPointEventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds a single outbound TCP connection to one remote node and forwards
 * messages to it as event-bus bridge frames.
 *
 * <p>Messages handed to {@link #writeMessage(OutboundDeliveryContext)} before
 * the socket is connected are queued and flushed once the connection is
 * established. After connecting, a ping frame is sent periodically; if no
 * inbound data arrives within the configured reply interval the connection is
 * closed.
 *
 * <p>{@code writeMessage} and the connect/drain step synchronize on this
 * instance. The timer ids are read and written from timer/socket callbacks
 * without that lock — presumably always on the same Vert.x context (TODO confirm).
 */
public class ConnectionHolder {
    private static final Logger log = LoggerFactory.getLogger(ConnectionHolder.class);
    private final PointToPointEventBus eventBus;
    private final String remoteNodeAddress;
    /** Messages queued while not yet connected; {@code null} when nothing has been queued. */
    private Queue<OutboundDeliveryContext> pending;
    private NetSocket socket;
    private boolean connected;
    /** Id of the "no pong received" timer, or -1 if none has been armed yet. */
    private long timeoutID = -1L;
    /** Id of the "send next ping" timer, or -1 if none has been armed yet. */
    private long pingTimeoutID = -1L;

    /**
     * @param eventBus          owning event bus; supplies the net client, Vert.x instance,
     *                          options and the connection map this holder is removed from on close
     * @param remoteNodeAddress address of the remote node, parsed as a URI in {@link #connect()}
     */
    public ConnectionHolder(PointToPointEventBus eventBus, String remoteNodeAddress) {
        this.eventBus = eventBus;
        this.remoteNodeAddress = remoteNodeAddress;
    }

    /**
     * Starts an asynchronous connect to the remote node. On success the socket is
     * installed and any queued messages are flushed; on failure the holder is closed
     * with the connect error, which fails every queued message.
     *
     * @throws ReplyAddressException if {@code remoteNodeAddress} cannot be parsed into a URI
     */
    void connect() {
        URI info = ReplyUtil.toURI(remoteNodeAddress).orElseThrow(() -> new ReplyAddressException(String.format("Parse remoteNodeAddress: %s  failed, please check.", remoteNodeAddress)));
        eventBus.client().connect(info.getPort(), info.getHost()).onComplete(ar -> {
            if (ar.succeeded()) {
                connected(ar.result());
            } else {
                log.warn("Connecting to server {} failed", remoteNodeAddress, ar.cause());
                close(ar.cause());
            }
        });
    }

    /**
     * Sends the message as a {@code send} frame if connected, otherwise queues it
     * until the connection is established.
     *
     * @param ctx delivery context whose {@code message} is written to the socket
     */
    synchronized void writeMessage(OutboundDeliveryContext ctx) {
        if (connected) {
            FrameHelper.sendFrame("send", remoteNodeAddress, ctx.message, socket);
            return;
        }
        if (pending == null) {
            log.debug("Not connected to server {} - starting queuing", remoteNodeAddress);
            pending = new ArrayDeque<>();
        }
        pending.add(ctx);
    }

    /** Closes this holder, failing queued messages with {@code ConnectionBase.CLOSED_EXCEPTION}. */
    void close() {
        close(ConnectionBase.CLOSED_EXCEPTION);
    }

    /**
     * Tears the holder down: cancels the ping timers, fails every queued message
     * with {@code cause}, closes the socket if one was established, and removes
     * this holder from the event bus connection map.
     *
     * <p>May be invoked more than once (e.g. again from the socket's close handler
     * after the socket is closed here); repeated calls find an empty queue and a
     * map entry that is already gone.
     *
     * @param cause reason reported to each queued message
     */
    private void close(Throwable cause) {
        if (timeoutID != -1L) {
            eventBus.vertx().cancelTimer(timeoutID);
        }
        if (pingTimeoutID != -1L) {
            eventBus.vertx().cancelTimer(pingTimeoutID);
        }
        NetSocket toClose;
        synchronized (this) {
            if (pending != null) {
                OutboundDeliveryContext msg;
                while ((msg = pending.poll()) != null) {
                    msg.written(cause);
                    log.error("connection closed, queue msg: {}", msg, cause);
                }
            }
            toClose = socket;
        }
        // Release the TCP connection; without this a ping-timeout close would leave
        // the socket open. Done outside the lock and best-effort: the socket may
        // already be closed when we get here via its own close/exception handler.
        if (toClose != null) {
            try {
                toClose.close();
            } catch (RuntimeException e) {
                log.debug("Closing socket to server {} failed", remoteNodeAddress, e);
            }
        }
        // remove(key, value) only succeeds if the map still points at this holder.
        if (eventBus.connections().remove(remoteNodeAddress, this)) {
            log.debug("Point to point connection closed for server {}", remoteNodeAddress);
        }
    }

    /**
     * Arms the ping cycle: after the configured ping interval a {@code ping} frame
     * is sent and a reply timer is started; if that reply timer fires the holder
     * is closed.
     */
    private void schedulePing() {
        EventBusOptions options = eventBus.options();
        // This is re-invoked on every inbound buffer. Drop a still-pending ping timer
        // first so only one ping cycle is ever in flight; otherwise a second cycle
        // would overwrite timeoutID and leave an uncancellable reply timer behind.
        if (pingTimeoutID != -1L) {
            eventBus.vertx().cancelTimer(pingTimeoutID);
        }
        pingTimeoutID = eventBus.vertx().setTimer(options.getClusterPingInterval(), id1 -> {
            timeoutID = eventBus.vertx().setTimer(options.getClusterPingReplyInterval(), id2 -> {
                log.warn("No pong from server {} - will consider it dead", remoteNodeAddress);
                close();
            });
            FrameHelper.sendFrame("ping", remoteNodeAddress, new JsonObject(), socket);
        });
    }

    /**
     * Installs the freshly connected socket, wires its handlers, starts pinging
     * and flushes any messages queued while disconnected.
     *
     * @param socket the established connection to the remote node
     */
    private synchronized void connected(NetSocket socket) {
        this.socket = socket;
        this.connected = true;
        socket.exceptionHandler(err -> close(err));
        socket.closeHandler(v -> close());
        socket.handler(data -> {
            // Any inbound data cancels the pending reply timeout and restarts the ping cycle.
            eventBus.vertx().cancelTimer(timeoutID);
            schedulePing();
        });
        schedulePing();
        if (pending != null) {
            log.debug("Draining the queue for server {}", remoteNodeAddress);
            for (OutboundDeliveryContext ctx : pending) {
                FrameHelper.sendFrame("send", remoteNodeAddress, ctx.message, socket);
            }
        }
        pending = null;
    }
}

