package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.ConsumableEvent;
import io.servicetalk.client.api.ReservableRequestConcurrencyController;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpLoadBalancedConnection;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.transport.api.ConnectExecutionStrategy;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.api.TransportObservers;
import io.servicetalk.transport.netty.internal.NoopTransportObserver;
import java.util.Objects;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.class */
/**
 * Base {@link ConnectionFactory} that produces {@link FilterableStreamingHttpLoadBalancedConnection}s.
 * <p>
 * Raw connections are obtained from {@link #newFilterableConnection(Object, TransportObserver)}, passed through the
 * {@link ConnectionFactoryFilter} supplied at construction, optionally wrapped by a
 * {@link StreamingHttpConnectionFilterFactory}, and finally handed to the {@link ProtocolBinding} together with a
 * concurrency controller created by {@link #newConcurrencyController(Publisher, Completable)}.
 *
 * @param <ResolvedAddress> the type of address a connection is created for.
 */
abstract class AbstractLBHttpConnectionFactory<ResolvedAddress> implements ConnectionFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLBHttpConnectionFactory.class);

    /** Optional per-connection filter; when {@code null} connections are used unfiltered. */
    @Nullable
    private final StreamingHttpConnectionFilterFactory connectionFilterFunction;
    final ReadOnlyHttpClientConfig config;
    final HttpExecutionContext executionContext;
    final Function<HttpProtocolVersion, StreamingHttpRequestResponseFactory> reqRespFactoryFunc;
    /** The anonymous base factory below, after {@code connectionFactoryFilter} has been applied to it. */
    private final ConnectionFactory<ResolvedAddress, FilterableStreamingHttpConnection> filterableConnectionFactory;
    private final ProtocolBinding protocolBinding;

    /**
     * Turns a (possibly filtered) connection into a load-balanced connection.
     */
    @FunctionalInterface
    interface ProtocolBinding {
        /**
         * Binds a connection to a concurrency controller.
         *
         * @param filterableStreamingHttpConnection the connection to bind.
         * @param reservableRequestConcurrencyController the concurrency controller created for that connection.
         * @param contextMap the context passed to {@code newConnection}, may be {@code null}.
         * @return the resulting load-balanced connection.
         */
        FilterableStreamingHttpLoadBalancedConnection bind(FilterableStreamingHttpConnection filterableStreamingHttpConnection, ReservableRequestConcurrencyController reservableRequestConcurrencyController, @Nullable ContextMap contextMap);
    }

    /**
     * Creates a new instance.
     *
     * @param readOnlyHttpClientConfig the client configuration, must not be {@code null}.
     * @param httpExecutionContext the execution context, must not be {@code null}; its executor is used when the
     * connect step is offloaded.
     * @param function maps a protocol version to a request/response factory, must not be {@code null}.
     * @param executionStrategy the strategy consulted to decide whether connection results are offloaded, must not
     * be {@code null}.
     * @param connectionFactoryFilter filter applied once to the base connection factory.
     * @param streamingHttpConnectionFilterFactory optional filter applied to every created connection.
     * @param protocolBinding converts a connection into a load-balanced connection, must not be {@code null}.
     * @throws NullPointerException if any of the arguments documented as non-null is {@code null}.
     */
    public AbstractLBHttpConnectionFactory(ReadOnlyHttpClientConfig readOnlyHttpClientConfig, final HttpExecutionContext httpExecutionContext, Function<HttpProtocolVersion, StreamingHttpRequestResponseFactory> function, final ExecutionStrategy executionStrategy, ConnectionFactoryFilter<ResolvedAddress, FilterableStreamingHttpConnection> connectionFactoryFilter, @Nullable StreamingHttpConnectionFilterFactory streamingHttpConnectionFilterFactory, ProtocolBinding protocolBinding) {
        this.connectionFilterFunction = streamingHttpConnectionFilterFactory;
        this.config = Objects.requireNonNull(readOnlyHttpClientConfig);
        this.executionContext = Objects.requireNonNull(httpExecutionContext);
        this.reqRespFactoryFunc = Objects.requireNonNull(function);
        Objects.requireNonNull(executionStrategy);
        this.protocolBinding = Objects.requireNonNull(protocolBinding);
        this.filterableConnectionFactory = connectionFactoryFilter.create(new ConnectionFactory<ResolvedAddress, FilterableStreamingHttpConnection>() {
            // The base factory owns no resources of its own, so its lifecycle is backed by an empty closeable.
            private final ListenableAsyncCloseable close = AsyncCloseables.emptyAsyncCloseable();

            @Override
            public Single<FilterableStreamingHttpConnection> newConnection(ResolvedAddress resolvedaddress, @Nullable ContextMap contextMap, @Nullable TransportObserver transportObserver) {
                final TransportObserver observer = transportObserver == null
                        ? NoopTransportObserver.INSTANCE
                        : TransportObservers.asSafeObserver(transportObserver);
                final Single<FilterableStreamingHttpConnection> newFilterableConnection =
                        AbstractLBHttpConnectionFactory.this.newFilterableConnection(
                                Objects.requireNonNull(resolvedaddress, "Resolved address cannot be null"), observer);
                // isConnectOffloaded() is declared on ConnectExecutionStrategy, not on ExecutionStrategy,
                // so the strategy has to be narrowed after the instanceof check.
                final boolean connectOffloaded = executionStrategy instanceof ConnectExecutionStrategy
                        && ((ConnectExecutionStrategy) executionStrategy).isConnectOffloaded();
                return connectOffloaded
                        ? newFilterableConnection.publishOn(httpExecutionContext.executor(), IoThreadFactory.IoThread::currentThreadIsIoThread)
                        : newFilterableConnection;
            }

            @Override
            public Completable onClose() {
                return this.close.onClose();
            }

            @Override
            public Completable onClosing() {
                return this.close.onClosing();
            }

            @Override
            public Completable closeAsync() {
                return this.close.closeAsync();
            }

            @Override
            public Completable closeAsyncGracefully() {
                return this.close.closeAsyncGracefully();
            }
        });
    }

    /**
     * Creates a new load-balanced connection by delegating to the filtered connection factory, applying the optional
     * connection filter and binding the result through the {@link ProtocolBinding}.
     *
     * @param resolvedaddress the address to connect to.
     * @param contextMap optional context, forwarded to the delegate factory and to the protocol binding.
     * @param transportObserver optional observer, forwarded to the delegate factory.
     * @return a {@link Single} that provides the load-balanced connection.
     */
    @Override
    public final Single<FilterableStreamingHttpLoadBalancedConnection> newConnection(ResolvedAddress resolvedaddress, @Nullable ContextMap contextMap, @Nullable TransportObserver transportObserver) {
        return this.filterableConnectionFactory.newConnection(resolvedaddress, contextMap, transportObserver).map(filterableStreamingHttpConnection -> {
            final FilterableStreamingHttpConnection create = this.connectionFilterFunction != null
                    ? this.connectionFilterFunction.create(filterableStreamingHttpConnection)
                    : filterableStreamingHttpConnection;
            // Log every max-concurrency event before it reaches the concurrency controller.
            final Publisher<? extends ConsumableEvent<Integer>> maxConcurrencyEvents =
                    create.transportEventStream(AbstractStreamingHttpConnection.MAX_CONCURRENCY_NO_OFFLOADING)
                            .beforeOnNext(consumableEvent -> LOGGER.debug("{} Received {} event: {}",
                                    filterableStreamingHttpConnection,
                                    AbstractStreamingHttpConnection.MAX_CONCURRENCY_NO_OFFLOADING,
                                    consumableEvent));
            return this.protocolBinding.bind(create,
                    newConcurrencyController(maxConcurrencyEvents, create.onClosing()), contextMap);
        });
    }

    /**
     * Creates the underlying, not yet filtered, connection.
     *
     * @param resolvedaddress the address to connect to, never {@code null} when invoked by this class.
     * @param transportObserver the observer to use, never {@code null} when invoked by this class.
     * @return a {@link Single} that provides the new connection.
     */
    abstract Single<FilterableStreamingHttpConnection> newFilterableConnection(ResolvedAddress resolvedaddress, TransportObserver transportObserver);

    /**
     * Creates the concurrency controller for a new connection.
     *
     * @param publisher the connection's stream of max-concurrency events.
     * @param completable the connection's {@code onClosing()} signal.
     * @return the concurrency controller handed to the {@link ProtocolBinding}.
     */
    abstract ReservableRequestConcurrencyController newConcurrencyController(Publisher<? extends ConsumableEvent<Integer>> publisher, Completable completable);

    @Override
    public final Completable onClose() {
        return this.filterableConnectionFactory.onClose();
    }

    @Override
    public final Completable onClosing() {
        return this.filterableConnectionFactory.onClosing();
    }

    @Override
    public final Completable closeAsync() {
        return this.filterableConnectionFactory.closeAsync();
    }

    @Override
    public final Completable closeAsyncGracefully() {
        return this.filterableConnectionFactory.closeAsyncGracefully();
    }
}
