package io.micronaut.http.netty.reactive;

import io.micronaut.core.annotation.Internal;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A Netty channel handler that is also a Reactive Streams {@link Subscriber}: each element
 * received through {@link #onNext(Object)} is written and flushed to the channel, and one more
 * element is requested from the publisher whenever a write finishes or the channel's
 * writability changes while the channel is still writable.
 *
 * <p>The handler needs both a {@link Subscription} (from {@link #onSubscribe(Subscription)}) and
 * a {@link ChannelHandlerContext} (from {@link #handlerAdded(ChannelHandlerContext)}) before it
 * can start; the two may arrive in either order, which is tracked by {@link State}.
 *
 * <p>State transitions are either dispatched onto the {@link EventExecutor} supplied at
 * construction ({@code provideSubscription}, {@code doClose}) or performed inside channel
 * callbacks; {@code handlerAdded} and {@code channelRegistered} reject a registered channel
 * whose callbacks do not run on that executor.
 *
 * @param <T> the type of element written to the channel
 */
@Internal
public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscriber<T> {

    /**
     * Future of the most recent {@code writeAndFlush} call, or {@code null} if nothing has been
     * written yet. {@link #onComplete()} waits on it before completing.
     */
    protected ChannelFuture lastWriteFuture;

    private final EventExecutor executor;

    /** Guards against a second {@code onSubscribe}; only the first subscription is kept. */
    private final AtomicBoolean hasSubscription = new AtomicBoolean();

    // Volatile: written in onSubscribe / handlerAdded and read from onNext, which the publisher
    // may invoke from a thread other than the executor.
    private volatile Subscription subscription;
    private volatile ChannelHandlerContext ctx;

    private State state = State.NO_SUBSCRIPTION_OR_CONTEXT;

    /**
     * Lifecycle of the handler.
     */
    public enum State {
        /** Initial state: neither a subscription nor a channel context has been provided. */
        NO_SUBSCRIPTION_OR_CONTEXT,
        /** The handler has been added to a pipeline but no subscription has been provided yet. */
        NO_SUBSCRIPTION,
        /** A subscription has been provided but the handler has not been added to a pipeline. */
        NO_CONTEXT,
        /** Subscription and context are both present, but the channel is not active. */
        INACTIVE,
        /** Subscription and context are both present and the channel is active. */
        RUNNING,
        /** The channel went away (inactive, handler removed or exception) before completion. */
        CANCELLED,
        /** The stream terminated and the channel context has been closed. */
        COMPLETE
    }

    /**
     * Creates a subscriber bound to the given executor.
     *
     * @param eventExecutor the executor on which subscription hand-over and closing are run; a
     *                      registered channel this handler is added to must use the same one
     */
    public HandlerSubscriber(EventExecutor eventExecutor) {
        this.executor = eventExecutor;
    }

    /**
     * Hook invoked when the publisher signals an error. This implementation ignores the error
     * itself and closes the channel.
     *
     * @param th the error signalled by the publisher
     */
    protected void error(Throwable th) {
        doClose();
    }

    /**
     * Hook invoked when the publisher has completed and the last write (if any) has finished.
     * This implementation closes the channel.
     */
    protected void complete() {
        doClose();
    }

    /**
     * Records the channel context and advances the state machine.
     *
     * @param channelHandlerContext the context this handler was added with
     * @throws IllegalArgumentException if the channel is registered and the current thread is
     *                                  not the executor's event loop
     * @throws IllegalStateException    if the handler already has a context
     */
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) {
        verifyRegisteredWithRightExecutor(channelHandlerContext);
        switch (this.state) {
            case NO_SUBSCRIPTION_OR_CONTEXT:
                // Context arrived first; keep waiting for the subscription.
                this.ctx = channelHandlerContext;
                this.state = State.NO_SUBSCRIPTION;
                break;
            case NO_CONTEXT:
                // Subscription arrived first; both halves are now present.
                this.ctx = channelHandlerContext;
                maybeStart();
                break;
            case COMPLETE:
                // Already terminated: nothing will be written, so just close.
                channelHandlerContext.close();
                break;
            default:
                throw new IllegalStateException("This handler must only be added to a pipeline once " + this.state);
        }
    }

    /**
     * Verifies the executor and forwards the event.
     *
     * @param channelHandlerContext the handler context
     * @throws IllegalArgumentException if the channel is registered and the current thread is
     *                                  not the executor's event loop
     */
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) {
        verifyRegisteredWithRightExecutor(channelHandlerContext);
        channelHandlerContext.fireChannelRegistered();
    }

    /**
     * Requests another element if the handler is running and the channel is writable, then
     * forwards the event.
     *
     * @param channelHandlerContext the handler context
     */
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) {
        maybeRequestMore();
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    /**
     * Starts requesting elements if the handler was waiting for the channel to become active,
     * then forwards the event.
     *
     * @param channelHandlerContext the handler context
     */
    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        if (this.state == State.INACTIVE) {
            this.state = State.RUNNING;
            maybeRequestMore();
        }
        channelHandlerContext.fireChannelActive();
    }

    /**
     * Cancels the subscription and forwards the event.
     *
     * @param channelHandlerContext the handler context
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        cancel();
        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Cancels the subscription when the handler leaves the pipeline.
     *
     * @param channelHandlerContext the handler context
     */
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) {
        cancel();
    }

    /**
     * Cancels the subscription and forwards the exception.
     *
     * @param channelHandlerContext the handler context
     * @param th                    the exception that was caught
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        cancel();
        channelHandlerContext.fireExceptionCaught(th);
    }

    /**
     * Accepts the first subscription and hands it over on the executor; any later subscription
     * is cancelled immediately.
     *
     * @param subscription the subscription offered by the publisher
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("Null subscription");
        }
        if (this.hasSubscription.compareAndSet(false, true)) {
            this.subscription = subscription;
            // The state machine is advanced on the executor, not on the publisher's thread.
            this.executor.execute(this::provideSubscription);
        } else {
            subscription.cancel();
        }
    }

    /**
     * Writes the element to the channel using a fresh promise.
     *
     * @param t the element to write
     */
    public void onNext(T t) {
        onNext(t, this.ctx.newPromise());
    }

    /**
     * Writes and flushes the element, remembers the resulting future in
     * {@link #lastWriteFuture}, and requests another element once the write finishes.
     *
     * @param t              the element to write
     * @param channelPromise the promise passed to {@code writeAndFlush}
     */
    public void onNext(T t, ChannelPromise channelPromise) {
        this.lastWriteFuture = this.ctx.writeAndFlush(t, channelPromise);
        this.lastWriteFuture.addListener(future -> {
            maybeRequestMore();
        });
    }

    /**
     * Delegates to {@link #error(Throwable)}.
     *
     * @param th the error signalled by the publisher
     * @throws NullPointerException if {@code th} is {@code null}
     */
    public void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("Null error published");
        }
        error(th);
    }

    /**
     * Calls {@link #complete()} immediately if nothing has been written, otherwise once the
     * last write future is done.
     */
    public void onComplete() {
        if (this.lastWriteFuture == null) {
            complete();
        } else {
            this.lastWriteFuture.addListener(future -> {
                complete();
            });
        }
    }

    /**
     * Schedules a task on the executor that closes the context and moves to {@code COMPLETE},
     * but only from the states in which a context is present and the stream has not already
     * been cancelled or completed.
     */
    private void doClose() {
        this.executor.execute(() -> {
            switch (this.state) {
                case NO_SUBSCRIPTION:
                case RUNNING:
                case INACTIVE:
                    this.ctx.close();
                    this.state = State.COMPLETE;
                    break;
                default:
                    // No context yet, or already cancelled/complete: nothing to close.
                    break;
            }
        });
    }

    /**
     * Requests one more element if the handler is running and the channel is writable.
     *
     * <p>Demand is only signalled in {@code RUNNING}. {@link #channelWritabilityChanged} can
     * fire in any state once the handler is in a pipeline: in {@code NO_SUBSCRIPTION} the
     * subscription may still be {@code null} (or not yet handed over, which would lead to a
     * duplicate request when the handler starts), and in {@code INACTIVE} the channel is not
     * active so there is nowhere to write a requested element.
     */
    private void maybeRequestMore() {
        if (this.state == State.RUNNING && this.ctx.channel().isWritable()) {
            this.subscription.request(1L);
        }
    }

    /**
     * Rejects a registered channel whose callbacks are not running on this handler's executor.
     *
     * @param channelHandlerContext the context whose channel is inspected
     * @throws IllegalArgumentException if the channel is registered and the current thread is
     *                                  not the executor's event loop
     */
    private void verifyRegisteredWithRightExecutor(ChannelHandlerContext channelHandlerContext) {
        if (channelHandlerContext.channel().isRegistered() && !this.executor.inEventLoop()) {
            throw new IllegalArgumentException("Channel handler MUST be registered with the same EventExecutor that it is created with.");
        }
    }

    /**
     * Moves to {@code CANCELLED}, cancelling the subscription when one has been handed over.
     * Has no effect in any other state.
     */
    private void cancel() {
        switch (this.state) {
            case NO_SUBSCRIPTION:
                // Nothing to cancel yet; provideSubscription cancels it when it arrives.
                this.state = State.CANCELLED;
                break;
            case RUNNING:
            case INACTIVE:
                this.subscription.cancel();
                this.state = State.CANCELLED;
                break;
            default:
                break;
        }
    }

    /**
     * Executor-side half of {@link #onSubscribe(Subscription)}: advances the state machine now
     * that the subscription field has been set.
     */
    private void provideSubscription() {
        switch (this.state) {
            case NO_SUBSCRIPTION_OR_CONTEXT:
                // Subscription arrived first; wait for handlerAdded.
                this.state = State.NO_CONTEXT;
                break;
            case NO_SUBSCRIPTION:
                maybeStart();
                break;
            case CANCELLED:
                // The channel went away before the subscription was handed over.
                this.subscription.cancel();
                break;
            default:
                break;
        }
    }

    /**
     * Called once both subscription and context are present: starts requesting if the channel
     * is active, otherwise waits in {@code INACTIVE} for {@link #channelActive}.
     */
    private void maybeStart() {
        if (this.ctx.channel().isActive()) {
            this.state = State.RUNNING;
            maybeRequestMore();
        } else {
            this.state = State.INACTIVE;
        }
    }
}
