package io.kroxylicious.testing.kafka.invm;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.api.TerminationStyle;
import io.kroxylicious.testing.kafka.clients.CloseableAdmin;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
import io.kroxylicious.testing.kafka.common.PortAllocator;
import io.kroxylicious.testing.kafka.common.Utils;
import io.kroxylicious.testing.kafka.internal.AdminSource;
import io.kroxylicious.testing.kafka.testcontainers.TestcontainersKafkaCluster;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.System;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.KafkaServer;
import kafka.server.Server;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import scala.Option;

/* loaded from: input_file:io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.class */
public class InVMKafkaCluster implements KafkaCluster, KafkaClusterConfig.KafkaEndpoints, AdminSource {
    static final System.Logger LOGGER = System.getLogger(InVMKafkaCluster.class.getName());
    private static final int STARTUP_TIMEOUT = 30;
    static final String INVM_KAFKA = "invm-kafka";
    private final KafkaClusterConfig clusterConfig;
    private final Path tempDirectory;
    private ZooKeeperServer zooServer;
    private final ConcurrentMap<Integer, Server> servers = new ConcurrentHashMap();
    private final Set<Integer> stoppedServers = new HashSet();
    private final PortAllocator portsAllocator = new PortAllocator();

    public InVMKafkaCluster(KafkaClusterConfig kafkaClusterConfig) {
        this.clusterConfig = kafkaClusterConfig;
        try {
            this.tempDirectory = Files.createTempDirectory("kafka", new FileAttribute[0]);
            this.tempDirectory.toFile().deleteOnExit();
            trapKafkaSystemExit();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void exitHandler(int i, String str) {
        LOGGER.log(System.Logger.Level.WARNING, "Kafka tried to exit with statusCode: {0} and message: {1}. Including stacktrace to determine whats at fault", new Object[]{Integer.valueOf(i), str, new IllegalStateException(str)});
    }

    @NonNull
    private Server buildKafkaServer(KafkaClusterConfig.ConfigHolder configHolder, List<UserScramCredentialRecord> list) {
        KafkaConfig buildBrokerConfig = buildBrokerConfig(configHolder);
        Option<String> apply = Option.apply((Object) null);
        if (this.clusterConfig.isKraftMode()) {
            KraftLogDirUtil.prepareLogDirsForKraft(configHolder.getKafkaKraftClusterId(), buildBrokerConfig, list);
            return instantiateKraftServer(buildBrokerConfig, apply);
        }
        createScramUsersInZookeeper(list, buildBrokerConfig);
        return new KafkaServer(buildBrokerConfig, Time.SYSTEM, apply, false);
    }

    @NonNull
    private Server instantiateKraftServer(KafkaConfig kafkaConfig, Option<String> option) {
        return (Server) construct(KafkaRaftServer.class, kafkaConfig, Time.SYSTEM).orElseGet(() -> {
            return construct(KafkaRaftServer.class, kafkaConfig, Time.SYSTEM, option).orElseThrow();
        });
    }

    public Optional<Object> construct(Class<?> cls, Object... objArr) {
        return Arrays.stream(cls.getDeclaredConstructors()).filter(constructor -> {
            return Modifier.isPublic(constructor.getModifiers());
        }).filter(constructor2 -> {
            if (constructor2.getParameterCount() != objArr.length) {
                return false;
            }
            boolean z = true;
            Class<?>[] parameterTypes = constructor2.getParameterTypes();
            for (int i = 0; i < objArr.length; i++) {
                z = z && parameterTypes[i].isInstance(objArr[i]);
            }
            return z;
        }).findFirst().map(constructor3 -> {
            try {
                return constructor3.newInstance(objArr);
            } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @NonNull
    private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder configHolder) {
        Properties properties = new Properties();
        properties.putAll(configHolder.getProperties());
        properties.setProperty("log.dir", getBrokerLogDir(configHolder.getBrokerNum()).toAbsolutePath().toString());
        LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", new Object[]{properties});
        return new KafkaConfig(properties);
    }

    @NonNull
    private Path getBrokerLogDir(int i) {
        return this.tempDirectory.resolve(String.format("broker-%d", Integer.valueOf(i)));
    }

    public synchronized void start() {
        PortAllocator.PortAllocationSession allocationSession = this.portsAllocator.allocationSession();
        try {
            allocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON, KafkaClusterConfig.KafkaEndpoints.Listener.INTERNAL), 0, this.clusterConfig.getBrokersNum().intValue());
            allocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER), 0, this.clusterConfig.isKraftMode() ? this.clusterConfig.getKraftControllers().intValue() : 1);
            if (allocationSession != null) {
                allocationSession.close();
            }
            List<UserScramCredentialRecord> userScramCredentialRecords = getUserScramCredentialRecords();
            buildAndStartZookeeper();
            ((Stream) this.clusterConfig.getBrokerConfigs(() -> {
                return this;
            }).parallel()).forEach(configHolder -> {
                Server buildKafkaServer = buildKafkaServer(configHolder, userScramCredentialRecords);
                tryToStartServerWithRetry(configHolder, buildKafkaServer);
                this.servers.put(Integer.valueOf(configHolder.getBrokerNum()), buildKafkaServer);
            });
            Utils.awaitExpectedBrokerCountInClusterViaTopic(this.clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(num -> {
                return getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, num.intValue());
            })), 120, TimeUnit.SECONDS, this.clusterConfig.getBrokersNum());
        } catch (Throwable th) {
            if (allocationSession != null) {
                try {
                    allocationSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHolder, Server server) {
        Utils.awaitCondition(STARTUP_TIMEOUT, TimeUnit.SECONDS).until(() -> {
            try {
                LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", new Object[]{Integer.valueOf(configHolder.getBrokerNum()), configHolder.getProperties().get("process.roles")});
                server.startup();
                return true;
            } catch (Throwable th) {
                LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: " + th.getMessage());
                LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ", new Object[]{getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, configHolder.getBrokerNum()).getBind(), getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, configHolder.getBrokerNum()).getBind(), getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, configHolder.getBrokerNum()).getBind(), getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, configHolder.getBrokerNum()).getBind()});
                server.shutdown();
                server.awaitShutdown();
                return false;
            }
        });
    }

    private void buildAndStartZookeeper() {
        if (this.clusterConfig.isKraftMode()) {
            return;
        }
        try {
            ServerCnxnFactory createFactory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", this.portsAllocator.getPort(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, 0)), 1024);
            Path resolve = this.tempDirectory.resolve("zoo");
            Path resolve2 = resolve.resolve("snapshot");
            Path resolve3 = resolve.resolve("log");
            Files.createDirectories(resolve2, new FileAttribute[0]);
            Files.createDirectories(resolve3, new FileAttribute[0]);
            this.zooServer = new ZooKeeperServer(resolve2.toFile(), resolve3.toFile(), 500);
            createFactory.startup(this.zooServer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e2);
        }
    }

    public String getClusterId() {
        return this.clusterConfig.clusterId();
    }

    public synchronized String getBootstrapServers() {
        return buildBrokerList(num -> {
            return getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, num.intValue());
        });
    }

    private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> function) {
        return (String) this.servers.keySet().stream().filter(this::isBroker).map(function).map((v0) -> {
            return v0.connectAddress();
        }).collect(Collectors.joining(","));
    }

    public Map<String, Object> getKafkaClientConfiguration() {
        return this.clusterConfig.getConnectConfigForCluster(getBootstrapServers());
    }

    public Map<String, Object> getKafkaClientConfiguration(String str, String str2) {
        return this.clusterConfig.getConnectConfigForCluster(getBootstrapServers(), str, str2);
    }

    public synchronized int addBroker() {
        OptionalInt findFirst = IntStream.rangeClosed(0, getNumOfBrokers()).filter(i -> {
            return !this.servers.containsKey(Integer.valueOf(i));
        }).findFirst();
        if (findFirst.isEmpty()) {
            throw new IllegalStateException("Could not determine new nodeId, existing set " + this.servers.keySet());
        }
        int asInt = findFirst.getAsInt();
        LOGGER.log(System.Logger.Level.DEBUG, "Adding broker with node.id {0} to cluster with existing nodes {1}.", new Object[]{Integer.valueOf(asInt), this.servers.keySet()});
        PortAllocator.PortAllocationSession allocationSession = this.portsAllocator.allocationSession();
        try {
            allocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON, KafkaClusterConfig.KafkaEndpoints.Listener.INTERNAL), asInt);
            if (allocationSession != null) {
                allocationSession.close();
            }
            KafkaClusterConfig.ConfigHolder generateConfigForSpecificNode = this.clusterConfig.generateConfigForSpecificNode(this, asInt);
            Server buildKafkaServer = buildKafkaServer(generateConfigForSpecificNode, List.of());
            tryToStartServerWithRetry(generateConfigForSpecificNode, buildKafkaServer);
            this.servers.put(Integer.valueOf(asInt), buildKafkaServer);
            Utils.awaitExpectedBrokerCountInClusterViaTopic(this.clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(num -> {
                return getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, num.intValue());
            })), 120, TimeUnit.SECONDS, Integer.valueOf(getNumOfBrokers()));
            return generateConfigForSpecificNode.getBrokerNum();
        } catch (Throwable th) {
            if (allocationSession != null) {
                try {
                    allocationSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public synchronized void removeBroker(int i) throws IllegalArgumentException, UnsupportedOperationException {
        if (!this.servers.containsKey(Integer.valueOf(i))) {
            throw new IllegalArgumentException("Broker node " + i + " is not a member of the cluster.");
        }
        if (this.clusterConfig.isKraftMode() && isController(Integer.valueOf(i))) {
            throw new UnsupportedOperationException("Cannot remove controller node " + i + " from a kraft cluster.");
        }
        if (this.servers.size() < 2) {
            throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(Integer.valueOf(this.servers.size())));
        }
        if (!this.stoppedServers.isEmpty()) {
            throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes.");
        }
        Optional<Integer> findFirst = this.servers.keySet().stream().filter(num -> {
            return num.intValue() != i;
        }).findFirst();
        if (findFirst.isEmpty()) {
            throw new IllegalStateException("Could not identify a node to be the re-assignment target");
        }
        Utils.awaitReassignmentOfKafkaInternalTopicsIfNecessary(this.clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(num2 -> {
            return getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, num2.intValue());
        })), i, findFirst.get().intValue(), 120, TimeUnit.SECONDS);
        this.portsAllocator.deallocate(i);
        Server remove = this.servers.remove(Integer.valueOf(i));
        remove.shutdown();
        remove.awaitShutdown();
        ensureDirectoryIsEmpty(getBrokerLogDir(i));
    }

    public synchronized void stopNodes(IntPredicate intPredicate, TerminationStyle terminationStyle) {
        Map<Integer, Server> map = (Map) this.servers.entrySet().stream().filter(entry -> {
            return intPredicate.test(((Integer) entry.getKey()).intValue());
        }).filter(entry2 -> {
            return !this.stoppedServers.contains(entry2.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        roleOrderedShutdown(map);
        this.stoppedServers.addAll(map.keySet());
    }

    public synchronized void startNodes(IntPredicate intPredicate) {
        ((Map) this.servers.entrySet().stream().filter(entry -> {
            return intPredicate.test(((Integer) entry.getKey()).intValue());
        }).filter(entry2 -> {
            return this.stoppedServers.contains(entry2.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).forEach((num, server) -> {
            KafkaClusterConfig.ConfigHolder generateConfigForSpecificNode = this.clusterConfig.generateConfigForSpecificNode(this, num.intValue());
            Server buildKafkaServer = buildKafkaServer(generateConfigForSpecificNode, List.of());
            tryToStartServerWithRetry(generateConfigForSpecificNode, buildKafkaServer);
            this.servers.put(num, buildKafkaServer);
            this.stoppedServers.remove(num);
        });
    }

    /* JADX WARN: Finally extract failed */
    public synchronized void close() throws Exception {
        Stream<Path> walk;
        Stream<R> map;
        try {
            try {
                roleOrderedShutdown(this.servers);
                if (this.zooServer != null) {
                    this.zooServer.shutdown(true);
                }
                if (this.tempDirectory.toFile().exists()) {
                    walk = Files.walk(this.tempDirectory, new FileVisitOption[0]);
                    try {
                        map = walk.sorted(Comparator.reverseOrder()).map((v0) -> {
                            return v0.toFile();
                        });
                        try {
                            map.forEach((v0) -> {
                                v0.delete();
                            });
                            if (map != 0) {
                                map.close();
                            }
                            if (walk != null) {
                                walk.close();
                            }
                        } finally {
                        }
                    } finally {
                    }
                }
            } catch (Throwable th) {
                if (this.tempDirectory.toFile().exists()) {
                    walk = Files.walk(this.tempDirectory, new FileVisitOption[0]);
                    try {
                        map = walk.sorted(Comparator.reverseOrder()).map((v0) -> {
                            return v0.toFile();
                        });
                        try {
                            map.forEach((v0) -> {
                                v0.delete();
                            });
                            if (map != 0) {
                                map.close();
                            }
                            if (walk != null) {
                                walk.close();
                            }
                        } finally {
                            if (map != 0) {
                                try {
                                    map.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                        }
                    } finally {
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (this.zooServer != null) {
                this.zooServer.shutdown(true);
            }
            throw th3;
        }
    }

    private void roleOrderedShutdown(Map<Integer, Server> map) {
        shutdownServers(map, entry -> {
            return !isController((Integer) entry.getKey());
        });
        shutdownServers(map, entry2 -> {
            return isController((Integer) entry2.getKey());
        });
    }

    private boolean isController(Integer num) {
        return this.portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, num.intValue());
    }

    private boolean isBroker(Integer num) {
        return this.portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, num.intValue());
    }

    private void shutdownServers(Map<Integer, Server> map, Predicate<Map.Entry<Integer, Server>> predicate) {
        map.entrySet().stream().filter(predicate).map((v0) -> {
            return v0.getValue();
        }).peek((v0) -> {
            v0.shutdown();
        }).toList().forEach((v0) -> {
            v0.awaitShutdown();
        });
    }

    private static void ensureDirectoryIsEmpty(Path path) {
        if (Files.exists(path, new LinkOption[0])) {
            try {
                Stream<Path> walk = Files.walk(path, new FileVisitOption[0]);
                try {
                    Stream<R> map = walk.sorted(Comparator.reverseOrder()).map((v0) -> {
                        return v0.toFile();
                    });
                    try {
                        map.forEach((v0) -> {
                            v0.delete();
                        });
                        if (map != 0) {
                            map.close();
                        }
                        if (walk != null) {
                            walk.close();
                        }
                    } catch (Throwable th) {
                        if (map != 0) {
                            try {
                                map.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Error whilst deleting " + path, e);
            }
        }
    }

    public synchronized int getNumOfBrokers() {
        return this.servers.size();
    }

    public synchronized Set<Integer> getStoppedBrokers() {
        return Set.copyOf(this.stoppedServers);
    }

    @Override // io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints
    public synchronized KafkaClusterConfig.KafkaEndpoints.EndpointPair getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener listener, int i) {
        int port = this.portsAllocator.getPort(listener, i);
        return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder().bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint(TestcontainersKafkaCluster.WILDCARD_BIND_ADDRESS, port)).connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint("localhost", port)).build();
    }

    private static void trapKafkaSystemExit() {
        Exit.setExitProcedure(InVMKafkaCluster::exitHandler);
        Exit.setHaltProcedure(InVMKafkaCluster::exitHandler);
    }

    @Override // io.kroxylicious.testing.kafka.internal.AdminSource
    @NonNull
    public Admin createAdmin() {
        return CloseableAdmin.create(this.clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(num -> {
            return getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, num.intValue());
        })));
    }

    @NonNull
    private List<UserScramCredentialRecord> getUserScramCredentialRecords() {
        return (!this.clusterConfig.isSaslScram() || this.clusterConfig.getUsers() == null || this.clusterConfig.getUsers().isEmpty()) ? List.of() : ScramUtils.getScramCredentialRecords(this.clusterConfig.getSaslMechanism(), this.clusterConfig.getUsers());
    }

    private void createScramUsersInZookeeper(List<UserScramCredentialRecord> list, KafkaConfig kafkaConfig) {
        if (list.isEmpty()) {
            return;
        }
        KafkaZkClient createZkClient = KafkaZkClient.createZkClient(INVM_KAFKA, Time.SYSTEM, kafkaConfig, KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, false));
        try {
            AdminZkClient adminZkClient = new AdminZkClient(createZkClient, Option.empty());
            String str = "users";
            disableControllerCheck(createZkClient);
            list.forEach(userScramCredentialRecord -> {
                Properties fetchEntityConfig = adminZkClient.fetchEntityConfig(str, userScramCredentialRecord.name());
                fetchEntityConfig.setProperty(ScramMechanism.fromType(userScramCredentialRecord.mechanism()).mechanismName(), ScramCredentialUtils.credentialToString(ScramUtils.asScramCredential(userScramCredentialRecord)));
                adminZkClient.changeConfigs(str, userScramCredentialRecord.name(), fetchEntityConfig, false);
            });
            if (createZkClient != null) {
                createZkClient.close();
            }
        } catch (Throwable th) {
            if (createZkClient != null) {
                try {
                    createZkClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void disableControllerCheck(KafkaZkClient kafkaZkClient) {
        try {
            Field declaredField = kafkaZkClient.getClass().getDeclaredField("enableEntityConfigControllerCheck");
            declaredField.setAccessible(true);
            declaredField.setBoolean(kafkaZkClient, false);
        } catch (IllegalAccessException | IllegalArgumentException | SecurityException e) {
            LOGGER.log(System.Logger.Level.WARNING, "Failed to make enableEntityConfigControllerCheck accessible on %s so we are unlikely to be able to create SCRAM users", new Object[]{kafkaZkClient.getClass()});
        } catch (NoSuchFieldException e2) {
        }
    }
}
