package io.hyperfoil.http.connection;

import io.hyperfoil.http.api.ConnectionConsumer;
import io.hyperfoil.http.api.HttpClientPool;
import io.hyperfoil.http.api.HttpConnection;
import io.hyperfoil.http.api.HttpConnectionPool;
import io.hyperfoil.http.config.ConnectionPoolConfig;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.ScheduledFuture;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.FormattedMessage;

/* loaded from: input_file:io/hyperfoil/http/connection/SharedConnectionPool.class */
class SharedConnectionPool extends ConnectionPoolStats implements HttpConnectionPool {
    private static final Logger log;
    private static final boolean trace;
    private static final int MAX_FAILURES = 100;
    private final HttpClientPoolImpl clientPool;
    private final ArrayList<HttpConnection> connections;
    private final ArrayDeque<HttpConnection> available;
    private final List<HttpConnection> temporaryInFlight;
    private final ConnectionReceiver handleNewConnection;
    private final Runnable checkCreateConnections;
    private final Runnable onConnectFailure;
    private final ConnectionPoolConfig sizeConfig;
    private final EventLoop eventLoop;
    private int connecting;
    private int created;
    private int closed;
    private int availableClosed;
    private int failures;
    private Handler<AsyncResult<Void>> startedHandler;
    private boolean shutdown;
    private final Deque<ConnectionConsumer> waiting;
    private ScheduledFuture<?> pulseFuture;
    private ScheduledFuture<?> keepAliveFuture;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SharedConnectionPool(HttpClientPoolImpl httpClientPoolImpl, EventLoop eventLoop, ConnectionPoolConfig connectionPoolConfig) {
        super(httpClientPoolImpl.authority);
        this.connections = new ArrayList<>();
        this.handleNewConnection = this::handleNewConnection;
        this.checkCreateConnections = this::checkCreateConnections;
        this.onConnectFailure = this::onConnectFailure;
        this.waiting = new ArrayDeque();
        this.clientPool = httpClientPoolImpl;
        this.sizeConfig = connectionPoolConfig;
        this.eventLoop = eventLoop;
        this.available = new ArrayDeque<>(connectionPoolConfig.max());
        this.temporaryInFlight = new ArrayList(connectionPoolConfig.max());
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public HttpClientPool clientPool() {
        return this.clientPool;
    }

    private HttpConnection acquireNow(boolean z) {
        HttpConnection pollFirst;
        if (!$assertionsDisabled && !this.eventLoop.inEventLoop()) {
            throw new AssertionError();
        }
        while (true) {
            try {
                pollFirst = this.available.pollFirst();
                if (pollFirst == null) {
                    log.debug("No connection to {} available, currently used {}", this.authority, Integer.valueOf(this.usedConnections.current()));
                    if (!this.temporaryInFlight.isEmpty()) {
                        this.available.addAll(this.temporaryInFlight);
                        this.temporaryInFlight.clear();
                    }
                    return null;
                }
                if (pollFirst.isClosed()) {
                    this.availableClosed--;
                    log.trace("Connection {} to {} is already closed", pollFirst, this.authority);
                } else {
                    if (!z || pollFirst.inFlight() <= 0) {
                        break;
                    }
                    this.temporaryInFlight.add(pollFirst);
                }
            } catch (Throwable th) {
                if (!this.temporaryInFlight.isEmpty()) {
                    this.available.addAll(this.temporaryInFlight);
                    this.temporaryInFlight.clear();
                }
                throw th;
            }
        }
        this.inFlight.incrementUsed();
        if (pollFirst.inFlight() == 0) {
            this.usedConnections.incrementUsed();
        }
        pollFirst.onAcquire();
        if (!this.temporaryInFlight.isEmpty()) {
            this.available.addAll(this.temporaryInFlight);
            this.temporaryInFlight.clear();
        }
        return pollFirst;
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void acquire(boolean z, ConnectionConsumer connectionConsumer) {
        HttpConnection acquireNow = acquireNow(z);
        if (acquireNow != null) {
            connectionConsumer.accept(acquireNow);
            checkCreateConnections();
        } else if (this.failures > MAX_FAILURES) {
            log.error("The request cannot be made since the failures to connect to {} exceeded a threshold. Stopping session.", this.authority);
            connectionConsumer.accept(null);
        } else {
            this.waiting.add(connectionConsumer);
            this.blockedSessions.incrementUsed();
        }
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void afterRequestSent(HttpConnection httpConnection) {
        if (httpConnection.isAvailable()) {
            if (httpConnection.inFlight() == 0) {
                this.available.addFirst(httpConnection);
            } else {
                this.available.addLast(httpConnection);
            }
        }
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void release(HttpConnection httpConnection, boolean z, boolean z2) {
        if (trace) {
            log.trace("Release {} (became available={} after request={})", httpConnection, Boolean.valueOf(z), Boolean.valueOf(z2));
        }
        if (z) {
            if (!$assertionsDisabled && httpConnection.isClosed()) {
                throw new AssertionError();
            }
            if (httpConnection.inFlight() == 0) {
                this.available.addFirst(httpConnection);
            } else {
                this.available.addLast(httpConnection);
            }
        }
        if (z2) {
            this.inFlight.decrementUsed();
        }
        if (httpConnection.inFlight() == 0) {
            this.usedConnections.decrementUsed();
        }
        if (this.keepAliveFuture != null || this.sizeConfig.keepAliveTime() <= 0) {
            return;
        }
        long keepAliveTime = this.sizeConfig.keepAliveTime() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.available.stream().filter(httpConnection2 -> {
            return !httpConnection2.isClosed();
        }).mapToLong((v0) -> {
            return v0.lastUsed();
        }).min().orElse(httpConnection.lastUsed()));
        log.debug("Scheduling next keep-alive check in {} ms", Long.valueOf(keepAliveTime));
        this.keepAliveFuture = this.eventLoop.schedule(() -> {
            long nanoTime = System.nanoTime();
            Iterator<HttpConnection> it = this.available.iterator();
            while (it.hasNext()) {
                HttpConnection next = it.next();
                if (!next.isClosed() && TimeUnit.NANOSECONDS.toMillis(nanoTime - next.lastUsed()) > this.sizeConfig.keepAliveTime()) {
                    next.close();
                }
            }
            this.keepAliveFuture = null;
        }, keepAliveTime, TimeUnit.MILLISECONDS);
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void onSessionReset() {
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public int waitingSessions() {
        return this.waiting.size();
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public EventLoop executor() {
        return this.eventLoop;
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void pulse() {
        if (trace) {
            log.trace("Pulse to {} ({} waiting)", this.authority, Integer.valueOf(this.waiting.size()));
        }
        ConnectionConsumer poll = this.waiting.poll();
        if (poll != null) {
            HttpConnection acquireNow = acquireNow(false);
            if (acquireNow != null) {
                this.blockedSessions.decrementUsed();
                poll.accept(acquireNow);
            } else if (this.failures > MAX_FAILURES) {
                log.error("The request cannot be made since the failures to connect to {} exceeded a threshold. Stopping session.", this.authority);
                poll.accept(null);
            } else {
                this.waiting.addFirst(poll);
            }
        }
        if (this.pulseFuture != null || this.waiting.isEmpty()) {
            return;
        }
        this.pulseFuture = executor().schedule(this::scheduledPulse, 1L, TimeUnit.MILLISECONDS);
    }

    private Object scheduledPulse() {
        this.pulseFuture = null;
        pulse();
        return null;
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public Collection<HttpConnection> connections() {
        return this.connections;
    }

    private void checkCreateConnections() {
        if (!$assertionsDisabled && !this.eventLoop.inEventLoop()) {
            throw new AssertionError();
        }
        if (this.shutdown) {
            return;
        }
        if (this.failures <= MAX_FAILURES) {
            if (needsMoreConnections()) {
                this.connecting++;
                this.clientPool.connect(this, this.handleNewConnection);
                this.eventLoop.schedule(this.checkCreateConnections, 2L, TimeUnit.MILLISECONDS);
                return;
            }
            return;
        }
        Handler<AsyncResult<Void>> handler = this.startedHandler;
        if (handler != null) {
            this.startedHandler = null;
            String format = String.format("Cannot connect to %s: %d created, %d failures.", this.authority, Integer.valueOf(this.created), Integer.valueOf(this.failures));
            if (this.created > 0) {
                format = format + " Hint: either configure SUT to accept more open connections or reduce http.sharedConnections.";
            }
            handler.handle(Future.failedFuture(format));
        }
        pulse();
    }

    private boolean needsMoreConnections() {
        return this.created + this.connecting < this.sizeConfig.core() || (this.created + this.connecting < this.sizeConfig.max() && (this.connecting + this.available.size()) - this.availableClosed < this.sizeConfig.buffer());
    }

    private void handleNewConnection(HttpConnection httpConnection, Throwable th) {
        if (th != null) {
            log.warn(new FormattedMessage("Cannot create connection to {} (created: {}, failures: {})", new Object[]{this.authority, Integer.valueOf(this.created), Integer.valueOf(this.failures + 1)}), th);
            if (this.eventLoop.isShuttingDown() || this.eventLoop.isShutdown()) {
                return;
            }
            this.eventLoop.execute(this.onConnectFailure);
            return;
        }
        if (!$assertionsDisabled && httpConnection.context().executor() != this.eventLoop) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !this.eventLoop.inEventLoop()) {
            throw new AssertionError();
        }
        Handler<AsyncResult<Void>> handler = null;
        this.connections.add(httpConnection);
        this.connecting--;
        this.created++;
        this.failures = 0;
        this.available.add(httpConnection);
        log.debug("Created {} to {} ({}+{}=?{}:{}/{})", httpConnection, this.authority, Integer.valueOf(this.created), Integer.valueOf(this.connecting), Integer.valueOf(this.connections.size()), Integer.valueOf(this.available.size() - this.availableClosed), Integer.valueOf(this.sizeConfig.max()));
        incrementTypeStats(httpConnection);
        httpConnection.context().channel().closeFuture().addListener(future -> {
            httpConnection.setClosed();
            log.debug("Closed {} to {}. ({}+{}=?{}:{}/{})", httpConnection, this.authority, Integer.valueOf(this.created), Integer.valueOf(this.connecting), Integer.valueOf(this.connections.size()), Integer.valueOf(this.available.size() - this.availableClosed), Integer.valueOf(this.sizeConfig.max()));
            this.created--;
            this.closed++;
            if (this.available.contains(httpConnection)) {
                this.availableClosed++;
            }
            this.typeStats.get(tagConnection(httpConnection)).decrementUsed();
            if (this.shutdown) {
                return;
            }
            if (this.closed >= this.sizeConfig.max()) {
                this.connections.removeIf((v0) -> {
                    return v0.isClosed();
                });
                this.closed = 0;
            }
            checkCreateConnections();
        });
        if (needsMoreConnections()) {
            checkCreateConnections();
        } else if (this.startedHandler != null && this.created >= this.sizeConfig.core()) {
            handler = this.startedHandler;
            this.startedHandler = null;
        }
        if (handler != null) {
            handler.handle(Future.succeededFuture());
        }
        pulse();
    }

    private void onConnectFailure() {
        this.failures++;
        this.connecting--;
        this.eventLoop.schedule(this.checkCreateConnections, 50L, TimeUnit.MILLISECONDS);
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void start(Handler<AsyncResult<Void>> handler) {
        this.startedHandler = handler;
        this.eventLoop.execute(this.checkCreateConnections);
    }

    @Override // io.hyperfoil.http.api.HttpConnectionPool
    public void shutdown() {
        log.debug("Shutdown called");
        this.shutdown = true;
        if (this.eventLoop.isShutdown()) {
            return;
        }
        this.eventLoop.execute(() -> {
            log.debug("Closing all connections");
            Iterator<HttpConnection> it = this.connections.iterator();
            while (it.hasNext()) {
                HttpConnection next = it.next();
                if (!next.isClosed()) {
                    next.context().writeAndFlush(Unpooled.EMPTY_BUFFER);
                    next.context().close();
                    next.context().flush();
                }
            }
        });
    }

    static {
        $assertionsDisabled = !SharedConnectionPool.class.desiredAssertionStatus();
        log = LogManager.getLogger(SharedConnectionPool.class);
        trace = log.isTraceEnabled();
    }
}
