package io.opentelemetry.testing.internal.armeria.internal.server;

import io.opentelemetry.testing.internal.armeria.common.HttpData;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaders;
import io.opentelemetry.testing.internal.armeria.common.HttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpResponseWriter;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeaders;
import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.internal.common.util.ObjectCollectingUtil;
import io.opentelemetry.testing.internal.armeria.server.ServiceRequestContext;
import io.opentelemetry.testing.internal.io.netty.handler.codec.http.HttpHeaders;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/opentelemetry/testing/internal/armeria/internal/server/ResponseConversionUtil.class */
public final class ResponseConversionUtil {

    /* loaded from: input_file:io/opentelemetry/testing/internal/armeria/internal/server/ResponseConversionUtil$StreamingSubscriber.class */
    private static final class StreamingSubscriber<T> implements Subscriber<T> {
        private final HttpResponseWriter writer;
        private final ResponseHeaders headers;
        private final HttpHeaders trailers;
        private final Function<T, HttpData> contentConverter;

        @Nullable
        private Subscription subscription;
        private boolean headersSent;
        static final /* synthetic */ boolean $assertionsDisabled;

        StreamingSubscriber(HttpResponseWriter httpResponseWriter, ResponseHeaders responseHeaders, HttpHeaders httpHeaders, Function<T, HttpData> function) {
            this.writer = (HttpResponseWriter) Objects.requireNonNull(httpResponseWriter, "writer");
            this.headers = (ResponseHeaders) Objects.requireNonNull(responseHeaders, "headers");
            this.trailers = (HttpHeaders) Objects.requireNonNull(httpHeaders, HttpHeaders.Values.TRAILERS);
            this.contentConverter = (Function) Objects.requireNonNull(function, "contentConverter");
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (!$assertionsDisabled && this.subscription != null) {
                throw new AssertionError();
            }
            this.subscription = subscription;
            this.writer.whenComplete().handle((r3, th) -> {
                if (th == null) {
                    return null;
                }
                subscription.cancel();
                return null;
            });
            subscription.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(T t) {
            if (this.writer.isOpen()) {
                try {
                    HttpData apply = this.contentConverter.apply(t);
                    if (!this.headersSent) {
                        this.writer.write((HttpResponseWriter) this.headers);
                        this.headersSent = true;
                    }
                    this.writer.write((HttpResponseWriter) apply);
                    this.writer.whenConsumed().thenRun(() -> {
                        if (!$assertionsDisabled && this.subscription == null) {
                            throw new AssertionError();
                        }
                        this.subscription.request(1L);
                    });
                } catch (Exception e) {
                    try {
                        this.writer.close(e);
                        if (!$assertionsDisabled && this.subscription == null) {
                            throw new AssertionError();
                        }
                        this.subscription.cancel();
                    } catch (Throwable th) {
                        if (!$assertionsDisabled && this.subscription == null) {
                            throw new AssertionError();
                        }
                        this.subscription.cancel();
                        throw th;
                    }
                }
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            if (this.writer.isOpen()) {
                this.writer.close(th);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            if (this.writer.isOpen()) {
                if (this.trailers.isEmpty() || this.writer.tryWrite((HttpResponseWriter) this.trailers)) {
                    this.writer.close();
                }
            }
        }

        static {
            $assertionsDisabled = !ResponseConversionUtil.class.desiredAssertionStatus();
        }
    }

    public static HttpResponseWriter aggregateFrom(Stream<?> stream, ResponseHeaders responseHeaders, io.opentelemetry.testing.internal.armeria.common.HttpHeaders httpHeaders, Function<Object, HttpData> function, Executor executor) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(responseHeaders, "headers");
        Objects.requireNonNull(httpHeaders, HttpHeaders.Values.TRAILERS);
        Objects.requireNonNull(function, "contentConverter");
        Objects.requireNonNull(executor, "executor");
        return aggregateFrom(ObjectCollectingUtil.collectFrom(stream, executor), responseHeaders, httpHeaders, function);
    }

    public static HttpResponseWriter aggregateFrom(Publisher<?> publisher, ResponseHeaders responseHeaders, io.opentelemetry.testing.internal.armeria.common.HttpHeaders httpHeaders, Function<Object, HttpData> function, ServiceRequestContext serviceRequestContext) {
        Objects.requireNonNull(publisher, "publisher");
        Objects.requireNonNull(responseHeaders, "headers");
        Objects.requireNonNull(httpHeaders, HttpHeaders.Values.TRAILERS);
        Objects.requireNonNull(function, "contentConverter");
        return aggregateFrom(ObjectCollectingUtil.collectFrom(publisher, serviceRequestContext), responseHeaders, httpHeaders, function);
    }

    private static HttpResponseWriter aggregateFrom(CompletableFuture<?> completableFuture, ResponseHeaders responseHeaders, io.opentelemetry.testing.internal.armeria.common.HttpHeaders httpHeaders, Function<Object, HttpData> function) {
        HttpResponseWriter streaming = HttpResponse.streaming();
        completableFuture.handle((obj, th) -> {
            if (th != null) {
                streaming.close(th);
                return null;
            }
            try {
                HttpData httpData = (HttpData) function.apply(obj);
                streaming.write((HttpResponseWriter) responseHeaders);
                streaming.write((HttpResponseWriter) httpData);
                if (!httpHeaders.isEmpty()) {
                    streaming.write((HttpResponseWriter) httpHeaders);
                }
                streaming.close();
                return null;
            } catch (Exception e) {
                streaming.close(e);
                return null;
            }
        });
        return streaming;
    }

    public static <T> HttpResponseWriter streamingFrom(Stream<T> stream, ResponseHeaders responseHeaders, io.opentelemetry.testing.internal.armeria.common.HttpHeaders httpHeaders, Function<T, HttpData> function, Executor executor) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(responseHeaders, "headers");
        Objects.requireNonNull(httpHeaders, HttpHeaders.Values.TRAILERS);
        Objects.requireNonNull(function, "contentConverter");
        Objects.requireNonNull(executor, "executor");
        HttpResponseWriter streaming = HttpResponse.streaming();
        executor.execute(() -> {
            try {
                Iterator it = ((Stream) stream.sequential()).iterator();
                boolean z = false;
                while (it.hasNext()) {
                    HttpData httpData = (HttpData) function.apply(it.next());
                    if (!z) {
                        streaming.write((HttpResponseWriter) responseHeaders);
                        z = true;
                    }
                    streaming.write((HttpResponseWriter) httpData);
                }
                if (!httpHeaders.isEmpty()) {
                    streaming.write((HttpResponseWriter) httpHeaders);
                }
                streaming.close();
            } catch (Exception e) {
                streaming.close(e);
            }
        });
        return streaming;
    }

    public static <T> HttpResponseWriter streamingFrom(Publisher<T> publisher, ResponseHeaders responseHeaders, io.opentelemetry.testing.internal.armeria.common.HttpHeaders httpHeaders, Function<T, HttpData> function) {
        HttpResponseWriter streaming = HttpResponse.streaming();
        publisher.subscribe(new StreamingSubscriber(streaming, responseHeaders, httpHeaders, function));
        return streaming;
    }

    private ResponseConversionUtil() {
    }
}
