/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishService;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler;
import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.NotNull;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.Nullable;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.reactivex.Emitter;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

abstract class MqttIncomingPublishFlow
extends FlowWithEventLoop
implements Emitter<Mqtt5Publish>,
Subscription,
Runnable {
    private static final int STATE_NO_NEW_REQUESTS = 0;
    private static final int STATE_NEW_REQUESTS = 1;
    private static final int STATE_BLOCKED = 2;
    @NotNull
    final Subscriber<? super Mqtt5Publish> subscriber;
    @NotNull
    final MqttIncomingPublishService incomingPublishService;
    final boolean manualAcknowledgement;
    private long requested;
    @NotNull
    private final AtomicLong newRequested = new AtomicLong();
    @NotNull
    private final AtomicInteger requestState = new AtomicInteger(0);
    private boolean done;
    @Nullable
    private Throwable error;
    private int referenced;
    private int missingAcknowledgements;
    private long blockedIndex;
    private boolean blocking;

    MqttIncomingPublishFlow(@NotNull Subscriber<? super Mqtt5Publish> subscriber, @NotNull MqttClientConfig clientConfig, @NotNull MqttIncomingQosHandler incomingQosHandler, boolean manualAcknowledgement) {
        super(clientConfig);
        this.subscriber = subscriber;
        this.incomingPublishService = incomingQosHandler.incomingPublishService;
        this.manualAcknowledgement = manualAcknowledgement;
    }

    @CallByThread(value="Netty EventLoop")
    public void onNext(@NotNull Mqtt5Publish result) {
        this.subscriber.onNext((Object)result);
        if (this.requested != Long.MAX_VALUE) {
            --this.requested;
        }
    }

    @CallByThread(value="Netty EventLoop")
    public void onComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        if (this.setDone()) {
            this.subscriber.onComplete();
        } else {
            this.incomingPublishService.drain();
        }
    }

    @CallByThread(value="Netty EventLoop")
    public void onError(@NotNull Throwable error) {
        if (this.done) {
            if (error != this.error) {
                RxJavaPlugins.onError((Throwable)error);
            }
            return;
        }
        this.error = error;
        this.done = true;
        if (this.setDone()) {
            this.subscriber.onError(error);
        } else {
            this.incomingPublishService.drain();
        }
    }

    @Override
    protected boolean setDone() {
        return this.referenced == 0 && this.missingAcknowledgements == 0 && super.setDone();
    }

    @CallByThread(value="Netty EventLoop")
    void checkDone() {
        if (this.done && this.setDone()) {
            if (this.error != null) {
                this.subscriber.onError(this.error);
            } else {
                this.subscriber.onComplete();
            }
        }
    }

    public void request(long n) {
        if (n > 0L && !this.isCancelled()) {
            BackpressureHelper.add((AtomicLong)this.newRequested, (long)n);
            if (this.requestState.getAndSet(1) == 2) {
                this.eventLoop.execute(this);
            }
        }
    }

    @Override
    @CallByThread(value="Netty EventLoop")
    public void run() {
        if (this.referenced > 0) {
            this.incomingPublishService.drain();
        }
    }

    @CallByThread(value="Netty EventLoop")
    long requested(long runIndex) {
        if (this.requested <= 0L) {
            long newRequested;
            if (this.blocking && this.blockedIndex != runIndex) {
                this.blocking = false;
            }
            if (this.blocking) {
                return -1L;
            }
            do {
                if (this.requestState.compareAndSet(0, 2)) {
                    this.blockedIndex = runIndex;
                    this.blocking = true;
                    return 0L;
                }
                this.requestState.set(0);
            } while ((newRequested = this.newRequested.getAndSet(0L)) <= 0L);
            this.requested = BackpressureHelper.addCap((long)this.requested, (long)newRequested);
            return this.requested;
        }
        return this.requested;
    }

    @Override
    protected void onCancel() {
        this.eventLoop.execute(this::runCancel);
    }

    @CallByThread(value="Netty EventLoop")
    void runCancel() {
        if (this.referenced > 0) {
            this.incomingPublishService.drain();
        }
    }

    @CallByThread(value="Netty EventLoop")
    void increaseMissingAcknowledgements() {
        ++this.missingAcknowledgements;
    }

    @CallByThread(value="Netty EventLoop")
    void acknowledge(boolean drain) {
        if (drain) {
            this.incomingPublishService.drain();
        }
        if (--this.missingAcknowledgements == 0) {
            this.checkDone();
        }
    }

    @CallByThread(value="Netty EventLoop")
    int reference() {
        return ++this.referenced;
    }

    @CallByThread(value="Netty EventLoop")
    int dereference() {
        return --this.referenced;
    }
}

