package reactor.netty.resources;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.TransportConfig;
import reactor.netty.transport.TransportConnector;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider.class */
public final class DefaultPooledConnectionProvider extends PooledConnectionProvider<PooledConnection> {
    final Map<SocketAddress, PooledConnectionProvider.PoolFactory<PooledConnection>> poolFactoryPerRemoteHost;
    final Map<SocketAddress, Integer> maxConnections;
    static final Logger log = Loggers.getLogger(DefaultPooledConnectionProvider.class);
    static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf("connectionOwner");

    /* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider$DisposableAcquire.class */
    static final class DisposableAcquire implements ConnectionObserver, Runnable, CoreSubscriber<PooledRef<PooledConnection>>, Disposable {
        final Disposable.Composite cancellations;
        final ConnectionObserver obs;
        final ChannelOperations.OnSetup opsFactory;
        final long pendingAcquireTimeout;
        final InstrumentedPool<PooledConnection> pool;
        final boolean retried;
        final MonoSink<Connection> sink;
        PooledRef<PooledConnection> pooledRef;
        Subscription subscription;

        DisposableAcquire(ConnectionObserver connectionObserver, ChannelOperations.OnSetup onSetup, long j, InstrumentedPool<PooledConnection> instrumentedPool, MonoSink<Connection> monoSink) {
            this.cancellations = Disposables.composite();
            this.obs = connectionObserver;
            this.opsFactory = onSetup;
            this.pendingAcquireTimeout = j;
            this.pool = instrumentedPool;
            this.retried = false;
            this.sink = monoSink;
        }

        DisposableAcquire(DisposableAcquire disposableAcquire) {
            this.cancellations = disposableAcquire.cancellations;
            this.obs = disposableAcquire.obs;
            this.opsFactory = disposableAcquire.opsFactory;
            this.pendingAcquireTimeout = disposableAcquire.pendingAcquireTimeout;
            this.pool = disposableAcquire.pool;
            this.retried = true;
            this.sink = disposableAcquire.sink;
        }

        @Override // reactor.netty.ConnectionObserver
        public Context currentContext() {
            return this.sink.currentContext();
        }

        public void dispose() {
            this.subscription.cancel();
        }

        public void onComplete() {
        }

        public void onError(Throwable th) {
            this.sink.error(th);
        }

        public void onNext(PooledRef<PooledConnection> pooledRef) {
            this.pooledRef = pooledRef;
            PooledConnection poolable = pooledRef.poolable();
            poolable.pooledRef = this.pooledRef;
            Channel channel = poolable.channel;
            if (channel.eventLoop().inEventLoop()) {
                run();
            } else {
                channel.eventLoop().execute(this);
            }
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            if (state == ConnectionObserver.State.CONFIGURED) {
                this.sink.success(connection);
            }
            this.obs.onStateChange(connection, state);
        }

        public void onSubscribe(Subscription subscription) {
            if (Operators.validate(this.subscription, subscription)) {
                this.subscription = subscription;
                this.cancellations.add(this);
                if (!this.retried) {
                    this.sink.onCancel(this.cancellations);
                }
                subscription.request(Long.MAX_VALUE);
            }
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th) {
            this.sink.error(th);
            this.obs.onUncaughtException(connection, th);
        }

        @Override // java.lang.Runnable
        public void run() {
            PooledConnection poolable = this.pooledRef.poolable();
            Channel channel = poolable.channel;
            if (!channel.isActive()) {
                this.pooledRef.invalidate().subscribe((Consumer) null, (Consumer) null, () -> {
                    if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                        DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel closed, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize())});
                    }
                });
                if (this.retried) {
                    this.sink.error(new IOException("Error while acquiring from " + this.pool));
                    return;
                }
                if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                    DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Immediately aborted pooled channel, re-acquiring new channel"));
                }
                this.pool.acquire(Duration.ofMillis(this.pendingAcquireTimeout)).subscribe(new DisposableAcquire(this));
                return;
            }
            ConnectionObserver connectionObserver = (ConnectionObserver) channel.attr(DefaultPooledConnectionProvider.OWNER).getAndSet(this);
            if (connectionObserver instanceof PendingConnectionObserver) {
                PendingConnectionObserver pendingConnectionObserver = (PendingConnectionObserver) connectionObserver;
                connectionObserver = null;
                registerClose(this.pooledRef, this.pool);
                while (true) {
                    PendingConnectionObserver.Pending poll = pendingConnectionObserver.pendingQueue.poll();
                    if (poll == null) {
                        break;
                    }
                    if (poll.error != null) {
                        onUncaughtException(poll.connection, poll.error);
                    } else if (poll.state != null) {
                        onStateChange(poll.connection, poll.state);
                    }
                }
            } else if (connectionObserver == null) {
                registerClose(this.pooledRef, this.pool);
            }
            if (connectionObserver == null) {
                if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                    DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel connected, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize())});
                }
                if (this.opsFactory == ChannelOperations.OnSetup.empty()) {
                    this.sink.success(Connection.from(channel));
                    return;
                }
                return;
            }
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel acquired, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize())});
            }
            this.obs.onStateChange(poolable, ConnectionObserver.State.ACQUIRED);
            ChannelOperations<?, ?> create = this.opsFactory.create(poolable, poolable, null);
            if (create == null) {
                this.sink.success(poolable);
                return;
            }
            if (channel.pipeline().get(NettyPipeline.H2MultiplexHandler) != null) {
                this.sink.success(create);
                this.obs.onStateChange(poolable, ConnectionObserver.State.CONFIGURED);
            } else {
                create.bind();
                this.sink.success(create);
                this.obs.onStateChange(create, ConnectionObserver.State.CONFIGURED);
            }
        }

        void registerClose(PooledRef<PooledConnection> pooledRef, InstrumentedPool<PooledConnection> instrumentedPool) {
            Channel channel = pooledRef.poolable().channel;
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Registering pool release on close event for channel"));
            }
            channel.closeFuture().addListener2(future -> {
                ConnectionObserver connectionObserver = (ConnectionObserver) channel.attr(DefaultPooledConnectionProvider.OWNER).get();
                if (connectionObserver instanceof DisposableAcquire) {
                    ((DisposableAcquire) connectionObserver).pooledRef.invalidate().subscribe((Consumer) null, (Consumer) null, () -> {
                        if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                            DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel closed, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(instrumentedPool.metrics().acquiredSize()), Integer.valueOf(instrumentedPool.metrics().idleSize())});
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider$PendingConnectionObserver.class */
    public static final class PendingConnectionObserver implements ConnectionObserver {
        final Queue<Pending> pendingQueue;
        final Context context;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider$PendingConnectionObserver$Pending.class */
        public static class Pending {
            final Connection connection;
            final Throwable error;
            final ConnectionObserver.State state;

            Pending(Connection connection, @Nullable Throwable th, @Nullable ConnectionObserver.State state) {
                this.connection = connection;
                this.error = th;
                this.state = state;
            }
        }

        public PendingConnectionObserver() {
            this(Context.empty());
        }

        public PendingConnectionObserver(Context context) {
            this.pendingQueue = (Queue) Queues.unbounded(4).get();
            this.context = context;
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            this.pendingQueue.add(new Pending(connection, null, state));
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th) {
            this.pendingQueue.add(new Pending(connection, th, null));
        }

        @Override // reactor.netty.ConnectionObserver
        public Context currentContext() {
            return this.context;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider$PooledConnection.class */
    public static final class PooledConnection implements Connection, ConnectionObserver {
        final Channel channel;
        final Sinks.Empty<Void> onTerminate = Sinks.unsafe().empty();
        final InstrumentedPool<PooledConnection> pool;
        PooledRef<PooledConnection> pooledRef;

        PooledConnection(Channel channel, InstrumentedPool<PooledConnection> instrumentedPool) {
            this.channel = channel;
            this.pool = instrumentedPool;
        }

        @Override // reactor.netty.DisposableChannel
        public Channel channel() {
            return this.channel;
        }

        @Override // reactor.netty.ConnectionObserver
        public Context currentContext() {
            return owner().currentContext();
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(connection.channel(), "onStateChange({}, {})"), new Object[]{connection, state});
            }
            if (state != ConnectionObserver.State.DISCONNECTING) {
                owner().onStateChange(connection, state);
                return;
            }
            if (!isPersistent() && this.channel.isActive()) {
                this.channel.close();
                owner().onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
            } else {
                if (!this.channel.isActive()) {
                    owner().onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
                    return;
                }
                if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                    DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(connection.channel(), "Releasing channel"));
                }
                ConnectionObserver connectionObserver = (ConnectionObserver) this.channel.attr(DefaultPooledConnectionProvider.OWNER).getAndSet(ConnectionObserver.emptyListener());
                if (this.pooledRef == null) {
                    return;
                }
                this.pooledRef.release().subscribe((Consumer) null, th -> {
                    if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                        DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(this.pooledRef.poolable().channel, "Failed cleaning the channel from pool, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize()), th});
                    }
                    this.onTerminate.tryEmitEmpty();
                    connectionObserver.onStateChange(connection, ConnectionObserver.State.RELEASED);
                }, () -> {
                    if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                        DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(this.pooledRef.poolable().channel, "Channel cleaned, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize())});
                    }
                    this.onTerminate.tryEmitEmpty();
                    connectionObserver.onStateChange(connection, ConnectionObserver.State.RELEASED);
                });
            }
        }

        @Override // reactor.netty.Connection
        public Mono<Void> onTerminate() {
            return this.onTerminate.asMono().or(onDispose());
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th) {
            owner().onUncaughtException(connection, th);
        }

        public String toString() {
            return "PooledConnection{channel=" + this.channel + '}';
        }

        ConnectionObserver owner() {
            PendingConnectionObserver pendingConnectionObserver;
            do {
                ConnectionObserver connectionObserver = (ConnectionObserver) this.channel.attr(DefaultPooledConnectionProvider.OWNER).get();
                if (connectionObserver != null) {
                    return connectionObserver;
                }
                pendingConnectionObserver = new PendingConnectionObserver();
            } while (!this.channel.attr(DefaultPooledConnectionProvider.OWNER).compareAndSet(null, pendingConnectionObserver));
            return pendingConnectionObserver;
        }
    }

    /* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider$PooledConnectionAllocator.class */
    static final class PooledConnectionAllocator {
        final TransportConfig config;
        final InstrumentedPool<PooledConnection> pool;
        final SocketAddress remoteAddress;
        final AddressResolverGroup<?> resolver;
        static final BiPredicate<PooledConnection, PooledRefMetadata> DEFAULT_EVICTION_PREDICATE = (pooledConnection, pooledRefMetadata) -> {
            return (pooledConnection.channel.isActive() && pooledConnection.isPersistent()) ? false : true;
        };
        static final Function<PooledConnection, Publisher<Void>> DEFAULT_DESTROY_HANDLER = pooledConnection -> {
            return !pooledConnection.channel.isActive() ? Mono.empty() : FutureMono.from(pooledConnection.channel.close());
        };

        /* loaded from: input_file:reactor/netty/resources/DefaultPooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.class */
        final class PooledConnectionInitializer extends ChannelInitializer<Channel> implements CoreSubscriber<Channel> {
            final MonoSink<PooledConnection> sink;
            PooledConnection pooledConnection;

            PooledConnectionInitializer(MonoSink<PooledConnection> monoSink) {
                this.sink = monoSink;
            }

            @Override // io.netty.channel.ChannelInitializer
            protected void initChannel(Channel channel) {
                if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                    DefaultPooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Created a new pooled channel, now {} active connections and {} inactive connections"), new Object[]{Integer.valueOf(PooledConnectionAllocator.this.pool.metrics().acquiredSize()), Integer.valueOf(PooledConnectionAllocator.this.pool.metrics().idleSize())});
                }
                PooledConnection pooledConnection = new PooledConnection(channel, PooledConnectionAllocator.this.pool);
                this.pooledConnection = pooledConnection;
                pooledConnection.bind();
                channel.attr(DefaultPooledConnectionProvider.OWNER).compareAndSet(null, new PendingConnectionObserver(this.sink.currentContext()));
                channel.pipeline().remove(this);
                channel.pipeline().addFirst(PooledConnectionAllocator.this.config.channelInitializer(pooledConnection, PooledConnectionAllocator.this.remoteAddress, false));
            }

            public void onComplete() {
            }

            public void onError(Throwable th) {
                this.sink.error(th);
            }

            public void onNext(Channel channel) {
                this.sink.success(this.pooledConnection);
            }

            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }
        }

        PooledConnectionAllocator(TransportConfig transportConfig, PooledConnectionProvider.PoolFactory<PooledConnection> poolFactory, SocketAddress socketAddress, AddressResolverGroup<?> addressResolverGroup) {
            this.config = transportConfig;
            this.remoteAddress = socketAddress;
            this.resolver = addressResolverGroup;
            this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE);
        }

        Publisher<PooledConnection> connectChannel() {
            return Mono.create(monoSink -> {
                PooledConnectionInitializer pooledConnectionInitializer = new PooledConnectionInitializer(monoSink);
                TransportConnector.connect(this.config, this.remoteAddress, this.resolver, pooledConnectionInitializer).subscribe(pooledConnectionInitializer);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultPooledConnectionProvider(ConnectionProvider.Builder builder) {
        super(builder);
        this.poolFactoryPerRemoteHost = new HashMap();
        this.maxConnections = new HashMap();
        for (Map.Entry<SocketAddress, ConnectionProvider.ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
            this.poolFactoryPerRemoteHost.put(entry.getKey(), new PooledConnectionProvider.PoolFactory<>(entry.getValue()));
            this.maxConnections.put(entry.getKey(), Integer.valueOf(entry.getValue().maxConnections));
        }
    }

    @Override // reactor.netty.resources.ConnectionProvider
    public Map<SocketAddress, Integer> maxConnectionsPerHost() {
        return this.maxConnections;
    }

    @Override // reactor.netty.resources.PooledConnectionProvider
    protected CoreSubscriber<PooledRef<PooledConnection>> createDisposableAcquire(TransportConfig transportConfig, ConnectionObserver connectionObserver, long j, InstrumentedPool<PooledConnection> instrumentedPool, MonoSink<Connection> monoSink) {
        return new DisposableAcquire(connectionObserver, transportConfig.channelOperationsProvider(), j, instrumentedPool, monoSink);
    }

    @Override // reactor.netty.resources.PooledConnectionProvider
    protected InstrumentedPool<PooledConnection> createPool(TransportConfig transportConfig, PooledConnectionProvider.PoolFactory<PooledConnection> poolFactory, SocketAddress socketAddress, AddressResolverGroup<?> addressResolverGroup) {
        return new PooledConnectionAllocator(transportConfig, poolFactory, socketAddress, addressResolverGroup).pool;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.netty.resources.PooledConnectionProvider
    protected PooledConnectionProvider.PoolFactory<PooledConnection> poolFactory(SocketAddress socketAddress) {
        return (PooledConnectionProvider.PoolFactory) this.poolFactoryPerRemoteHost.getOrDefault(socketAddress, this.defaultPoolFactory);
    }
}
