/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.testing.kafka.invm;

import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.api.TerminationStyle;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
import io.kroxylicious.testing.kafka.common.PortAllocator;
import io.kroxylicious.testing.kafka.common.Utils;
import io.kroxylicious.testing.kafka.invm.LoggingPrintStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.KafkaServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.tools.StorageTool;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.jetbrains.annotations.NotNull;
import scala.Option;
import scala.collection.immutable.Seq;

/**
 * A {@link KafkaCluster} whose Kafka nodes (and, when the cluster is not in KRaft mode, a ZooKeeper
 * server) are started inside the current JVM.
 *
 * <p>All node state lives under a temporary directory created by the constructor and removed by
 * {@link #close()}. Ports are obtained from a {@link PortAllocator}; every endpoint binds to
 * {@code 0.0.0.0} and is advertised to clients as {@code localhost}.
 *
 * <p>The public lifecycle and query methods are {@code synchronized} on this instance.
 */
public class InVMKafkaCluster
        implements KafkaCluster, KafkaClusterConfig.KafkaEndpoints {
    private static final System.Logger LOGGER = System.getLogger(InVMKafkaCluster.class.getName());
    private static final PrintStream LOGGING_PRINT_STREAM = LoggingPrintStream.loggingPrintStream(LOGGER, System.Logger.Level.DEBUG);

    /** Seconds allowed for a single node to start, including any retries. */
    private static final int STARTUP_TIMEOUT = 30;

    /** Seconds allowed for cluster-wide conditions (membership, topic re-assignment) to be met. */
    private static final int CLUSTER_TIMEOUT = 120;

    private final KafkaClusterConfig clusterConfig;
    private final Path tempDirectory;
    private ZooKeeperServer zooServer;
    // Retained so that the listening socket opened for ZooKeeper can be released in close().
    private ServerCnxnFactory zooFactory;
    private final Map<Integer, Server> servers = new HashMap<>();
    private final Set<Integer> stoppedServers = new HashSet<>();
    private final PortAllocator portsAllocator = new PortAllocator();

    /**
     * Creates the cluster's temporary working directory and installs handlers that stop Kafka
     * from terminating the JVM. No servers are started until {@link #start()} is called.
     *
     * @param clusterConfig the configuration describing the cluster to run
     * @throws UncheckedIOException if the temporary directory cannot be created
     */
    public InVMKafkaCluster(KafkaClusterConfig clusterConfig) {
        this.clusterConfig = clusterConfig;
        try {
            this.tempDirectory = Files.createTempDirectory("kafka");
            this.tempDirectory.toFile().deleteOnExit();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        InVMKafkaCluster.trapKafkaSystemExit();
    }

    /**
     * Replacement for Kafka's exit/halt procedures: logs the attempt instead of terminating the JVM.
     *
     * @param statusCode the exit status Kafka asked for
     * @param message the message Kafka supplied with the exit request
     */
    private static void exitHandler(int statusCode, String message) {
        IllegalStateException illegalStateException = new IllegalStateException(message);
        // Use the (Level, String, Throwable) overload: with the varargs overload the exception would be
        // treated as an unused format argument and its stack trace would never be logged.
        LOGGER.log(System.Logger.Level.WARNING,
                "Kafka tried to exit with statusCode: " + statusCode + " and message: " + message
                        + ". Including stacktrace to determine whats at fault",
                illegalStateException);
    }

    /**
     * Builds (but does not start) the server for one node. In KRaft mode the node's log
     * directories are formatted first and a {@link KafkaRaftServer} is returned; otherwise a
     * {@link KafkaServer} is returned.
     *
     * @param c the per-node configuration
     * @return a server that has not yet been started
     */
    @NotNull
    private Server buildKafkaServer(KafkaClusterConfig.ConfigHolder c) {
        KafkaConfig config = this.buildBrokerConfig(c);
        Option<String> threadNamePrefix = Option.apply(null);
        if (this.clusterConfig.isKraftMode()) {
            String clusterId = c.getKafkaKraftClusterId();
            var directories = StorageTool.configToLogDirectories(config);
            MetaProperties metaProperties = StorageTool.buildMetadataProperties(clusterId, config);
            StorageTool.formatCommand(LOGGING_PRINT_STREAM, directories, metaProperties, MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, true);
            return this.instantiateKraftServer(config, threadNamePrefix);
        }
        return new KafkaServer(config, Time.SYSTEM, threadNamePrefix, false);
    }

    /**
     * Reflectively creates a {@link KafkaRaftServer}, trying the {@code (config, time)} constructor
     * first and falling back to {@code (config, time, threadNamePrefix)}.
     *
     * <p>NOTE(review): presumably this exists to cope with constructor signature differences
     * between Kafka versions - confirm.
     *
     * @throws java.util.NoSuchElementException if neither constructor is available
     */
    @NotNull
    private Server instantiateKraftServer(KafkaConfig config, Option<String> threadNamePrefix) {
        Object kraftServer = this.construct(KafkaRaftServer.class, config, Time.SYSTEM)
                .orElseGet(() -> this.construct(KafkaRaftServer.class, config, Time.SYSTEM, threadNamePrefix).orElseThrow());
        return (Server) kraftServer;
    }

    /**
     * Instantiates {@code clazz} using the first public constructor whose parameter count matches
     * and whose every parameter type accepts the corresponding argument (per
     * {@link Class#isInstance(Object)}, so {@code null} arguments and primitive parameters never match).
     *
     * @param clazz the class to instantiate
     * @param parameters the constructor arguments, in order
     * @return the new instance, or empty if no public constructor accepts the arguments
     * @throws RuntimeException wrapping any reflective failure raised by the chosen constructor
     */
    public Optional<Object> construct(Class<?> clazz, Object... parameters) {
        return Arrays.stream(clazz.getDeclaredConstructors())
                .filter(constructor -> Modifier.isPublic(constructor.getModifiers()))
                .filter(constructor -> acceptsArguments(constructor, parameters))
                .findFirst()
                .map(constructor -> {
                    try {
                        return constructor.newInstance(parameters);
                    }
                    catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    /** Returns true when {@code constructor} takes exactly {@code arguments.length} parameters and each accepts its argument. */
    private static boolean acceptsArguments(Constructor<?> constructor, Object[] arguments) {
        Class<?>[] parameterTypes = constructor.getParameterTypes();
        if (parameterTypes.length != arguments.length) {
            return false;
        }
        for (int i = 0; i < arguments.length; i++) {
            if (!parameterTypes[i].isInstance(arguments[i])) {
                return false;
            }
        }
        return true;
    }

    /**
     * Copies the node's properties and points its log directory at this cluster's temporary directory.
     *
     * @param c the per-node configuration
     * @return the Kafka configuration for the node
     */
    @NotNull
    private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder c) {
        Properties properties = new Properties();
        properties.putAll(c.getProperties());
        Path logsDir = this.getBrokerLogDir(c.getBrokerNum());
        properties.setProperty(KafkaConfig.LogDirProp(), logsDir.toAbsolutePath().toString());
        LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", properties);
        return new KafkaConfig(properties);
    }

    /** Returns the {@code broker-<n>} directory for the given node beneath the temporary directory. */
    @NotNull
    private Path getBrokerLogDir(int brokerNum) {
        return this.tempDirectory.resolve(String.format("broker-%d", brokerNum));
    }

    /**
     * Allocates ports, starts ZooKeeper when not in KRaft mode, starts every configured node in
     * parallel and then waits until the expected number of brokers is visible in the cluster.
     */
    public synchronized void start() {
        try (PortAllocator.PortAllocationSession portAllocationSession = this.portsAllocator.allocationSession()) {
            portAllocationSession.allocate(
                    Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON,
                            KafkaClusterConfig.KafkaEndpoints.Listener.INTERNAL),
                    0, this.clusterConfig.getBrokersNum());
            // Outside KRaft mode the single CONTROLLER port (index 0) is the one ZooKeeper listens on.
            portAllocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER), 0,
                    this.clusterConfig.isKraftMode() ? this.clusterConfig.getKraftControllers() : 1);
        }
        this.buildAndStartZookeeper();
        this.clusterConfig.getBrokerConfigs(() -> this).parallel().forEach(configHolder -> {
            Server server = this.buildKafkaServer(configHolder);
            this.tryToStartServerWithRetry(configHolder, server);
            this.registerServer(configHolder.getBrokerNum(), server);
        });
        Utils.awaitExpectedBrokerCountInClusterViaTopic(this.clusterConfig.getAnonConnectConfigForCluster(this.anonBootstrapServers()),
                CLUSTER_TIMEOUT, TimeUnit.SECONDS, this.clusterConfig.getBrokersNum());
    }

    /**
     * Records a started server. The parallel start-up calls this from several threads at once and
     * {@link HashMap} is not safe for concurrent writes, so the write is guarded by the map's monitor.
     */
    private void registerServer(int nodeId, Server server) {
        synchronized (this.servers) {
            this.servers.put(nodeId, server);
        }
    }

    /**
     * Repeatedly attempts to start {@code server} until it succeeds or {@link #STARTUP_TIMEOUT}
     * seconds elapse. After a failed attempt the server is shut down before the next try.
     *
     * @param configHolder configuration of the node being started (used for diagnostics)
     * @param server the server to start
     */
    private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHolder, Server server) {
        int nodeId = configHolder.getBrokerNum();
        Utils.awaitCondition(STARTUP_TIMEOUT, TimeUnit.SECONDS).until(() -> {
            try {
                LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", nodeId,
                        configHolder.getProperties().get("process.roles"));
                server.startup();
                return true;
            }
            catch (Throwable t) {
                // Deliberately broad: any start-up failure is treated as retryable.
                LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: " + t.getMessage(), t);
                // NOTE(review): getEndpointPair is synchronized on this instance; confirm this lambda never
                // runs on a thread that would block behind a caller already holding that monitor.
                LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ",
                        this.describeBind(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, nodeId),
                        this.describeBind(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, nodeId),
                        this.describeBind(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, nodeId),
                        this.describeBind(KafkaClusterConfig.KafkaEndpoints.Listener.INTERNAL, nodeId));
                server.shutdown();
                server.awaitShutdown();
                return false;
            }
        });
    }

    /**
     * Returns the bind endpoint of a listener for diagnostics, or a placeholder when the node has
     * no port registered for it (for example a node without a CONTROLLER port), so that logging
     * can never prevent the failed server from being shut down.
     */
    private Object describeBind(KafkaClusterConfig.KafkaEndpoints.Listener listener, int nodeId) {
        if (!this.portsAllocator.hasRegisteredPort(listener, nodeId)) {
            return "<unallocated>";
        }
        return this.getEndpointPair(listener, nodeId).getBind();
    }

    /**
     * Starts a ZooKeeper server on the CONTROLLER port of node 0, storing its snapshots and logs
     * under {@code zoo/} in the temporary directory. Does nothing in KRaft mode.
     *
     * @throws UncheckedIOException if the directories or the server cannot be created
     * @throws RuntimeException if the thread is interrupted while ZooKeeper starts (the interrupt flag is restored)
     */
    private void buildAndStartZookeeper() {
        if (this.clusterConfig.isKraftMode()) {
            return;
        }
        try {
            int zookeeperPort = this.portsAllocator.getPort(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, 0);
            // Keep the factory in a field straight away so close() can release it even if a later step fails.
            this.zooFactory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", zookeeperPort), 1024);
            Path zoo = this.tempDirectory.resolve("zoo");
            Path snapshotDir = zoo.resolve("snapshot");
            Path logDir = zoo.resolve("log");
            Files.createDirectories(snapshotDir);
            Files.createDirectories(logDir);
            this.zooServer = new ZooKeeperServer(snapshotDir.toFile(), logDir.toFile(), 500);
            this.zooFactory.startup(this.zooServer);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Returns the cluster id taken from the cluster configuration. */
    public String getClusterId() {
        return this.clusterConfig.clusterId();
    }

    /** Returns the comma-separated EXTERNAL connect addresses of all broker nodes. */
    public synchronized String getBootstrapServers() {
        return this.buildBrokerList(nodeId -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, nodeId));
    }

    /** Returns the comma-separated ANON connect addresses of all broker nodes. */
    private String anonBootstrapServers() {
        return this.buildBrokerList(nodeId -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, nodeId));
    }

    /**
     * Joins, with commas, the connect address of every known node that has an ANON port registered.
     *
     * @param endpointFunc maps a node id to the endpoint pair whose connect address should be used
     */
    private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> endpointFunc) {
        return this.servers.keySet().stream()
                .filter(this::isBroker)
                .map(endpointFunc)
                .map(KafkaClusterConfig.KafkaEndpoints.EndpointPair::connectAddress)
                .collect(Collectors.joining(","));
    }

    /** Returns client configuration for connecting to this cluster's bootstrap servers. */
    public Map<String, Object> getKafkaClientConfiguration() {
        return this.clusterConfig.getConnectConfigForCluster(this.getBootstrapServers());
    }

    /** Returns client configuration for connecting to this cluster's bootstrap servers as the given user. */
    public Map<String, Object> getKafkaClientConfiguration(String user, String password) {
        return this.clusterConfig.getConnectConfigForCluster(this.getBootstrapServers(), user, password);
    }

    /**
     * Adds a broker using the lowest node id not already in use, starts it and waits until the
     * cluster reports the new broker count.
     *
     * @return the node id of the new broker
     * @throws IllegalStateException if no free node id can be determined
     */
    public synchronized int addBroker() {
        OptionalInt first = IntStream.rangeClosed(0, this.getNumOfBrokers()).filter(cand -> !this.servers.containsKey(cand)).findFirst();
        if (first.isEmpty()) {
            throw new IllegalStateException("Could not determine new nodeId, existing set " + this.servers.keySet());
        }
        int newNodeId = first.getAsInt();
        LOGGER.log(System.Logger.Level.DEBUG, "Adding broker with node.id {0} to cluster with existing nodes {1}.", newNodeId, this.servers.keySet());
        try (PortAllocator.PortAllocationSession portAllocationSession = this.portsAllocator.allocationSession()) {
            portAllocationSession.allocate(
                    Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON,
                            KafkaClusterConfig.KafkaEndpoints.Listener.INTERNAL),
                    newNodeId);
        }
        KafkaClusterConfig.ConfigHolder configHolder = this.clusterConfig.generateConfigForSpecificNode(this, newNodeId);
        Server server = this.buildKafkaServer(configHolder);
        this.tryToStartServerWithRetry(configHolder, server);
        this.servers.put(newNodeId, server);
        Utils.awaitExpectedBrokerCountInClusterViaTopic(this.clusterConfig.getAnonConnectConfigForCluster(this.anonBootstrapServers()),
                CLUSTER_TIMEOUT, TimeUnit.SECONDS, this.getNumOfBrokers());
        return configHolder.getBrokerNum();
    }

    /**
     * Removes a broker: waits for Kafka's internal topics to be re-assigned to another node if
     * necessary, releases the node's ports, shuts the node down and deletes its log directory.
     *
     * @param nodeId the node to remove
     * @throws IllegalArgumentException if the node is unknown or the cluster has fewer than two nodes
     * @throws UnsupportedOperationException if the node is a controller of a KRaft cluster
     * @throws IllegalStateException if any node is currently stopped, or no re-assignment target exists
     */
    public synchronized void removeBroker(int nodeId) throws IllegalArgumentException, UnsupportedOperationException {
        if (!this.servers.containsKey(nodeId)) {
            throw new IllegalArgumentException("Broker node " + nodeId + " is not a member of the cluster.");
        }
        if (this.clusterConfig.isKraftMode() && this.isController(nodeId)) {
            throw new UnsupportedOperationException("Cannot remove controller node " + nodeId + " from a kraft cluster.");
        }
        if (this.servers.size() < 2) {
            throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(this.servers.size()));
        }
        if (!this.stoppedServers.isEmpty()) {
            throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes.");
        }
        Optional<Integer> target = this.servers.keySet().stream().filter(n -> n != nodeId).findFirst();
        if (target.isEmpty()) {
            throw new IllegalStateException("Could not identify a node to be the re-assignment target");
        }
        Utils.awaitReassignmentOfKafkaInternalTopicsIfNecessary(this.clusterConfig.getAnonConnectConfigForCluster(this.anonBootstrapServers()),
                nodeId, target.get(), CLUSTER_TIMEOUT, TimeUnit.SECONDS);
        this.portsAllocator.deallocate(nodeId);
        Server s = this.servers.remove(nodeId);
        s.shutdown();
        s.awaitShutdown();
        InVMKafkaCluster.ensureDirectoryIsEmpty(this.getBrokerLogDir(nodeId));
    }

    /**
     * Shuts down every running node whose id matches the predicate and marks it as stopped.
     *
     * @param nodeIdPredicate selects the nodes to stop
     * @param terminationStyle not used by this implementation
     */
    public synchronized void stopNodes(IntPredicate nodeIdPredicate, TerminationStyle terminationStyle) {
        Map<Integer, Server> kafkaServersToStop = this.servers.entrySet().stream()
                .filter(e -> nodeIdPredicate.test(e.getKey()))
                .filter(e -> !this.stoppedServers.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        this.roleOrderedShutdown(kafkaServersToStop);
        this.stoppedServers.addAll(kafkaServersToStop.keySet());
    }

    /**
     * Restarts every stopped node whose id matches the predicate. A fresh server instance is built
     * for each node and replaces the stopped one.
     *
     * @param nodeIdPredicate selects the nodes to start
     */
    public synchronized void startNodes(IntPredicate nodeIdPredicate) {
        Map<Integer, Server> kafkaServersToStart = this.servers.entrySet().stream()
                .filter(e -> nodeIdPredicate.test(e.getKey()))
                .filter(e -> this.stoppedServers.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        kafkaServersToStart.forEach((key, value) -> {
            KafkaClusterConfig.ConfigHolder configHolder = this.clusterConfig.generateConfigForSpecificNode(this, key);
            Server replacement = this.buildKafkaServer(configHolder);
            this.tryToStartServerWithRetry(configHolder, replacement);
            this.servers.put(key, replacement);
            this.stoppedServers.remove(key);
        });
    }

    /**
     * Shuts down all nodes, then ZooKeeper (if one was started), then deletes the temporary
     * directory. Each later step runs even if an earlier one throws.
     *
     * @throws Exception if shutting down fails or the temporary directory cannot be traversed
     */
    public synchronized void close() throws Exception {
        try {
            try {
                this.roleOrderedShutdown(this.servers);
            }
            finally {
                this.shutdownZookeeper();
            }
        }
        finally {
            deleteRecursively(this.tempDirectory);
        }
    }

    /**
     * Shuts down the ZooKeeper server and then its connection factory. The factory owns the
     * listening socket, which stays bound unless the factory itself is shut down.
     */
    private void shutdownZookeeper() {
        try {
            if (this.zooServer != null) {
                this.zooServer.shutdown(true);
                this.zooServer = null;
            }
        }
        finally {
            if (this.zooFactory != null) {
                this.zooFactory.shutdown();
                this.zooFactory = null;
            }
        }
    }

    /** Shuts down the non-controller nodes first and the controller nodes afterwards. */
    private void roleOrderedShutdown(Map<Integer, Server> servers) {
        this.shutdownServers(servers, e -> !this.isController(e.getKey()));
        this.shutdownServers(servers, e -> this.isController(e.getKey()));
    }

    /** Returns true when the node has a CONTROLLER port registered with the port allocator. */
    private boolean isController(Integer key) {
        return this.portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, key);
    }

    /** Returns true when the node has an ANON port registered with the port allocator. */
    private boolean isBroker(Integer key) {
        return this.portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, key);
    }

    /**
     * Requests shutdown of every matching server and only then waits for each of them, so that
     * the servers shut down concurrently rather than one after another.
     */
    private void shutdownServers(Map<Integer, Server> servers, Predicate<Map.Entry<Integer, Server>> entryPredicate) {
        List<Server> matchingServers = servers.entrySet().stream()
                .filter(entryPredicate)
                .map(Map.Entry::getValue)
                .toList();
        matchingServers.forEach(Server::shutdown);
        matchingServers.forEach(Server::awaitShutdown);
    }

    /**
     * Deletes {@code path} and everything beneath it, if it exists.
     *
     * @throws UncheckedIOException if the directory tree cannot be opened for traversal
     */
    private static void ensureDirectoryIsEmpty(Path path) {
        try {
            deleteRecursively(path);
        }
        catch (IOException e) {
            throw new UncheckedIOException("Error whilst deleting " + path, e);
        }
    }

    /**
     * Best-effort recursive delete: entries are removed deepest-first and individual deletion
     * failures are ignored. Does nothing when {@code path} does not exist.
     *
     * @throws IOException if the directory tree cannot be opened for traversal
     */
    private static void deleteRecursively(Path path) throws IOException {
        if (!Files.exists(path)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(path)) {
            // Reverse order places children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
        }
    }

    /** Returns the number of nodes currently known to the cluster, including stopped ones. */
    public synchronized int getNumOfBrokers() {
        return this.servers.size();
    }

    /** Returns an immutable snapshot of the ids of the nodes that are currently stopped. */
    public synchronized Set<Integer> getStoppedBrokers() {
        return Set.copyOf(this.stoppedServers);
    }

    /**
     * Returns the endpoints for a listener of a node: bind on {@code 0.0.0.0} and connect via
     * {@code localhost}, both using the port the allocator holds for that listener and node.
     */
    @Override
    public synchronized KafkaClusterConfig.KafkaEndpoints.EndpointPair getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener listener, int nodeId) {
        int port = this.portsAllocator.getPort(listener, nodeId);
        return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder()
                .bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint("0.0.0.0", port))
                .connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint("localhost", port))
                .build();
    }

    /** Routes Kafka's exit and halt requests to {@link #exitHandler(int, String)} so the JVM keeps running. */
    private static void trapKafkaSystemExit() {
        Exit.setExitProcedure(InVMKafkaCluster::exitHandler);
        Exit.setHaltProcedure(InVMKafkaCluster::exitHandler);
    }
}

