/*
 * Decompiled with CFR 0.152.
 */
package org.xbib.netty.http.server.reactive;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class HandlerSubscriber<T>
extends ChannelDuplexHandler
implements Subscriber<T> {
    private static final long DEFAULT_LOW_WATERMARK = 4L;
    private static final long DEFAULT_HIGH_WATERMARK = 16L;
    private final EventExecutor executor;
    private final long demandLowWatermark;
    private final long demandHighWatermark;
    private final AtomicBoolean hasSubscription = new AtomicBoolean();
    private volatile Subscription subscription;
    private volatile ChannelHandlerContext ctx;
    private State state = State.NO_SUBSCRIPTION_OR_CONTEXT;
    private long outstandingDemand = 0L;
    private ChannelFuture lastWriteFuture;

    public HandlerSubscriber(EventExecutor executor, long demandLowWatermark, long demandHighWatermark) {
        this.executor = executor;
        this.demandLowWatermark = demandLowWatermark;
        this.demandHighWatermark = demandHighWatermark;
    }

    public HandlerSubscriber(EventExecutor executor) {
        this(executor, 4L, 16L);
    }

    protected void error(Throwable error) {
        this.doClose();
    }

    protected void complete() {
        this.doClose();
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.verifyRegisteredWithRightExecutor(ctx);
        switch (this.state) {
            case NO_SUBSCRIPTION_OR_CONTEXT: {
                this.ctx = ctx;
                this.state = State.NO_SUBSCRIPTION;
                break;
            }
            case NO_CONTEXT: {
                this.ctx = ctx;
                this.maybeStart();
                break;
            }
            case COMPLETE: {
                this.state = State.COMPLETE;
                ctx.close();
                break;
            }
            default: {
                throw new IllegalStateException("This handler must only be added to a pipeline once " + this.state);
            }
        }
    }

    public void channelRegistered(ChannelHandlerContext ctx) {
        this.verifyRegisteredWithRightExecutor(ctx);
        ctx.fireChannelRegistered();
    }

    private void verifyRegisteredWithRightExecutor(ChannelHandlerContext ctx) {
        if (ctx.channel().isRegistered() && !this.executor.inEventLoop()) {
            throw new IllegalArgumentException("Channel handler MUST be registered with the same EventExecutor that it is created with.");
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        this.maybeRequestMore();
        ctx.fireChannelWritabilityChanged();
    }

    public void channelActive(ChannelHandlerContext ctx) {
        if (this.state == State.INACTIVE) {
            this.state = State.RUNNING;
            this.maybeRequestMore();
        }
        ctx.fireChannelActive();
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.cancel();
        ctx.fireChannelInactive();
    }

    public void handlerRemoved(ChannelHandlerContext ctx) {
        this.cancel();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        this.cancel();
        ctx.fireExceptionCaught(cause);
    }

    private void cancel() {
        switch (this.state) {
            case NO_SUBSCRIPTION: {
                this.state = State.CANCELLED;
                break;
            }
            case RUNNING: 
            case INACTIVE: {
                this.subscription.cancel();
                this.state = State.CANCELLED;
                break;
            }
        }
    }

    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("Null subscription");
        }
        if (!this.hasSubscription.compareAndSet(false, true)) {
            subscription.cancel();
        } else {
            this.subscription = subscription;
            this.executor.execute(this::provideSubscription);
        }
    }

    private void provideSubscription() {
        switch (this.state) {
            case NO_SUBSCRIPTION_OR_CONTEXT: {
                this.state = State.NO_CONTEXT;
                break;
            }
            case NO_SUBSCRIPTION: {
                this.maybeStart();
                break;
            }
            case CANCELLED: {
                this.subscription.cancel();
                break;
            }
        }
    }

    private void maybeStart() {
        if (this.ctx.channel().isActive()) {
            this.state = State.RUNNING;
            this.maybeRequestMore();
        } else {
            this.state = State.INACTIVE;
        }
    }

    public void onNext(T t) {
        this.lastWriteFuture = this.ctx.writeAndFlush(t);
        this.lastWriteFuture.addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            --this.outstandingDemand;
            this.maybeRequestMore();
        }));
    }

    public void onError(Throwable error) {
        if (error == null) {
            throw new NullPointerException("Null error published");
        }
        this.error(error);
    }

    public void onComplete() {
        if (this.lastWriteFuture == null) {
            this.complete();
        } else {
            this.lastWriteFuture.addListener((GenericFutureListener)((ChannelFutureListener)channelFuture -> this.complete()));
        }
    }

    private void doClose() {
        this.executor.execute(() -> {
            switch (this.state) {
                case NO_SUBSCRIPTION: 
                case RUNNING: 
                case INACTIVE: {
                    this.ctx.close();
                    this.state = State.COMPLETE;
                    break;
                }
            }
        });
    }

    private void maybeRequestMore() {
        if (this.outstandingDemand <= this.demandLowWatermark && this.ctx.channel().isWritable()) {
            long toRequest = this.demandHighWatermark - this.outstandingDemand;
            this.outstandingDemand = this.demandHighWatermark;
            this.subscription.request(toRequest);
        }
    }

    static enum State {
        NO_SUBSCRIPTION_OR_CONTEXT,
        NO_SUBSCRIPTION,
        NO_CONTEXT,
        INACTIVE,
        RUNNING,
        CANCELLED,
        COMPLETE;

    }
}

