package io.fabric8.kubernetes.client.http;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.mockwebserver.MockWebServer;
import io.fabric8.mockwebserver.MockWebServerListener;
import io.fabric8.mockwebserver.http.MockResponse;
import io.fabric8.mockwebserver.http.RecordedHttpConnection;
import io.fabric8.mockwebserver.http.Response;
import io.fabric8.mockwebserver.http.WebSocketListener;
import io.fabric8.mockwebserver.vertx.Protocol;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

/* loaded from: input_file:io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest.class */
public abstract class AbstractSimultaneousConnectionsTest {
    /** Number of requests issued (and expected by the delayed handler) in {@code http1Connections}. */
    private static final int MAX_HTTP_1_CONNECTIONS = 2048;
    /** Number of WebSocket connection attempts issued by the two WebSocket tests. */
    private static final int MAX_HTTP_1_WS_CONNECTIONS = 1024;
    /** Listener added to {@link #mockWebServer}; tracks the connections reported as open. */
    private RegisteredConnections registeredConnections;
    /** Mock server used by the upgraded-WebSocket test; only started through {@code withHttp1()}. */
    private MockWebServer mockWebServer;
    /** Executor handed to {@link #httpServer}; created as a cached thread pool for each test. */
    private ExecutorService httpExecutor;
    /** JDK HTTP server bound to an ephemeral port; hosts the delayed-response handlers. */
    private HttpServer httpServer;
    /** Builder obtained from the factory under test, preconfigured with a 60 second connect timeout. */
    private HttpClient.Builder clientBuilder;

    /* loaded from: input_file:io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest$DelayedResponseHandler.class */
    private static class DelayedResponseHandler implements HttpHandler {
        private final int requestCount;
        private final CyclicBarrier barrier = new CyclicBarrier(2);
        private final Set<HttpExchange> exchanges = ConcurrentHashMap.newKeySet();
        private final CompletableFuture<Integer> connectionCount = new CompletableFuture<>();
        private final ExecutorService executorService = Executors.newFixedThreadPool(1);

        private DelayedResponseHandler(int i, HttpHandler httpHandler) {
            this.requestCount = i;
            this.connectionCount.thenRunAsync(() -> {
                Iterator<HttpExchange> it = this.exchanges.iterator();
                while (it.hasNext()) {
                    try {
                        httpHandler.handle(it.next());
                    } catch (IOException e) {
                    }
                }
            }, (Executor) this.executorService).whenComplete((r3, th) -> {
                this.executorService.shutdownNow();
            });
        }

        public void handle(HttpExchange httpExchange) {
            this.exchanges.add(httpExchange);
            await();
            if (this.exchanges.size() == this.requestCount) {
                this.connectionCount.complete(Integer.valueOf(this.requestCount));
            }
        }

        public final void await() {
            try {
                this.barrier.await(5L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException("Failed to await the barrier");
            }
        }
    }

    /* loaded from: input_file:io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest$RegisteredConnections.class */
    private static class RegisteredConnections implements MockWebServerListener {
        private final Set<RecordedHttpConnection> connections = ConcurrentHashMap.newKeySet();

        private RegisteredConnections() {
        }

        final int activeConnections() {
            return this.connections.size();
        }

        public void onConnection(RecordedHttpConnection recordedHttpConnection) {
            this.connections.add(recordedHttpConnection);
            super.onConnection(recordedHttpConnection);
        }

        public void onConnectionClosed(RecordedHttpConnection recordedHttpConnection) {
            this.connections.remove(recordedHttpConnection);
        }
    }

    /**
     * Creates the per-test fixtures: a mock web server with the connection-tracking listener
     * attached (not started here), a started JDK HTTP server on an ephemeral port backed by a
     * cached thread pool, and a client builder with a 60 second connect timeout.
     *
     * @throws IOException if the JDK HTTP server cannot be created
     */
    @BeforeEach
    void prepareServerAndBuilder() throws IOException {
        // Mock server: only started by the tests that need it.
        registeredConnections = new RegisteredConnections();
        mockWebServer = new MockWebServer();
        mockWebServer.addListener(registeredConnections);

        // Plain JDK HTTP server: port 0 requests an ephemeral port, backlog 0 uses the default.
        httpExecutor = Executors.newCachedThreadPool();
        final InetSocketAddress anyFreePort = new InetSocketAddress(0);
        httpServer = HttpServer.create(anyFreePort, 0);
        httpServer.setExecutor(httpExecutor);
        httpServer.start();

        clientBuilder = getHttpClientFactory()
            .newBuilder()
            .connectTimeout(60L, TimeUnit.SECONDS);
    }

    /**
     * Releases the per-test fixtures. Each step runs even if an earlier one throws, so a failing
     * mock-server shutdown cannot leave the JDK HTTP server or its thread pool running.
     */
    @AfterEach
    void stopServer() {
        try {
            mockWebServer.shutdown();
        } finally {
            try {
                // Delay 0: do not wait for in-flight exchanges to finish.
                httpServer.stop(0);
            } finally {
                httpExecutor.shutdownNow();
            }
        }
    }

    /**
     * Supplies the {@link HttpClient.Factory} whose clients are exercised by these tests; it is
     * queried before each test to create the client builder.
     *
     * @return the factory used to obtain a new client builder
     */
    protected abstract HttpClient.Factory getHttpClientFactory();

    /**
     * Restricts the mock web server to HTTP/1.1 and starts it.
     */
    private void withHttp1() {
        mockWebServer.setProtocols(Collections.singletonList(Protocol.HTTP_1_1));
        mockWebServer.start();
    }

    /**
     * Issues {@code MAX_HTTP_1_CONNECTIONS} requests against a handler that withholds every
     * response until all requests have arrived, then verifies each one completes with a 204.
     *
     * <p>The test thread meets the server handler on its barrier after each request, so a request
     * is known to have reached the server before the next one is sent.
     *
     * @throws Exception if a request fails or the responses do not complete within 60 seconds
     */
    @DisplayName("Should be able to make 2048 simultaneous HTTP/1.x connections before processing the response")
    @Test
    @DisabledOnOs({OS.WINDOWS})
    public void http1Connections() throws Exception {
        final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_CONNECTIONS, exchange -> {
            // -1 response length: no response body is sent.
            exchange.sendResponseHeaders(204, -1L);
            exchange.close();
        });
        httpServer.createContext("/http", handler);
        try (HttpClient client = clientBuilder.build()) {
            final Set<CompletableFuture<HttpResponse<AsyncBody>>> asyncResponses = ConcurrentHashMap.newKeySet();
            final HttpRequest request = client.newHttpRequestBuilder()
                .uri(String.format("http://localhost:%s/http", httpServer.getAddress().getPort()))
                .build();
            for (int sent = 0; sent < MAX_HTTP_1_CONNECTIONS; sent++) {
                asyncResponses.add(client.consumeBytes(request, (chunks, asyncBody) -> asyncBody.consume()));
                handler.await();
            }
            CompletableFuture.allOf(asyncResponses.toArray(new CompletableFuture[0])).get(60L, TimeUnit.SECONDS);
            Assertions.assertThat(asyncResponses)
                .hasSize(MAX_HTTP_1_CONNECTIONS)
                .extracting(CompletableFuture::join)
                .extracting(response -> {
                    response.body().consume();
                    return response.code();
                })
                .containsOnly(204);
        }
    }

    /**
     * Opens {@code MAX_HTTP_1_WS_CONNECTIONS} WebSocket connection attempts against a plain HTTP
     * handler that withholds its 404 answer until all of them have arrived, then verifies the
     * handler observed the expected number of requests.
     *
     * <p>The client is closed before the final assertion; the count is read from the handler's
     * future, which completes on the server side.
     *
     * @throws Exception if the barrier cannot be passed or the count is not available within 60 seconds
     */
    @DisplayName("Should be able to make 1024 simultaneous HTTP connections before upgrading to WebSocket")
    @Test
    @DisabledOnOs({OS.WINDOWS})
    public void http1WebSocketConnectionsBeforeUpgrade() throws Exception {
        final DelayedResponseHandler handler = new DelayedResponseHandler(
            MAX_HTTP_1_WS_CONNECTIONS, exchange -> exchange.sendResponseHeaders(404, -1L));
        httpServer.createContext("/http", handler);
        // The server address does not change during the test, so build the target URI once.
        final URI target = URI.create(String.format("http://localhost:%s/http", httpServer.getAddress().getPort()));
        try (HttpClient client = clientBuilder.build()) {
            for (int attempt = 0; attempt < MAX_HTTP_1_WS_CONNECTIONS; attempt++) {
                client.newWebSocketBuilder().uri(target).buildAsync(new WebSocket.Listener() {
                });
                handler.await();
            }
        }
        Assertions.assertThat(handler.connectionCount.get(60L, TimeUnit.SECONDS)).isEqualTo(MAX_HTTP_1_WS_CONNECTIONS);
    }

    /**
     * Opens {@code MAX_HTTP_1_WS_CONNECTIONS} WebSocket connections against the mock web server
     * (restricted to HTTP/1.1) and verifies that every one is upgraded and receives the server's
     * message, and that the server-side socket count does not exceed the listener's
     * active-connection count.
     *
     * <p>The client is closed on every exit path, and the server-side sockets collected so far are
     * closed afterwards, whether the test passes or fails.
     *
     * @throws Exception if the barrier on the test thread times out or an assertion fails
     */
    @DisplayName("Should be able to make 1024 simultaneous upgraded WebSocket connections")
    @Test
    @DisabledOnOs({OS.WINDOWS})
    public void http1WebSocketConnections() throws Exception {
        withHttp1();
        final Set<io.fabric8.mockwebserver.http.WebSocket> serverSockets = ConcurrentHashMap.newKeySet();
        final Set<WebSocket> clientSockets = ConcurrentHashMap.newKeySet();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        final CountDownLatch latch = new CountDownLatch(MAX_HTTP_1_WS_CONNECTIONS);
        final MockResponse upgradeResponse = new MockResponse().withWebSocketUpgrade(new WebSocketListener() {
            @Override
            public void onOpen(io.fabric8.mockwebserver.http.WebSocket webSocket, Response response) {
                try {
                    cyclicBarrier.await(1L, TimeUnit.SECONDS);
                } catch (Exception ignored) {
                    // Deliberately not propagated from the server callback: the matching await
                    // on the test thread is unguarded, so a failure there propagates out of the test.
                }
                serverSockets.add(webSocket);
                webSocket.send("go on");
            }
        });
        IntStream.range(0, MAX_HTTP_1_WS_CONNECTIONS).forEach(i -> mockWebServer.enqueue(upgradeResponse));
        try (HttpClient client = clientBuilder.build()) {
            for (int opened = 0; opened < MAX_HTTP_1_WS_CONNECTIONS; opened++) {
                client.newWebSocketBuilder().uri(mockWebServer.url("/").uri()).buildAsync(new WebSocket.Listener() {
                    @Override
                    public void onMessage(WebSocket webSocket, String text) {
                        clientSockets.add(webSocket);
                        latch.countDown();
                        webSocket.request();
                    }
                });
                // Meet the server-side onOpen callback before opening the next connection.
                cyclicBarrier.await(1L, TimeUnit.SECONDS);
            }
            Assertions.assertThat(latch.await(60L, TimeUnit.SECONDS)).isTrue();
            Assertions.assertThat(serverSockets.size())
                .isEqualTo(MAX_HTTP_1_WS_CONNECTIONS)
                .isLessThanOrEqualTo(registeredConnections.activeConnections());
        } finally {
            for (io.fabric8.mockwebserver.http.WebSocket serverSocket : serverSockets) {
                serverSocket.close(1000, "done");
            }
        }
    }
}
