/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.internal.common.stream;

import io.opentelemetry.testing.internal.armeria.common.HttpHeaders;
import io.opentelemetry.testing.internal.armeria.common.HttpMessage;
import io.opentelemetry.testing.internal.armeria.common.HttpObject;
import io.opentelemetry.testing.internal.armeria.common.HttpRequest;
import io.opentelemetry.testing.internal.armeria.common.RequestHeaders;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeaders;
import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.common.annotation.UnstableApi;
import io.opentelemetry.testing.internal.armeria.common.stream.AbortedStreamException;
import io.opentelemetry.testing.internal.armeria.common.stream.CancelledSubscriptionException;
import io.opentelemetry.testing.internal.armeria.common.stream.DefaultStreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.HttpDecoder;
import io.opentelemetry.testing.internal.armeria.common.stream.StreamDecoder;
import io.opentelemetry.testing.internal.armeria.common.stream.StreamDecoderOutput;
import io.opentelemetry.testing.internal.armeria.common.stream.StreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.SubscriptionOption;
import io.opentelemetry.testing.internal.armeria.common.util.Exceptions;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.ByteBufsDecoderInput;
import io.opentelemetry.testing.internal.io.netty.buffer.ByteBuf;
import io.opentelemetry.testing.internal.io.netty.buffer.ByteBufAllocator;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.EventExecutor;
import java.util.Objects;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A stream that subscribes to an upstream {@link StreamMessage} of {@code I}, hands every upstream
 * element to a {@link StreamDecoder}, and publishes whatever the decoder emits through
 * {@link #add(Object)} as elements of type {@code O}.
 *
 * <p>Upstream elements are requested one at a time (see {@link #askUpstreamForElement()}), and only
 * while this stream reports a positive {@code demand()}.
 *
 * <p>NOTE(review): the state fields below are plain, non-volatile fields; presumably every callback
 * runs on the single executor passed to {@code subscribe0} - confirm against the base class.
 *
 * @param <I> the type of the elements received from the upstream
 * @param <O> the type of the elements produced by the decoder
 */
@UnstableApi
public final class DecodedStreamMessage<I, O>
extends DefaultStreamMessage<O>
implements StreamDecoderOutput<O> {
    /** The subscriber attached to {@link #publisher}; it drives the decoder. */
    private final DecodingSubscriber subscriber = new DecodingSubscriber();
    private final StreamDecoder<I, O> decoder;
    /** Whether {@link #decoder} is an {@link HttpDecoder}, i.e. whether headers get dedicated callbacks. */
    private final boolean isHttpDecoder;
    /** Accumulates the {@link ByteBuf}s obtained from upstream elements; closed by {@link #cleanup()}. */
    private final ByteBufsDecoderInput input;
    private final StreamMessage<? extends I> publisher;
    /**
     * The headers of the upstream when it is an {@link HttpRequest}. They are fed to the subscriber as the
     * first element and this field is reset to {@code null} once that has happened.
     */
    @Nullable
    private RequestHeaders requestHeaders;
    /** The subscription to {@link #publisher}; {@code null} until {@code onSubscribe} has been called. */
    @Nullable
    private Subscription upstream;
    /** Set by {@link #add(Object)}; reset at the start of every {@code onNext} call. */
    private boolean handlerProduced;
    /** Whether the first non-informational headers have been seen; later headers are treated as trailers. */
    private boolean sawLeadingHeaders;
    private boolean initialized;
    /** Whether an upstream {@code request(1)} is outstanding; prevents requesting more than one element. */
    private boolean askedUpstreamForElement;
    private boolean cancelled;

    /**
     * Creates a stream that decodes the specified {@link HttpMessage} with the specified
     * {@link HttpDecoder}.
     *
     * @param httpMessage the message to decode
     * @param decoder the decoder applied to the elements of {@code httpMessage}
     * @param alloc the allocator handed to the internal {@link ByteBufsDecoderInput}
     * @return a new {@link DecodedStreamMessage} viewed as a {@link StreamMessage}
     * @throws NullPointerException if any argument is {@code null}
     */
    public static <O> StreamMessage<O> of(HttpMessage httpMessage, HttpDecoder<O> decoder, ByteBufAllocator alloc) {
        // The decoder's declared input type is not visible from this file, so the conversion goes through a
        // wildcard; that is legal whatever the declared input type is, and erases to nothing at runtime.
        // onNext() only passes non-header elements to toByteBuf() when the decoder is an HttpDecoder.
        @SuppressWarnings("unchecked")
        final StreamDecoder<HttpObject, O> cast = (StreamDecoder<HttpObject, O>) (StreamDecoder<?, O>) decoder;
        return new DecodedStreamMessage<HttpObject, O>(httpMessage, cast, alloc);
    }

    /**
     * Creates a new instance.
     *
     * @param streamMessage the upstream whose elements are decoded
     * @param decoder the decoder applied to the upstream elements
     * @param alloc the allocator handed to the internal {@link ByteBufsDecoderInput}
     * @throws NullPointerException if any argument is {@code null}
     */
    public DecodedStreamMessage(StreamMessage<? extends I> streamMessage, StreamDecoder<I, O> decoder, ByteBufAllocator alloc) {
        this.publisher = Objects.requireNonNull(streamMessage, "streamMessage");
        this.decoder = Objects.requireNonNull(decoder, "decoder");
        this.isHttpDecoder = decoder instanceof HttpDecoder;
        this.input = new ByteBufsDecoderInput(Objects.requireNonNull(alloc, "alloc"));
        if (this.publisher instanceof HttpRequest) {
            // Request headers are readily available; they are replayed to the subscriber as the first element.
            this.requestHeaders = ((HttpRequest)this.publisher).headers();
        }
        // Release the input however this stream finishes; when the downstream cancelled its subscription,
        // additionally cancel the upstream.
        this.whenComplete().handle((unused1, cause) -> {
            if (cause instanceof CancelledSubscriptionException) {
                this.cancelAndCleanup();
            } else {
                this.cleanup();
            }
            return null;
        });
    }

    /**
     * Publishes a decoded element to this stream. If {@code tryWrite} rejects the element, the upstream is
     * cancelled and the input is released.
     *
     * @param out the decoded element
     */
    @Override
    public void add(O out) {
        if (this.tryWrite(out)) {
            this.handlerProduced = true;
        } else {
            this.cancelAndCleanup();
        }
    }

    /**
     * Subscribes the internal decoding subscriber to the upstream with the given executor and options.
     */
    @Override
    protected void subscribe0(EventExecutor executor, SubscriptionOption[] options) {
        this.publisher.subscribe(this.subscriber, executor, options);
    }

    /**
     * Runs once, right after the upstream subscription has been stored: cancels the upstream if this stream
     * was cancelled in the meantime; otherwise serves the demand that is already pending.
     */
    private void initialize() {
        if (this.initialized) {
            return;
        }
        this.initialized = true;
        if (this.cancelled) {
            this.upstream.cancel();
            return;
        }
        long demand = this.demand();
        // Pending request headers are delivered first and count against the demand observed above.
        if (demand > 0L && this.deliverPendingRequestHeaders()) {
            --demand;
        }
        if (demand > 0L) {
            this.askUpstreamForElement();
        }
    }

    /**
     * Reacts to a downstream request. Nothing happens before {@link #initialize()} has run or when
     * {@code n} is not positive.
     *
     * @param n the number of elements requested by the downstream
     */
    @Override
    protected void onRequest(long n) {
        if (!this.initialized || n <= 0L) {
            return;
        }
        if (this.deliverPendingRequestHeaders()) {
            // The readily available request headers had not been delivered yet; they were just fed to the
            // subscriber, which decides on its own whether to ask the upstream for more.
            return;
        }
        // Ask the upstream only after whenConsumed() completes, and only if there is still demand then.
        this.whenConsumed().thenRun(() -> {
            if (this.demand() > 0L) {
                this.askUpstreamForElement();
            }
        });
    }

    /**
     * Feeds the stored request headers, if any, to the decoding subscriber and forgets them.
     *
     * @return {@code true} if headers were pending and have been delivered, {@code false} otherwise
     */
    private boolean deliverPendingRequestHeaders() {
        final RequestHeaders pending = this.requestHeaders;
        if (pending == null) {
            return false;
        }
        // Clear the field before delivering so that the headers can never be delivered twice.
        this.requestHeaders = null;
        // requestHeaders is only ever set when the publisher is an HttpRequest, and the headers are handed
        // to the subscriber as if they were an upstream element; the cast is erased at runtime.
        @SuppressWarnings("unchecked")
        final I element = (I) pending;
        this.subscriber.onNext(element);
        return true;
    }

    /**
     * Requests exactly one element from the upstream unless such a request is already outstanding.
     * The upstream subscription must have been set before this method is called.
     */
    public void askUpstreamForElement() {
        if (!this.askedUpstreamForElement) {
            this.askedUpstreamForElement = true;
            assert (this.upstream != null);
            this.upstream.request(1L);
        }
    }

    /**
     * Marks this stream as cancelled, cancels the upstream subscription if there is one, and releases the
     * input. Subsequent calls are no-ops.
     */
    private void cancelAndCleanup() {
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        if (this.upstream != null) {
            this.upstream.cancel();
        }
        this.cleanup();
    }

    /**
     * Closes the decoder input. This is called from several termination paths, so it may run more than once.
     */
    private void cleanup() {
        this.input.close();
    }

    /**
     * The subscriber attached to the upstream. It passes every element to the decoder and keeps requesting
     * upstream elements, one at a time, while there is downstream demand.
     */
    private final class DecodingSubscriber
    implements Subscriber<I> {
        private DecodingSubscriber() {
        }

        /**
         * Stores the first subscription and initializes the enclosing stream; any further subscription is
         * cancelled immediately.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "subscription");
            if (DecodedStreamMessage.this.upstream == null) {
                DecodedStreamMessage.this.upstream = subscription;
                DecodedStreamMessage.this.initialize();
            } else {
                subscription.cancel();
            }
        }

        /**
         * Passes one upstream element to the decoder and then decides whether another upstream element has
         * to be requested. Any failure is reported to the decoder, cancels the upstream and aborts the
         * enclosing stream.
         */
        @Override
        public void onNext(I obj) {
            Objects.requireNonNull(obj, "obj");
            DecodedStreamMessage.this.askedUpstreamForElement = false;
            DecodedStreamMessage.this.handlerProduced = false;
            try {
                if (DecodedStreamMessage.this.isHttpDecoder && obj instanceof HttpHeaders) {
                    // isHttpDecoder guarantees that the decoder is an HttpDecoder; only its type arguments
                    // are unchecked here.
                    @SuppressWarnings("unchecked")
                    final HttpDecoder<O> httpDecoder = (HttpDecoder<O>) DecodedStreamMessage.this.decoder;
                    final HttpHeaders headers = (HttpHeaders) obj;
                    if (headers instanceof ResponseHeaders && ((ResponseHeaders)headers).status().isInformational()) {
                        // Informational response headers do not count as the leading headers.
                        httpDecoder.processInformationalHeaders((ResponseHeaders)headers, DecodedStreamMessage.this);
                    } else if (!DecodedStreamMessage.this.sawLeadingHeaders) {
                        DecodedStreamMessage.this.sawLeadingHeaders = true;
                        httpDecoder.processHeaders(headers, DecodedStreamMessage.this);
                    } else {
                        // Headers that arrive after the leading headers are trailers.
                        httpDecoder.processTrailers(headers, DecodedStreamMessage.this);
                    }
                } else {
                    final ByteBuf byteBuf = DecodedStreamMessage.this.decoder.toByteBuf(obj);
                    Objects.requireNonNull(byteBuf, "decoder.toByteBuf() returned null");
                    // The decoder only runs when the input reports that the buffer was added.
                    if (DecodedStreamMessage.this.input.add(byteBuf)) {
                        DecodedStreamMessage.this.decoder.process(DecodedStreamMessage.this.input, DecodedStreamMessage.this);
                    }
                }
                if (DecodedStreamMessage.this.handlerProduced) {
                    // The decoder produced something. Unless an upstream request was already issued while
                    // decoding, wait for whenConsumed() before asking the upstream for the next element.
                    if (!DecodedStreamMessage.this.askedUpstreamForElement) {
                        DecodedStreamMessage.this.whenConsumed().handle((unused1, unused2) -> {
                            if (DecodedStreamMessage.this.demand() > 0L) {
                                DecodedStreamMessage.this.askUpstreamForElement();
                            }
                            return null;
                        });
                    }
                } else if (DecodedStreamMessage.this.demand() > 0L) {
                    // The decoder produced nothing, so it needs more upstream elements to satisfy the demand.
                    DecodedStreamMessage.this.askUpstreamForElement();
                }
            }
            catch (Throwable ex) {
                DecodedStreamMessage.this.decoder.processOnError(ex);
                DecodedStreamMessage.this.cancelAndCleanup();
                DecodedStreamMessage.this.abort(ex);
                Exceptions.throwIfFatal(ex);
            }
        }

        /**
         * Aborts the enclosing stream with the upstream failure and releases the input. The decoder is
         * notified unless the cause is an {@link AbortedStreamException}. Ignored after cancellation.
         */
        @Override
        public void onError(Throwable cause) {
            Objects.requireNonNull(cause, "cause");
            if (DecodedStreamMessage.this.cancelled) {
                return;
            }
            if (!(cause instanceof AbortedStreamException)) {
                DecodedStreamMessage.this.decoder.processOnError(cause);
            }
            DecodedStreamMessage.this.abort(cause);
            DecodedStreamMessage.this.cleanup();
        }

        /**
         * Lets the decoder process what is left in the input, then closes the enclosing stream; if that
         * fails, the stream is aborted instead. The input is released in either case. Ignored after
         * cancellation.
         */
        @Override
        public void onComplete() {
            if (DecodedStreamMessage.this.cancelled) {
                return;
            }
            try {
                DecodedStreamMessage.this.decoder.processOnComplete(DecodedStreamMessage.this.input, DecodedStreamMessage.this);
                DecodedStreamMessage.this.close();
            }
            catch (Exception e) {
                DecodedStreamMessage.this.abort(e);
            }
            finally {
                DecodedStreamMessage.this.cleanup();
            }
        }
    }
}

