/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.outgoing;

import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttIncomingAckFlow;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishWithFlow;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.rx.FuseableSubscriber;
import com.hivemq.shaded.org.jetbrains.annotations.NotNull;
import com.hivemq.shaded.org.jetbrains.annotations.Nullable;
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;

public class MqttPublishFlowableAckLink
extends Flowable<MqttPublishWithFlow> {
    /** Upstream flow of publishes; subscribed to once per downstream subscriber. */
    private final @NotNull Flowable<MqttPublish> source;

    /** Ack flow handed to every link subscriber created by this operator. */
    private final @NotNull MqttIncomingAckFlow ackFlow;

    /**
     * Creates the operator that links a flow of publishes with an ack flow.
     *
     * @param source  the upstream publishes.
     * @param ackFlow the ack flow that is attached to each emitted publish.
     */
    MqttPublishFlowableAckLink(final @NotNull Flowable<MqttPublish> source, final @NotNull MqttIncomingAckFlow ackFlow) {
        this.ackFlow = ackFlow;
        this.source = source;
    }

    /**
     * Wraps the downstream subscriber in an ack-link subscriber and subscribes that wrapper to the source.
     * <p>
     * A {@link ConditionalSubscriber} downstream gets the conditional variant of the wrapper, any other
     * downstream gets the plain variant.
     *
     * @param s the downstream subscriber.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void subscribeActual(@NotNull Subscriber<? super MqttPublishWithFlow> s) {
        final AbstractAckLinkSubscriber<?> link = (s instanceof ConditionalSubscriber)
                ? new AckLinkConditionalSubscriber((ConditionalSubscriber<? super MqttPublishWithFlow>) s, this.ackFlow)
                : new AckLinkSubscriber(s, this.ackFlow);
        this.source.subscribe((FlowableSubscriber<MqttPublish>) link);
    }

    private static class AckLinkConditionalSubscriber
    extends AbstractAckLinkSubscriber<ConditionalSubscriber<? super MqttPublishWithFlow>>
    implements ConditionalSubscriber<MqttPublish> {
        AckLinkConditionalSubscriber(@NotNull ConditionalSubscriber<? super MqttPublishWithFlow> subscriber, @NotNull MqttIncomingAckFlow ackFlow) {
            super(subscriber, ackFlow);
        }

        public void onNext(@Nullable MqttPublish publish) {
            assert (this.subscription != null);
            if (!this.tryOnNext(publish)) {
                this.subscription.request(1L);
            }
        }

        public boolean tryOnNext(@Nullable MqttPublish publish) {
            if (this.startEmitting()) {
                boolean consumed;
                if (this.sourceMode == 0) {
                    assert (publish != null);
                    consumed = ((ConditionalSubscriber)this.subscriber).tryOnNext((Object)new MqttPublishWithFlow(publish, this.ackFlow));
                    if (consumed) {
                        ++this.published;
                    }
                } else {
                    consumed = ((ConditionalSubscriber)this.subscriber).tryOnNext(null);
                }
                this.stopEmitting();
                return consumed;
            }
            return true;
        }
    }

    private static class AckLinkSubscriber
    extends AbstractAckLinkSubscriber<Subscriber<? super MqttPublishWithFlow>> {
        AckLinkSubscriber(@NotNull Subscriber<? super MqttPublishWithFlow> subscriber, @NotNull MqttIncomingAckFlow ackFlow) {
            super(subscriber, ackFlow);
        }

        public void onNext(@Nullable MqttPublish publish) {
            if (this.startEmitting()) {
                if (this.sourceMode == 0) {
                    assert (publish != null);
                    this.subscriber.onNext((Object)new MqttPublishWithFlow(publish, this.ackFlow));
                    ++this.published;
                } else {
                    this.subscriber.onNext(null);
                }
                this.stopEmitting();
            }
        }
    }

    /**
     * Common base of the ack-link subscribers.
     * <p>
     * It sits between the upstream publishes and the downstream subscriber, counts the publishes handed
     * downstream ({@code published}) and reports completion, errors and cancellation of the link to the
     * {@link MqttIncomingAckFlow}. Upstream errors are not forwarded downstream as errors: the downstream
     * is completed and the error is handed to the ack flow instead.
     * <p>
     * Two atomic state machines are used, both over the {@code STATE_*} values: {@code state} guards the
     * push path ({@code onNext}/{@code onComplete}/{@code onError}) and {@code pollState} guards the
     * pull path ({@link #poll()}).
     *
     * @param <S> the type of the downstream subscriber.
     */
    static abstract class AbstractAckLinkSubscriber<S extends Subscriber<? super MqttPublishWithFlow>>
            extends FuseableSubscriber<MqttPublish, MqttPublishWithFlow, S>
            implements LinkCancellable {

        /** Idle: an emission may be started. */
        static final int STATE_NONE = 0;
        /** An emission is in progress. */
        static final int STATE_EMITTING = 1;
        /** The source terminated (completed or failed). */
        static final int STATE_DONE = 2;
        /** Cancellation of the link was requested but not yet carried out. */
        static final int STATE_CANCEL = 3;
        /** Cancellation of the link was carried out. */
        static final int STATE_CANCELLED = 4;

        // Values of sourceMode as returned by requestFusion; they match RxJava's QueueFuseable
        // constants SYNC and ASYNC.
        private static final int FUSION_NONE = 0;
        private static final int FUSION_SYNC = 1;
        private static final int FUSION_ASYNC = 2;

        @NotNull
        final MqttIncomingAckFlow ackFlow;
        /** Whether this subscriber has already been linked to the ack flow. */
        private boolean linked;
        @NotNull
        private final AtomicInteger state = new AtomicInteger(STATE_NONE);
        @NotNull
        private final AtomicInteger pollState = new AtomicInteger(STATE_NONE);
        /** Number of publishes handed to the downstream so far. */
        long published;
        /** Upstream error, stored before {@code state} is switched to done so that poll() can report it. */
        @Nullable
        private Throwable error;

        AbstractAckLinkSubscriber(@NotNull S subscriber, @NotNull MqttIncomingAckFlow ackFlow) {
            super(subscriber);
            this.ackFlow = ackFlow;
        }

        /**
         * Tries to enter the emitting state on the push path.
         *
         * @return {@code true} if the state was idle and is now emitting.
         */
        boolean startEmitting() {
            return this.startEmitting(this.state);
        }

        private boolean startEmitting(@NotNull AtomicInteger state) {
            return state.compareAndSet(STATE_NONE, STATE_EMITTING);
        }

        /** Leaves the emitting state on the push path, see {@link #stopEmitting(AtomicInteger)}. */
        void stopEmitting() {
            this.stopEmitting(this.state);
        }

        /**
         * Switches the given state back from emitting to idle. If the state is no longer emitting (it was
         * changed while the emission was in progress), the pending cancellation is attempted instead.
         */
        private void stopEmitting(@NotNull AtomicInteger state) {
            assert this.subscription != null;
            if (!state.compareAndSet(STATE_EMITTING, STATE_NONE)) {
                this.cancelActual();
            }
        }

        /**
         * Completes the downstream and, when the source is not fused, tells the ack flow how many
         * publishes were emitted. Ignored unless the state is idle.
         */
        @Override
        public void onComplete() {
            if (this.state.compareAndSet(STATE_NONE, STATE_DONE)) {
                this.subscriber.onComplete();
                if (this.sourceMode == FUSION_NONE) {
                    this.ackFlow.onComplete(this.published);
                }
            }
        }

        /**
         * Completes the downstream (it does not see the error) and, when the source is not fused, hands
         * the error together with the published count to the ack flow. If the state is not idle the
         * error is routed to {@link RxJavaPlugins#onError}.
         *
         * @param t the upstream error.
         */
        @Override
        public void onError(@NotNull Throwable t) {
            // Stored before the state change so that poll() observing STATE_DONE also sees the error.
            this.error = t;
            if (this.state.compareAndSet(STATE_NONE, STATE_DONE)) {
                this.subscriber.onComplete();
                if (this.sourceMode == FUSION_NONE) {
                    this.ackFlow.onError(t, this.published);
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        /** Links this subscriber to the ack flow (once) before delegating the request. */
        @Override
        public void request(long n) {
            this.link();
            super.request(n);
        }

        /**
         * Negotiates fusion with the upstream queue subscription, if there is one, and links this
         * subscriber to the ack flow (once).
         *
         * @param mode the fusion mode requested by the downstream.
         * @return the resulting source mode.
         */
        @Override
        public int requestFusion(int mode) {
            if (this.queueSubscription != null) {
                this.sourceMode = this.queueSubscription.requestFusion(mode);
            }
            this.link();
            return this.sourceMode;
        }

        /**
         * Polls the next publish from the fused upstream and wraps it together with the ack flow.
         * <p>
         * Termination of the upstream (an empty queue, or an exception thrown by the upstream poll) is
         * reported to the ack flow together with the number of publishes emitted so far.
         *
         * @return the next publish with its ack flow, or {@code null} if there is none, the upstream poll
         *         failed, or the poll state does not allow emitting.
         */
        @Nullable
        public MqttPublishWithFlow poll() {
            assert this.queueSubscription != null;
            if (!this.startEmitting(this.pollState)) {
                return null;
            }
            final MqttPublish publish;
            try {
                publish = (MqttPublish) this.queueSubscription.poll();
            } catch (Throwable e) {
                this.queueSubscription.cancel();
                this.pollState.set(STATE_DONE);
                if (this.state.getAndSet(STATE_DONE) != STATE_DONE) {
                    this.ackFlow.onError(e, this.published);
                    if (this.sourceMode == FUSION_ASYNC) {
                        this.subscriber.onComplete();
                    }
                }
                return null;
            }
            if (publish == null) {
                if (this.sourceMode == FUSION_SYNC) {
                    // In sync mode an empty queue is treated as the end of the source.
                    this.pollState.set(STATE_DONE);
                    if (this.state.getAndSet(STATE_DONE) != STATE_DONE) {
                        this.ackFlow.onComplete(this.published);
                    }
                } else {
                    // The terminal signal arrived via onComplete/onError; report it once the queue is drained.
                    if (this.state.get() == STATE_DONE) {
                        if (this.error == null) {
                            this.ackFlow.onComplete(this.published);
                        } else {
                            // Pass the published count, as every other terminal report to the ack flow does.
                            this.ackFlow.onError(this.error, this.published);
                        }
                    }
                    this.stopEmitting(this.pollState);
                }
                return null;
            }
            this.stopEmitting(this.pollState);
            this.published++;
            return new MqttPublishWithFlow(publish, this.ackFlow);
        }

        /** Registers this subscriber at the ack flow the first time it is called. */
        private void link() {
            if (!this.linked) {
                this.linked = true;
                this.ackFlow.link(this);
            }
        }

        /**
         * Requests cancellation of the link.
         * <p>
         * If neither the push path nor the poll path was emitting, the cancellation is carried out
         * immediately. If the source was already done, only the ack flow is notified. Otherwise the
         * cancellation is left to {@link #stopEmitting(AtomicInteger)} of the emission in progress.
         */
        @Override
        public void cancelLink() {
            final int previousState = this.state.getAndSet(STATE_CANCEL);
            if (previousState == STATE_NONE && this.pollState.getAndSet(STATE_CANCEL) == STATE_NONE) {
                this.cancelActual();
            } else if (previousState == STATE_DONE && this.state.compareAndSet(STATE_CANCEL, STATE_CANCELLED)) {
                this.ackFlow.onLinkCancelled();
            }
        }

        /**
         * Carries out a requested cancellation at most once: cancels the upstream, completes the
         * downstream unless the source mode is sync, and notifies the ack flow.
         */
        private void cancelActual() {
            if (this.state.compareAndSet(STATE_CANCEL, STATE_CANCELLED)) {
                assert this.subscription != null;
                this.subscription.cancel();
                if (this.sourceMode != FUSION_SYNC) {
                    this.subscriber.onComplete();
                }
                this.ackFlow.onLinkCancelled();
            }
        }
    }

    static interface LinkCancellable {
        @NotNull
        public static final LinkCancellable CANCELLED = () -> {};

        public void cancelLink();
    }
}

