package org.smallmind.bayeux.oumuamua.server.spi.extension;

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.smallmind.bayeux.oumuamua.server.api.Packet;
import org.smallmind.bayeux.oumuamua.server.api.Session;
import org.smallmind.bayeux.oumuamua.server.api.json.BooleanValue;
import org.smallmind.bayeux.oumuamua.server.api.json.Message;
import org.smallmind.bayeux.oumuamua.server.api.json.NumberValue;
import org.smallmind.bayeux.oumuamua.server.api.json.ObjectValue;
import org.smallmind.bayeux.oumuamua.server.api.json.Value;
import org.smallmind.bayeux.oumuamua.server.api.json.ValueType;
import org.smallmind.bayeux.oumuamua.server.spi.json.PacketUtility;
import org.smallmind.bayeux.oumuamua.server.spi.meta.Meta;
import org.smallmind.scribe.pen.LoggerManager;

/* loaded from: input_file:org/smallmind/bayeux/oumuamua/server/spi/extension/AckExtension.class */
/**
 * Packet listener that keeps per-session acknowledgement ("ack") bookkeeping in session attributes.
 *
 * <p>What the code in this class does:
 * <ul>
 *   <li>On a handshake request whose message {@code ext} carries a boolean {@code "ack": true}, the
 *       session is flagged and given a counter, a size counter, a map of unacknowledged packets keyed
 *       by ack id, and a queue of packets waiting to be re-sent.</li>
 *   <li>On a successful handshake response for a flagged session, {@code "ack": true} is written into
 *       each successful message's {@code ext} and the session is switched to long polling.</li>
 *   <li>On a connect response carrying more than one message, the next ack id is written into the
 *       successful connect message's {@code ext}, queued re-send packets are merged into the response,
 *       and the resulting packet is stored under that id.</li>
 *   <li>On a connect request, the highest numeric {@code "ack"} value found is treated as acknowledged:
 *       its entry is dropped and every entry with a lower id is moved to the re-send queue.</li>
 * </ul>
 *
 * <p>{@code Message}, {@code ObjectValue}, {@code Value}, {@code BooleanValue} and {@code NumberValue}
 * are used without type arguments because their declarations are not visible from this file.
 *
 * @param <V> the JSON value implementation type
 */
public class AckExtension<V extends Value<V>> extends AbstractServerPacketListener<V> {
    private static final String ACK_FLAG_ATTRIBUTE = "org.smallmind.bayeux.oumuamua.extension.ack.flag";
    private static final String ACK_COUNTER_ATTRIBUTE = "org.smallmind.bayeux.oumuamua.extension.ack.counter";
    private static final String ACK_SIZE_ATTRIBUTE = "org.smallmind.bayeux.oumuamua.extension.ack.size";
    private static final String ACK_UNACKNOWLEDGED_MAP_ATTRIBUTE = "org.smallmind.bayeux.oumuamua.extension.ack.unacknowledged_map";
    private static final String ACK_RESEND_QUEUE_ATTRIBUTE = "org.smallmind.bayeux.oumuamua.extension.ack.resend_queue";

    /** Threshold compared against the size counter before a new unacknowledged packet is stored. */
    private final int maxAckQueueSize;

    /**
     * @param i the maximum number of unacknowledged packets tracked per session before an entry is evicted
     */
    public AckExtension(int i) {
        this.maxAckQueueSize = i;
    }

    /**
     * Processes an incoming packet: enables ack tracking on handshake, applies client acknowledgements
     * on connect. Packets on any other route, and calls with a {@code null} session, are passed through.
     *
     * @param session the sending session, may be {@code null}
     * @param packet  the incoming packet
     * @return the same {@code packet} instance
     */
    @Override // org.smallmind.bayeux.oumuamua.server.spi.extension.AbstractServerPacketListener
    public Packet<V> onRequest(Session<V> session, Packet<V> packet) {
        if (session != null) {
            if (Meta.HANDSHAKE.getRoute().equals(packet.getRoute())) {
                if (requestsAck(packet)) {
                    enableAck(session);
                }
            } else if (Meta.CONNECT.getRoute().equals(packet.getRoute()) && Boolean.TRUE.equals(session.getAttribute(ACK_FLAG_ATTRIBUTE))) {
                Long ackId = highestAckId(packet);

                if (ackId != null) {
                    acknowledge(session, ackId);
                }
            }
        }
        return packet;
    }

    /**
     * Processes an outgoing packet: confirms the extension on handshake, and on connect stamps an ack id,
     * merges queued re-send packets and records the result as unacknowledged.
     *
     * @param session the receiving session, may be {@code null}
     * @param packet  the outgoing packet
     * @return the packet to send; a merged packet when re-send packets were pending, otherwise {@code packet}
     */
    @Override // org.smallmind.bayeux.oumuamua.server.spi.extension.AbstractServerPacketListener
    @SuppressWarnings("rawtypes")
    public Packet<V> onResponse(Session<V> session, Packet<V> packet) {
        if (session != null) {
            if (Meta.HANDSHAKE.getRoute().equals(packet.getRoute())) {
                if (Boolean.TRUE.equals(session.getAttribute(ACK_FLAG_ATTRIBUTE))) {
                    session.setLongPolling(true);
                    for (Message message : packet.getMessages()) {
                        if (message.isSuccessful()) {
                            message.getExt(true).put("ack", true);
                        }
                    }
                }
            } else if (Meta.CONNECT.getRoute().equals(packet.getRoute()) && Boolean.TRUE.equals(session.getAttribute(ACK_FLAG_ATTRIBUTE)) && packet.getMessages().length > 1) {
                Long ackId = null;

                // Only the first successful connect message receives the ack id.
                for (Message message : packet.getMessages()) {
                    if (message.isSuccessful() && Meta.CONNECT.getRoute().getPath().equals(message.getChannel())) {
                        ackId = ((AtomicLong) session.getAttribute(ACK_COUNTER_ATTRIBUTE)).incrementAndGet();
                        message.getExt(true).put("ack", ackId.longValue());
                        break;
                    }
                }

                // Drain with poll() so each queued packet is taken exactly once.
                ConcurrentLinkedQueue<Packet<V>> resendQueue = resendQueue(session);
                Packet<V> pending;
                while ((pending = resendQueue.poll()) != null) {
                    packet = PacketUtility.merge(packet, pending);
                }

                if (ackId != null) {
                    ConcurrentSkipListMap<Long, Packet<V>> unacknowledged = unacknowledgedMap(session);
                    AtomicLong size = (AtomicLong) session.getAttribute(ACK_SIZE_ATTRIBUTE);

                    if (size.incrementAndGet() > this.maxAckQueueSize) {
                        LoggerManager.getLogger(AckExtension.class).debug("Session(%s) overflowed the ack queue", new Object[]{session.getId()});
                        // NOTE(review): this evicts the entry with the HIGHEST ack id; confirm that
                        // evicting the oldest (pollFirstEntry) was not the intent.
                        if (unacknowledged.pollLastEntry() != null) {
                            size.decrementAndGet();
                        }
                    }
                    unacknowledged.put(ackId, packet);
                }
            }
        }
        return packet;
    }

    /** Returns whether any message in the packet carries a boolean {@code "ack": true} in its {@code ext}. */
    @SuppressWarnings("rawtypes")
    private boolean requestsAck(Packet<V> packet) {
        for (Message message : packet.getMessages()) {
            ObjectValue ext = message.getExt();

            if (ext != null) {
                Value ackValue = ext.get("ack");

                // The type is checked before the cast so a non-boolean "ack" is simply ignored.
                if (ackValue != null && ValueType.BOOLEAN.equals(ackValue.getType()) && ((BooleanValue) ackValue).asBoolean()) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Creates the ack bookkeeping attributes on the session, once, under the session's monitor. */
    private void enableAck(Session<V> session) {
        synchronized (session) {
            if (!Boolean.TRUE.equals(session.getAttribute(ACK_FLAG_ATTRIBUTE))) {
                session.setAttribute(ACK_COUNTER_ATTRIBUTE, new AtomicLong(0L));
                session.setAttribute(ACK_SIZE_ATTRIBUTE, new AtomicLong(0L));
                session.setAttribute(ACK_UNACKNOWLEDGED_MAP_ATTRIBUTE, new ConcurrentSkipListMap<Long, Packet<V>>());
                session.setAttribute(ACK_RESEND_QUEUE_ATTRIBUTE, new ConcurrentLinkedQueue<Packet<V>>());
                // The flag is set last: connect handling reads it without this lock and then
                // dereferences the attributes above, so they must exist before it reads TRUE.
                session.setAttribute(ACK_FLAG_ATTRIBUTE, Boolean.TRUE);
            }
        }
    }

    /** Returns the largest numeric {@code "ack"} value found in the packet's messages, or {@code null} if none. */
    @SuppressWarnings("rawtypes")
    private Long highestAckId(Packet<V> packet) {
        Long highest = null;

        for (Message message : packet.getMessages()) {
            ObjectValue ext = message.getExt();

            if (ext != null) {
                Value ackValue = ext.get("ack");

                if (ackValue != null && ValueType.NUMBER.equals(ackValue.getType())) {
                    long candidate = ((NumberValue) ackValue).asLong();

                    if (highest == null || highest < candidate) {
                        highest = candidate;
                    }
                }
            }
        }
        return highest;
    }

    /**
     * Drops the entry stored under {@code ackId} and moves every entry with a lower id to the re-send queue,
     * decrementing the size counter once per entry actually removed from the map.
     */
    private void acknowledge(Session<V> session, Long ackId) {
        ConcurrentSkipListMap<Long, Packet<V>> unacknowledged = unacknowledgedMap(session);
        ConcurrentLinkedQueue<Packet<V>> resendQueue = resendQueue(session);
        AtomicLong size = (AtomicLong) session.getAttribute(ACK_SIZE_ATTRIBUTE);

        // Decrement only when something was removed, so an unknown or repeated ack id cannot skew the count.
        if (unacknowledged.remove(ackId) != null) {
            size.decrementAndGet();
        }

        // headMap is exclusive of ackId: these are the packets stored under lower ids.
        Iterator<Packet<V>> overdue = unacknowledged.headMap(ackId).values().iterator();
        while (overdue.hasNext()) {
            resendQueue.add(overdue.next());
            overdue.remove();
            size.decrementAndGet();
        }
    }

    /** Reads the unacknowledged-packet map from the session; the attribute is only ever set by {@link #enableAck}. */
    @SuppressWarnings("unchecked")
    private ConcurrentSkipListMap<Long, Packet<V>> unacknowledgedMap(Session<V> session) {
        return (ConcurrentSkipListMap<Long, Packet<V>>) session.getAttribute(ACK_UNACKNOWLEDGED_MAP_ATTRIBUTE);
    }

    /** Reads the re-send queue from the session; the attribute is only ever set by {@link #enableAck}. */
    @SuppressWarnings("unchecked")
    private ConcurrentLinkedQueue<Packet<V>> resendQueue(Session<V> session) {
        return (ConcurrentLinkedQueue<Packet<V>>) session.getAttribute(ACK_RESEND_QUEUE_ATTRIBUTE);
    }
}
