/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.utils;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.TimeSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.utils.AbstractTimeoutHttpFilter;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class JavaNetSoTimeoutHttpConnectionFilter
implements StreamingHttpConnectionFilterFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(JavaNetSoTimeoutHttpConnectionFilter.class);
    private final BiFunction<HttpRequestMetaData, TimeSource, Duration> timeoutForRequest;
    @Nullable
    private final io.servicetalk.concurrent.api.Executor timeoutExecutor;

    public JavaNetSoTimeoutHttpConnectionFilter(Duration duration) {
        this(new AbstractTimeoutHttpFilter.FixedDuration(duration));
    }

    public JavaNetSoTimeoutHttpConnectionFilter(Duration duration, io.servicetalk.concurrent.api.Executor timeoutExecutor) {
        this(new AbstractTimeoutHttpFilter.FixedDuration(duration), timeoutExecutor);
    }

    public JavaNetSoTimeoutHttpConnectionFilter(BiFunction<HttpRequestMetaData, TimeSource, Duration> timeoutForRequest) {
        this.timeoutForRequest = Objects.requireNonNull(timeoutForRequest);
        this.timeoutExecutor = null;
    }

    public JavaNetSoTimeoutHttpConnectionFilter(BiFunction<HttpRequestMetaData, TimeSource, Duration> timeoutForRequest, io.servicetalk.concurrent.api.Executor timeoutExecutor) {
        this.timeoutForRequest = Objects.requireNonNull(timeoutForRequest);
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor);
    }

    public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) {
        return new StreamingHttpConnectionFilter(connection){

            public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
                return Single.defer(() -> {
                    io.servicetalk.concurrent.api.Executor timeoutExecutor = JavaNetSoTimeoutHttpConnectionFilter.this.contextExecutor((HttpRequestMetaData)request, (ExecutionContext<HttpExecutionStrategy>)((ExecutionContext)this.executionContext()));
                    Duration timeout = (Duration)JavaNetSoTimeoutHttpConnectionFilter.this.timeoutForRequest.apply(request, timeoutExecutor);
                    if (timeout == null || timeout.isZero()) {
                        return this.delegate().request(request).shareContextOnSubscribe();
                    }
                    if (timeout.isNegative()) {
                        return Single.failed((Throwable)new IllegalArgumentException("timeout: " + timeout + " (expected > 0)")).shareContextOnSubscribe();
                    }
                    CompletableSource.Processor requestProcessor = Processors.newCompletableProcessor();
                    boolean expectContinue = request.headers().contains(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE);
                    Cancellable continueTimeout = expectContinue ? timeoutExecutor.schedule(() -> requestProcessor.onError((Throwable)JavaNetSoTimeoutHttpConnectionFilter.newStacklessSocketTimeoutException("Read timed out after " + timeout.toMillis() + "ms waiting for 100 (Continue) response")), timeout) : null;
                    return this.delegate().request(request.transformMessageBody(p -> {
                        Publisher body = p.beforeFinally(() -> ((CompletableSource.Processor)requestProcessor).onComplete());
                        if (continueTimeout != null) {
                            return body.beforeOnSubscribe(__ -> continueTimeout.cancel());
                        }
                        return body;
                    })).liftSync(subscriber -> new RequestTimeoutSubscriber((SingleSource.Subscriber<? super StreamingHttpResponse>)subscriber, SourceAdapters.fromSource((CompletableSource)requestProcessor), timeout, timeoutExecutor)).shareContextOnSubscribe();
                });
            }
        };
    }

    private io.servicetalk.concurrent.api.Executor contextExecutor(HttpRequestMetaData requestMetaData, ExecutionContext<HttpExecutionStrategy> context) {
        if (this.timeoutExecutor != null) {
            return this.timeoutExecutor;
        }
        HttpExecutionStrategy strategy = (HttpExecutionStrategy)requestMetaData.context().getOrDefault(HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY, (Object)context.executionStrategy());
        assert (strategy != null);
        return strategy.isMetadataReceiveOffloaded() || strategy.isDataReceiveOffloaded() ? context.executor() : context.ioExecutor();
    }

    public HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    private static StacklessSocketTimeoutException newStacklessSocketTimeoutException(String message) {
        return StacklessSocketTimeoutException.newInstance(message, JavaNetSoTimeoutHttpConnectionFilter.class, "request");
    }

    private static final class StacklessSocketTimeoutException
    extends SocketTimeoutException {
        private static final long serialVersionUID = -6407427631101487627L;

        private StacklessSocketTimeoutException(String message) {
            super(message);
        }

        @Override
        public Throwable fillInStackTrace() {
            return this;
        }

        static StacklessSocketTimeoutException newInstance(String message, Class<?> clazz, String method) {
            return (StacklessSocketTimeoutException)io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace((Throwable)new StacklessSocketTimeoutException(message), clazz, (String)method);
        }
    }

    static final class RequestTimeoutSubscriber
    implements SingleSource.Subscriber<StreamingHttpResponse> {
        private static final AtomicIntegerFieldUpdater<RequestTimeoutSubscriber> onceUpdater = AtomicIntegerFieldUpdater.newUpdater(RequestTimeoutSubscriber.class, "once");
        private final DelayedCancellable requestCancellable = new DelayedCancellable();
        private final DelayedCancellable timeoutCancellable = new DelayedCancellable();
        private final SingleSource.Subscriber<? super StreamingHttpResponse> delegate;
        private final Completable requestComplete;
        private final Duration timeout;
        private final io.servicetalk.concurrent.api.Executor timeoutExecutor;
        private volatile int once;

        RequestTimeoutSubscriber(SingleSource.Subscriber<? super StreamingHttpResponse> delegate, Completable requestComplete, Duration timeout, io.servicetalk.concurrent.api.Executor timeoutExecutor) {
            this.delegate = delegate;
            this.requestComplete = requestComplete;
            this.timeout = timeout;
            this.timeoutExecutor = timeoutExecutor;
        }

        public void onSubscribe(Cancellable cancellable) {
            this.delegate.onSubscribe(() -> {
                this.once();
                try {
                    this.timeoutCancellable.cancel();
                }
                finally {
                    this.requestCancellable.cancel();
                }
            });
            this.requestCancellable.delayedCancellable(cancellable);
            this.timeoutCancellable.delayedCancellable(this.requestComplete.concat(Completable.never().timeout(this.timeout, (Executor)this.timeoutExecutor)).beforeOnError(this::handleInterruptions).subscribe());
        }

        private void handleInterruptions(Throwable t) {
            if (this.once()) {
                try {
                    this.requestCancellable.cancel();
                }
                catch (Throwable tt) {
                    ThrowableUtils.addSuppressed((Throwable)t, (Throwable)tt);
                }
                finally {
                    Throwable result = t;
                    if (t instanceof TimeoutException) {
                        result = JavaNetSoTimeoutHttpConnectionFilter.newStacklessSocketTimeoutException("Read timed out after " + this.timeout.toMillis() + "ms waiting for response meta-data").initCause(t);
                    }
                    this.delegate.onError(result);
                }
            }
        }

        public void onSuccess(@Nullable StreamingHttpResponse result) {
            block11: {
                try {
                    if (!this.once()) break block11;
                    try {
                        this.timeoutCancellable.cancel();
                    }
                    catch (Throwable t) {
                        this.delegate.onError(t);
                        if (result != null) {
                            SourceAdapters.toSource((Publisher)result.messageBody()).subscribe((PublisherSource.Subscriber)CancelImmediatelySubscriber.INSTANCE);
                        }
                        return;
                    }
                    if (result != null) {
                        result = result.transformMessageBody(p -> p.timeout(this.timeout, (Executor)this.timeoutExecutor).onErrorMap(TimeoutException.class, t -> JavaNetSoTimeoutHttpConnectionFilter.newStacklessSocketTimeoutException("Read timed out after " + this.timeout.toMillis() + "ms waiting for the next response payload body chunk").initCause((Throwable)t)));
                    }
                    try {
                        this.delegate.onSuccess((Object)result);
                        result = null;
                    }
                    catch (Throwable t) {
                        LOGGER.warn("Exception thrown by onSuccess of Subscriber {}. Draining response.", this.delegate, (Object)t);
                    }
                }
                finally {
                    if (result != null) {
                        SourceAdapters.toSource((Publisher)result.messageBody()).subscribe((PublisherSource.Subscriber)CancelImmediatelySubscriber.INSTANCE);
                    }
                }
            }
        }

        public void onError(Throwable t) {
            if (this.once()) {
                try {
                    this.timeoutCancellable.cancel();
                }
                catch (Throwable tt) {
                    ThrowableUtils.addSuppressed((Throwable)t, (Throwable)tt);
                }
                finally {
                    this.delegate.onError(t);
                }
            }
        }

        private boolean once() {
            return onceUpdater.compareAndSet(this, 0, 1);
        }
    }
}

