package io.pravega.client.netty.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.base.Strings;
import io.pravega.shaded.io.netty.bootstrap.Bootstrap;
import io.pravega.shaded.io.netty.channel.Channel;
import io.pravega.shaded.io.netty.channel.ChannelInitializer;
import io.pravega.shaded.io.netty.channel.ChannelOption;
import io.pravega.shaded.io.netty.channel.ChannelPipeline;
import io.pravega.shaded.io.netty.channel.EventLoopGroup;
import io.pravega.shaded.io.netty.channel.epoll.Epoll;
import io.pravega.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.pravega.shaded.io.netty.channel.epoll.EpollSocketChannel;
import io.pravega.shaded.io.netty.channel.group.ChannelGroup;
import io.pravega.shaded.io.netty.channel.group.DefaultChannelGroup;
import io.pravega.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.pravega.shaded.io.netty.channel.socket.SocketChannel;
import io.pravega.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import io.pravega.shaded.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.pravega.shaded.io.netty.handler.ssl.SslContext;
import io.pravega.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.pravega.shaded.io.netty.handler.ssl.SslHandler;
import io.pravega.shaded.io.netty.util.concurrent.Future;
import io.pravega.shaded.io.netty.util.concurrent.GenericFutureListener;
import io.pravega.shaded.io.netty.util.concurrent.GlobalEventExecutor;
import io.pravega.shared.metrics.ClientMetricUpdater;
import io.pravega.shared.metrics.MetricListener;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.CommandDecoder;
import io.pravega.shared.protocol.netty.CommandEncoder;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.ExceptionLoggingHandler;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/netty/impl/ConnectionPoolImpl.class */
public class ConnectionPoolImpl implements ConnectionPool {
    private final ClientConfig clientConfig;
    private final MetricNotifier metricNotifier;

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ConnectionPoolImpl.class);
    private static final Comparator<Connection> COMPARATOR = new Comparator<Connection>() { // from class: io.pravega.client.netty.impl.ConnectionPoolImpl.1
        @Override // java.util.Comparator
        public int compare(Connection connection, Connection connection2) {
            return Integer.compare(Futures.isSuccessful(connection.getConnected()) ? connection.getFlowCount() : Integer.MAX_VALUE, Futures.isSuccessful(connection2.getConnected()) ? connection2.getFlowCount() : Integer.MAX_VALUE);
        }
    };

    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock = new Object[0];
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @VisibleForTesting
    private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @GuardedBy("$lock")
    private final Map<PravegaNodeUri, List<Connection>> connectionMap = new HashMap();
    private final EventLoopGroup group = getEventLoopGroup();

    public ConnectionPoolImpl(ClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        MetricListener metricListener = clientConfig.getMetricListener();
        this.metricNotifier = metricListener == null ? MetricNotifier.NO_OP_METRIC_NOTIFIER : new ClientMetricUpdater(metricListener);
    }

    @Override // io.pravega.client.netty.impl.ConnectionPool
    public CompletableFuture<ClientConnection> getClientConnection(Flow flow, PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor) {
        Connection connection;
        CompletableFuture thenApply;
        synchronized (this.$lock) {
            Preconditions.checkNotNull(flow, "Flow");
            Preconditions.checkNotNull(pravegaNodeUri, "Location");
            Preconditions.checkNotNull(replyProcessor, "ReplyProcessor");
            Exceptions.checkNotClosed(this.closed.get(), this);
            List<Connection> list = (List) this.connectionMap.getOrDefault(pravegaNodeUri, new ArrayList()).stream().filter(connection2 -> {
                return !connection2.getConnected().isDone() || (Futures.isSuccessful(connection2.getConnected()) && connection2.getFlowHandler().isConnectionEstablished());
            }).collect(Collectors.toList());
            log.debug("List of connections to {} that can be used: {}", pravegaNodeUri, list);
            Optional<Connection> min = list.stream().min(COMPARATOR);
            if (!min.isPresent() || (list.size() < this.clientConfig.getMaxConnectionsPerSegmentStore() && !isUnused(min.get()))) {
                log.info("Creating a new connection to {}", pravegaNodeUri);
                FlowHandler flowHandler = new FlowHandler(pravegaNodeUri.getEndpoint(), this.metricNotifier);
                connection = new Connection(pravegaNodeUri, flowHandler, establishConnection(pravegaNodeUri, flowHandler));
                list.add(connection);
            } else {
                log.info("Reusing connection: {}", min.get());
                connection = min.get();
            }
            ClientConnection createFlow = connection.getFlowHandler().createFlow(flow, replyProcessor);
            this.connectionMap.put(pravegaNodeUri, list);
            thenApply = connection.getConnected().thenApply(r3 -> {
                return createFlow;
            });
        }
        return thenApply;
    }

    @Override // io.pravega.client.netty.impl.ConnectionPool
    public CompletableFuture<ClientConnection> getClientConnection(PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor) {
        CompletableFuture thenApply;
        synchronized (this.$lock) {
            Preconditions.checkNotNull(pravegaNodeUri, "Location");
            Preconditions.checkNotNull(replyProcessor, "ReplyProcessor");
            Exceptions.checkNotClosed(this.closed.get(), this);
            FlowHandler flowHandler = new FlowHandler(pravegaNodeUri.getEndpoint(), this.metricNotifier);
            CompletableFuture<Void> establishConnection = establishConnection(pravegaNodeUri, flowHandler);
            ClientConnection createConnectionWithFlowDisabled = new Connection(pravegaNodeUri, flowHandler, establishConnection).getFlowHandler().createConnectionWithFlowDisabled(replyProcessor);
            thenApply = establishConnection.thenApply(r3 -> {
                return createConnectionWithFlowDisabled;
            });
        }
        return thenApply;
    }

    private boolean isUnused(Connection connection) {
        return Futures.isSuccessful(connection.getConnected()) && connection.getFlowCount() == 0;
    }

    @VisibleForTesting
    public void pruneUnusedConnections() {
        synchronized (this.$lock) {
            Iterator<List<Connection>> it = this.connectionMap.values().iterator();
            while (it.hasNext()) {
                Iterator<Connection> it2 = it.next().iterator();
                while (it2.hasNext()) {
                    Connection next = it2.next();
                    if (isUnused(next)) {
                        next.getFlowHandler().close();
                        it2.remove();
                    }
                }
            }
        }
    }

    @Override // io.pravega.client.netty.impl.ConnectionPool
    public int getActiveChannelCount() {
        return getActiveChannels().size();
    }

    @VisibleForTesting
    public List<Channel> getActiveChannels() {
        return (List) this.channelGroup.stream().filter((v0) -> {
            return v0.isActive();
        }).peek(channel -> {
            log.debug("Channel with id {} localAddress {} and remoteAddress {} is active.", new Object[]{channel.id(), channel.localAddress(), channel.remoteAddress()});
        }).collect(Collectors.toList());
    }

    private CompletableFuture<Void> establishConnection(PravegaNodeUri pravegaNodeUri, FlowHandler flowHandler) {
        Bootstrap handler = getNettyBootstrap().handler(getChannelInitializer(pravegaNodeUri, flowHandler));
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            handler.connect(pravegaNodeUri.getEndpoint(), pravegaNodeUri.getPort()).addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFuture -> {
                if (!channelFuture.isSuccess()) {
                    completableFuture.completeExceptionally(new ConnectionFailedException(channelFuture.cause()));
                    return;
                }
                Channel channel = channelFuture.channel();
                log.debug("Connect operation completed for channel:{}, local address:{}, remote address:{}", new Object[]{channel.id(), channel.localAddress(), channel.remoteAddress()});
                this.channelGroup.add(channel);
                completableFuture.complete(null);
            });
        } catch (Exception e) {
            completableFuture.completeExceptionally(new ConnectionFailedException(e));
        }
        return completableFuture.thenCompose(r4 -> {
            CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
            flowHandler.completeWhenRegistered(completableFuture2);
            return completableFuture2;
        });
    }

    private Bootstrap getNettyBootstrap() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.group).channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
        return bootstrap;
    }

    @VisibleForTesting
    ChannelInitializer<SocketChannel> getChannelInitializer(final PravegaNodeUri pravegaNodeUri, final FlowHandler flowHandler) {
        final SslContext sslContext = getSslContext();
        return new ChannelInitializer<SocketChannel>() { // from class: io.pravega.client.netty.impl.ConnectionPoolImpl.2
            @Override // io.pravega.shaded.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                if (sslContext != null) {
                    SslHandler newHandler = sslContext.newHandler(socketChannel.alloc(), pravegaNodeUri.getEndpoint(), pravegaNodeUri.getPort());
                    if (ConnectionPoolImpl.this.clientConfig.isValidateHostName()) {
                        SSLEngine engine = newHandler.engine();
                        SSLParameters sSLParameters = engine.getSSLParameters();
                        sSLParameters.setEndpointIdentificationAlgorithm("HTTPS");
                        engine.setSSLParameters(sSLParameters);
                    }
                    pipeline.addLast(newHandler);
                }
                FlowHandler flowHandler2 = flowHandler;
                flowHandler2.getClass();
                pipeline.addLast(new ExceptionLoggingHandler(pravegaNodeUri.getEndpoint()), new CommandEncoder((v1) -> {
                    return r6.getAppendBatchSizeTracker(v1);
                }, ConnectionPoolImpl.this.metricNotifier), new LengthFieldBasedFrameDecoder(WireCommands.MAX_WIRECOMMAND_SIZE, 4, 4), new CommandDecoder(), flowHandler);
            }
        };
    }

    @VisibleForTesting
    SslContext getSslContext() {
        SslContext sslContext;
        if (this.clientConfig.isEnableTlsToSegmentStore()) {
            log.debug("Setting up an SSL/TLS Context");
            try {
                SslContextBuilder forClient = SslContextBuilder.forClient();
                if (Strings.isNullOrEmpty(this.clientConfig.getTrustStore())) {
                    log.debug("Client truststore wasn't specified.");
                    forClient.trustManager((File) null);
                } else {
                    forClient.trustManager(new File(this.clientConfig.getTrustStore()));
                    log.debug("Client truststore: {}", this.clientConfig.getTrustStore());
                }
                sslContext = forClient.build();
            } catch (SSLException e) {
                throw new RuntimeException(e);
            }
        } else {
            sslContext = null;
        }
        return sslContext;
    }

    private EventLoopGroup getEventLoopGroup() {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup();
        }
        log.warn("Epoll not available. Falling back on NIO.");
        return new NioEventLoopGroup();
    }

    @Override // io.pravega.client.netty.impl.ConnectionPool, java.lang.AutoCloseable
    public void close() {
        log.info("Shutting down connection pool");
        if (this.closed.compareAndSet(false, true)) {
            this.group.shutdownGracefully();
            this.metricNotifier.close();
        }
    }

    @SuppressFBWarnings(justification = "generated code")
    ChannelGroup getChannelGroup() {
        return this.channelGroup;
    }
}
