/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
import com.hivemq.client.internal.util.collections.HandleList;
import com.hivemq.shaded.org.jetbrains.annotations.NotNull;
import java.util.Iterator;

/**
 * Dispatches incoming PUBLISH messages to the matching {@link MqttIncomingPublishFlow}s.
 * <p>
 * Messages that cannot be delivered immediately (because a matching flow has no outstanding request) are kept in one
 * of two queues, one for QoS 0 and one for QoS 1/2, and are re-offered to their flows on every {@link #drain()}.
 * A QoS 1/2 message is acknowledged via {@link MqttIncomingQosHandler#ack} once none of its flows is pending anymore
 * and every QoS 1/2 message queued before it has been acknowledged.
 * <p>
 * All methods are annotated to be called by the Netty event loop; the class does no synchronization of its own.
 */
@ClientScope
class MqttIncomingPublishService {
    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttIncomingPublishService.class);
    // Not referenced by the code visible in this class (presumably inlined by the compiler); kept unchanged.
    private static final boolean QOS_0_DROP_LATEST = true;
    @NotNull
    private final MqttIncomingQosHandler incomingQosHandler;
    /** QoS 0 messages that still have at least one flow pending. */
    @NotNull
    private final ChunkedArrayQueue<QueueEntry> qos0Queue = new ChunkedArrayQueue<>(32);
    /** QoS 1/2 messages that are not acknowledged yet. */
    @NotNull
    private final ChunkedArrayQueue<QueueEntry> qos1Or2Queue = new ChunkedArrayQueue<>(32);
    /** Number of flows that are currently referenced by at least one message handled by this service. */
    private int referencedFlowCount;
    /** Incremented on every {@link #drain()} and handed to {@link MqttIncomingPublishFlow#requested}. */
    private int runIndex;
    /** Number of flows that reported a request count of zero since the current drain run started. */
    private int blockingFlowCount;

    MqttIncomingPublishService(@NotNull MqttIncomingQosHandler incomingQosHandler) {
        this.incomingQosHandler = incomingQosHandler;
    }

    /**
     * Handles an incoming QoS 0 PUBLISH.
     * <p>
     * If the QoS 0 queue already holds {@code receiveMaximum} or more entries, one queued entry is polled and
     * discarded to make room. The flows of the discarded entry are released so that the reference bookkeeping stays
     * consistent with what {@link #onPublish} acquired for them.
     *
     * @param publish        the received message.
     * @param receiveMaximum the limit for the number of queued QoS 0 messages.
     */
    @CallByThread(value="Netty EventLoop")
    void onPublishQos0(@NotNull MqttStatefulPublish publish, int receiveMaximum) {
        if (this.qos0Queue.size() >= receiveMaximum) {
            LOGGER.warn("QoS 0 publish message dropped.");
            QueueEntry dropped = this.qos0Queue.poll();
            // poll() yields nothing when the queue is empty (e.g. receiveMaximum <= 0).
            if (dropped != null) {
                this.releaseDropped(dropped.flows);
            }
        }
        HandleList<MqttIncomingPublishFlow> flows = this.onPublish(publish);
        if (!flows.isEmpty()) {
            this.qos0Queue.offer(new QueueEntry(publish, flows));
        }
    }

    /**
     * Handles an incoming QoS 1 or 2 PUBLISH.
     * <p>
     * The message is acknowledged immediately only if it was delivered to all of its flows and no earlier QoS 1/2
     * message is still queued; otherwise it is queued so that acknowledgements keep the order of arrival.
     *
     * @param publish        the received message.
     * @param receiveMaximum the limit for the number of queued QoS 1/2 messages.
     * @return {@code false} if the queue already holds {@code receiveMaximum} or more entries (the message is not
     *         processed), {@code true} otherwise.
     */
    @CallByThread(value="Netty EventLoop")
    boolean onPublishQos1Or2(@NotNull MqttStatefulPublish publish, int receiveMaximum) {
        if (this.qos1Or2Queue.size() >= receiveMaximum) {
            return false;
        }
        HandleList<MqttIncomingPublishFlow> flows = this.onPublish(publish);
        if (this.qos1Or2Queue.isEmpty() && flows.isEmpty()) {
            this.incomingQosHandler.ack(publish);
        } else {
            this.qos1Or2Queue.offer(new QueueEntry(publish, flows));
        }
        return true;
    }

    /**
     * Looks up the flows matching the message, drains the queues, references the matching flows and then tries to
     * emit the message to them.
     *
     * @param publish the received message.
     * @return the matching flows that did not take the message yet (empty if none is pending).
     */
    @CallByThread(value="Netty EventLoop")
    @NotNull
    private HandleList<MqttIncomingPublishFlow> onPublish(@NotNull MqttStatefulPublish publish) {
        HandleList<MqttIncomingPublishFlow> flows = this.incomingQosHandler.getIncomingPublishFlows().findMatching(publish);
        if (flows.isEmpty()) {
            LOGGER.warn("No publish flow registered for {}.", publish);
        }
        // Queued messages are offered to their flows before the new message.
        this.drain();
        for (MqttIncomingPublishFlow flow : flows) {
            // reference() returning 1 means this is the first reference to the flow.
            if (flow.reference() == 1) {
                ++this.referencedFlowCount;
            }
        }
        this.emit((MqttPublish)publish.stateless(), flows);
        return flows;
    }

    /**
     * Starts a new run and re-offers all queued messages to their pending flows, QoS 1/2 first and QoS 0 afterwards.
     * <p>
     * The run stops early as soon as every referenced flow has been seen with a request count of zero, because no
     * further message can be delivered then.
     */
    @CallByThread(value="Netty EventLoop")
    void drain() {
        ++this.runIndex;
        this.blockingFlowCount = 0;

        // Stays true only while every QoS 1/2 entry seen so far could be acknowledged.
        boolean acknowledge = true;
        Iterator<QueueEntry> qos1Or2It = this.qos1Or2Queue.iterator();
        while (qos1Or2It.hasNext()) {
            QueueEntry entry = qos1Or2It.next();
            MqttStatefulPublish publish = entry.publish;
            HandleList<MqttIncomingPublishFlow> flows = entry.flows;
            this.emit((MqttPublish)publish.stateless(), flows);
            if (acknowledge && flows.isEmpty()) {
                qos1Or2It.remove();
                this.incomingQosHandler.ack(publish);
            } else {
                acknowledge = false;
                if (this.blockingFlowCount == this.referencedFlowCount) {
                    return;
                }
            }
        }

        Iterator<QueueEntry> qos0It = this.qos0Queue.iterator();
        while (qos0It.hasNext()) {
            QueueEntry entry = qos0It.next();
            HandleList<MqttIncomingPublishFlow> flows = entry.flows;
            this.emit((MqttPublish)entry.publish.stateless(), flows);
            if (flows.isEmpty()) {
                qos0It.remove();
            } else if (this.blockingFlowCount == this.referencedFlowCount) {
                return;
            }
        }
    }

    /**
     * Offers a message to each flow in the given list.
     * <p>
     * Cancelled flows and flows that accepted the message are removed from the list and dereferenced. A flow
     * reporting a request count of zero is counted as blocking; the loop ends once all referenced flows are
     * blocking. Flows reporting a negative count are left in the list without being counted.
     *
     * @param publish the message to emit.
     * @param flows   the flows the message is still pending for; modified in place.
     */
    @CallByThread(value="Netty EventLoop")
    private void emit(@NotNull MqttPublish publish, @NotNull HandleList<MqttIncomingPublishFlow> flows) {
        Iterator<MqttIncomingPublishFlow> flowIt = flows.iterator();
        while (flowIt.hasNext()) {
            MqttIncomingPublishFlow flow = flowIt.next();
            if (flow.isCancelled()) {
                flowIt.remove();
                if (flow.dereference() == 0) {
                    --this.referencedFlowCount;
                }
                continue;
            }
            long requested = flow.requested(this.runIndex);
            if (requested > 0L) {
                flow.onNext(publish);
                flowIt.remove();
                if (flow.dereference() == 0) {
                    --this.referencedFlowCount;
                    flow.checkDone();
                }
            } else if (requested == 0L) {
                ++this.blockingFlowCount;
                if (this.blockingFlowCount == this.referencedFlowCount) {
                    return;
                }
            }
        }
    }

    /**
     * Releases the references held by a message that is discarded without being delivered.
     * <p>
     * Applies the same bookkeeping {@link #emit} performs when a flow leaves a message's pending list: the flow is
     * dereferenced, and when that was its last reference the referenced flow count is decremented and, unless the
     * flow is cancelled, {@link MqttIncomingPublishFlow#checkDone()} is called.
     *
     * @param flows the flows still pending for the discarded message.
     */
    @CallByThread(value="Netty EventLoop")
    private void releaseDropped(@NotNull HandleList<MqttIncomingPublishFlow> flows) {
        for (MqttIncomingPublishFlow flow : flows) {
            if (flow.dereference() == 0) {
                --this.referencedFlowCount;
                if (!flow.isCancelled()) {
                    flow.checkDone();
                }
            }
        }
    }

    /** A queued message together with the flows it has not been delivered to yet. */
    private static class QueueEntry {
        @NotNull
        final MqttStatefulPublish publish;
        @NotNull
        final HandleList<MqttIncomingPublishFlow> flows;

        QueueEntry(@NotNull MqttStatefulPublish publish, @NotNull HandleList<MqttIncomingPublishFlow> flows) {
            this.publish = publish;
            this.flows = flows;
        }
    }
}

