package io.kgraph.kgiraffe.server;

import graphql.GraphQL;
import io.kcache.KafkaCacheConfig;
import io.kgraph.kgiraffe.KGiraffeConfig;
import io.kgraph.kgiraffe.KGiraffeEngine;
import io.kgraph.kgiraffe.server.notifier.VertxNotifier;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.graphql.ApolloWSHandler;
import io.vertx.ext.web.handler.graphql.ApolloWSOptions;
import io.vertx.ext.web.handler.graphql.GraphQLHandler;
import io.vertx.ext.web.handler.graphql.GraphQLHandlerOptions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name = "kgiraffe", mixinStandardHelpOptions = true, versionProvider = ManifestVersionProvider.class, description = {"A GraphQL Interface for Apache Kafka and Schema Registry."}, sortOptions = false, sortSynopsis = false)
/* loaded from: input_file:io/kgraph/kgiraffe/server/KGiraffeMain.class */
public class KGiraffeMain extends AbstractVerticle implements Callable<Integer> {
    private static final Logger LOG = LoggerFactory.getLogger(KGiraffeMain.class);
    // Serializes the staged schema list into the single "stage.schemas" property string (see updateConfig()).
    private static final KGiraffeConfig.ListPropertyParser listPropertyParser = new KGiraffeConfig.ListPropertyParser();
    // Serializes the topic-to-serde maps into the "key.serdes" / "value.serdes" property strings (see updateConfig()).
    private static final KGiraffeConfig.MapPropertyParser mapPropertyParser = new KGiraffeConfig.MapPropertyParser();
    // Effective configuration: may be preset via the constructor, reloaded from configFile in call(),
    // and is always replaced by the result of updateConfig() before the engine is configured.
    private KGiraffeConfig config;

    // Joined with "," and stored under "topics" by updateConfig().
    @CommandLine.Option(names = {"-t", "--topic"}, description = {"Topic(s) to consume from and produce to"}, paramLabel = "<topic>")
    private List<String> topics;

    // Joined with "," and stored under "kafkacache.topic.partitions" by updateConfig().
    @CommandLine.Option(names = {"-p", "--partition"}, description = {"Partition(s)"}, paramLabel = "<partition>")
    private List<Integer> partitions;

    // Joined with "," and stored under "kafkacache.bootstrap.servers" by updateConfig().
    @CommandLine.Option(names = {"-b", "--bootstrap-server"}, description = {"Bootstrap broker(s) (host:[port])"}, paramLabel = "<broker>")
    private List<String> bootstrapBrokers;

    // Stored under "kafkacache.init.timeout.ms" by updateConfig().
    @CommandLine.Option(names = {"-m", "--metadata-timeout"}, description = {"Metadata (et.al.) request timeout"}, paramLabel = "<ms>")
    private Integer initTimeout;

    // When set, call() builds the base configuration from this file before the other options are applied.
    @CommandLine.Option(names = {"-F", "--file"}, description = {"Read configuration properties from file"}, paramLabel = "<config-file>")
    private File configFile;

    // Stored (via toString()) under "kafkacache.topic.partitions.offset" by updateConfig().
    @CommandLine.Option(names = {"-o", "--offset"}, description = {"Offset to start consuming from:\n  beginning | end |\n  <value>  (absolute offset) |\n  -<value> (relative offset from end)\n  @<value> (timestamp in ms to start at)\n  Default: beginning"})
    private KafkaCacheConfig.Offset offset;

    // Per-topic key serdes; stored under "key.serdes" by updateConfig().
    @CommandLine.Option(names = {"-k", "--key-serde"}, description = {"(De)serialize keys using <serde>"}, paramLabel = "<topic=serde>")
    private Map<String, KGiraffeConfig.Serde> keySerdes;

    // Per-topic value serdes; stored under "value.serdes" by updateConfig().
    @CommandLine.Option(names = {"-v", "--value-serde"}, description = {"(De)serialize values using <serde>\nAvailable serdes:\n  short | int | long | float |\n  double | string | binary |\n  avro:<schema|@file> |\n  json:<schema|@file> |\n  proto:<schema|@file> |\n  latest (use latest version in SR) |\n  <id>   (use schema id from SR)\n  Default for key:   binary\n  Default for value: latest\nThe avro/json/proto serde formats can\nalso be specified with refs, e.g.\n  avro:<schema|@file>;refs:<refs|@file>\nwhere refs are schema references\nof the form \n  [{name=\"<name>\",subject=\"<subject>\",\n    version=<version>},..]"}, paramLabel = "<topic=serde>")
    private Map<String, KGiraffeConfig.Serde> valueSerdes;

    // Stored under "schema.registry.url" by updateConfig().
    @CommandLine.Option(names = {"-r", "--schema-registry-url"}, description = {"SR (Schema Registry) URL"}, paramLabel = "<url>")
    private String schemaRegistryUrl;

    // Stored under "stage.schemas" by updateConfig().
    @CommandLine.Option(names = {"-s", "--stage-schema"}, description = {"Validate and stage the given schema(s).\nSee avro/json/proto serde formats above."}, paramLabel = "<serde>")
    private List<KGiraffeConfig.Serde> schemas;

    // Raw property overrides; updateConfig() applies them last, so they win over the file and the other options.
    @CommandLine.Option(names = {"-X", "--property"}, description = {"Set kgiraffe configuration property."}, paramLabel = "<prop=val>")
    private Map<String, String> properties;

    /* loaded from: input_file:io/kgraph/kgiraffe/server/KGiraffeMain$ManifestVersionProvider.class */
    static class ManifestVersionProvider implements CommandLine.IVersionProvider {
        ManifestVersionProvider() {
        }

        public String[] getVersion() throws Exception {
            Enumeration<URL> resources = CommandLine.class.getClassLoader().getResources("META-INF/MANIFEST.MF");
            while (resources.hasMoreElements()) {
                URL nextElement = resources.nextElement();
                try {
                    Manifest manifest = new Manifest(nextElement.openStream());
                    if (isApplicableManifest(manifest)) {
                        return new String[]{"kgiraffe - A GraphQL Interface for Apache Kafka and Schema Registry", "https://github.com/rayokota/kgiraffe", "Copyright (c) 2022, Robert Yokota", "Version " + get(manifest.getMainAttributes(), "Implementation-Version")};
                    }
                } catch (IOException e) {
                    return new String[]{"Unable to read from " + nextElement + ": " + e};
                }
            }
            return new String[0];
        }

        private boolean isApplicableManifest(Manifest manifest) {
            return "kgiraffe-server".equals(get(manifest.getMainAttributes(), "Implementation-Title"));
        }

        private static Object get(Attributes attributes, String str) {
            return attributes.get(new Attributes.Name(str));
        }
    }

    /* loaded from: input_file:io/kgraph/kgiraffe/server/KGiraffeMain$OffsetConverter.class */
    static class OffsetConverter implements CommandLine.ITypeConverter<KafkaCacheConfig.Offset> {
        OffsetConverter() {
        }

        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public KafkaCacheConfig.Offset m1convert(String str) {
            try {
                return new KafkaCacheConfig.Offset(str);
            } catch (ConfigException e) {
                throw new CommandLine.TypeConversionException("expected one of [beginning, end, <value>, -<value>, @<value>] but was '" + str + "'");
            }
        }
    }

    /* loaded from: input_file:io/kgraph/kgiraffe/server/KGiraffeMain$SerdeConverter.class */
    static class SerdeConverter implements CommandLine.ITypeConverter<KGiraffeConfig.Serde> {
        SerdeConverter() {
        }

        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public KGiraffeConfig.Serde m2convert(String str) {
            try {
                return new KGiraffeConfig.Serde(str);
            } catch (ConfigException e) {
                throw new CommandLine.TypeConversionException("expected one of [short, int, long, float, double, string, binary, latest, <id>] but was '" + str + "'");
            }
        }
    }

    /**
     * Creates an instance with no preset configuration. Used by {@code main}, where
     * picocli fills in the option fields before {@code call()} runs.
     */
    public KGiraffeMain() {
    }

    /**
     * Creates an instance with a preset configuration.
     *
     * @param kGiraffeConfig the configuration to start from; {@code updateConfig()}
     *                       copies its original string settings as the base when it is non-null
     */
    public KGiraffeMain(KGiraffeConfig kGiraffeConfig) {
        this.config = kGiraffeConfig;
    }

    /**
     * Parses the {@code "listener"} configuration string into a URI.
     *
     * @return the listener URI
     * @throws URISyntaxException if the configured value is not a valid URI
     */
    public URI getListener() throws URISyntaxException {
        String listenerSpec = this.config.getString("listener");
        return new URI(listenerSpec);
    }

    /**
     * Command entry point invoked by picocli: builds the effective configuration,
     * initializes the engine, deploys this verticle, then blocks until a read from
     * standard input returns.
     *
     * @return exit code 0 once the stdin read has returned
     * @throws Exception if configuration or engine setup fails, or if the wait is interrupted
     */
    @Override // java.util.concurrent.Callable
    public Integer call() throws Exception {
        if (this.configFile != null) {
            this.config = new KGiraffeConfig(this.configFile);
        }
        this.config = updateConfig();
        Vertx vertx = Vertx.vertx();
        KGiraffeEngine engine = KGiraffeEngine.getInstance();
        engine.configure(this.config);
        engine.init(new VertxNotifier(vertx.eventBus()));
        vertx.deployVerticle(this);
        // Block the calling thread until a read from standard input returns.
        Thread stdinWaiter = new Thread(() -> {
            try {
                System.in.read();
            } catch (IOException e) {
                // Route the failure through the class logger instead of printStackTrace().
                LOG.error("Failed to read from standard input", e);
            }
        });
        stdinWaiter.setDaemon(true);
        stdinWaiter.start();
        stdinWaiter.join();
        return 0;
    }

    /**
     * Verticle start hook: wires the GraphQL HTTP and WebSocket handlers plus the
     * static GraphiQL resources onto a router, then starts an HTTP server on the
     * configured listener.
     *
     * <p>On any startup failure the promise is failed, the error is logged, and the
     * JVM exits with status 1.
     *
     * @param promise completed once the server is listening, failed otherwise
     */
    @Override
    public void start(Promise<Void> promise) {
        KGiraffeEngine engine = KGiraffeEngine.getInstance();
        try {
            Router router = Router.router(this.vertx);
            GraphQL graphQL = engine.getGraphQL();
            router.route().handler(BodyHandler.create());

            GraphQLHandlerOptions httpOptions = new GraphQLHandlerOptions()
                .setRequestBatchingEnabled(true)
                .setRequestMultipartEnabled(true);
            ApolloWSOptions wsOptions = new ApolloWSOptions().setKeepAlive(5000L);
            // The WebSocket handler is registered ahead of the plain HTTP handler on the same path.
            router.route("/graphql")
                .handler(ApolloWSHandler.create(graphQL, wsOptions))
                .handler(GraphQLHandler.create(graphQL, httpOptions));
            router.route("/kgiraffe/*").handler(StaticHandler.create("kgiraffe"));

            URI listener = getListener();
            int port = listener.getPort();
            String host = listener.getHost();
            HttpServerOptions serverOptions = new HttpServerOptions()
                .addWebSocketSubProtocol("graphql-ws")
                .setTcpKeepAlive(true);

            this.vertx.createHttpServer(serverOptions)
                .requestHandler(router)
                .exceptionHandler(th -> LOG.error("Server error", th))
                .listen(port, host, asyncResult -> {
                    if (asyncResult.succeeded()) {
                        LOG.info("Server started, listening on {}", port);
                        LOG.info("GraphQL:     http://localhost:{}/graphql", port);
                        LOG.info("GraphQL-WS:  ws://localhost:{}/graphql", port);
                        LOG.info("GraphiQL:    http://localhost:{}/kgiraffe", port);
                        LOG.info("      /)/)  ");
                        LOG.info("     ( ..\\  ");
                        LOG.info("     /'-._) ");
                        LOG.info("    /#/     ");
                        LOG.info("   /#/      ");
                        LOG.info("  /#/       ");
                        LOG.info("KGiraffe is at your service...");
                        promise.complete();
                    } else {
                        LOG.info("Could not start server: ", asyncResult.cause());
                        promise.fail(asyncResult.cause());
                        LOG.error("Server died unexpectedly: ", asyncResult.cause());
                        System.exit(1);
                    }
                });
        } catch (Exception e) {
            LOG.info("Could not start server: {}", e.getLocalizedMessage());
            promise.fail(e);
            LOG.error("Server died unexpectedly: ", e);
            System.exit(1);
        }
    }

    /**
     * Builds the effective configuration by layering, in order: the original string
     * settings of the current config (if any), each command-line option that was
     * supplied, and finally the raw {@code -X} property overrides, which win.
     *
     * @return a new configuration built from the merged properties
     */
    private KGiraffeConfig updateConfig() {
        Map<String, String> props = new HashMap<>();
        if (this.config != null) {
            props.putAll(this.config.originalsStrings());
        }
        if (this.topics != null) {
            props.put("topics", String.join(",", this.topics));
        }
        if (this.partitions != null) {
            props.put("kafkacache.topic.partitions",
                this.partitions.stream().map(Object::toString).collect(Collectors.joining(",")));
        }
        if (this.bootstrapBrokers != null) {
            props.put("kafkacache.bootstrap.servers", String.join(",", this.bootstrapBrokers));
        }
        if (this.initTimeout != null) {
            props.put("kafkacache.init.timeout.ms", String.valueOf(this.initTimeout));
        }
        if (this.offset != null) {
            props.put("kafkacache.topic.partitions.offset", this.offset.toString());
        }
        if (this.keySerdes != null) {
            props.put("key.serdes", serdesAsString(this.keySerdes));
        }
        if (this.valueSerdes != null) {
            props.put("value.serdes", serdesAsString(this.valueSerdes));
        }
        if (this.schemaRegistryUrl != null) {
            props.put("schema.registry.url", this.schemaRegistryUrl);
        }
        if (this.schemas != null) {
            List<String> schemaStrings = this.schemas.stream()
                .map(Object::toString)
                .collect(Collectors.toList());
            props.put("stage.schemas", listPropertyParser.asString(schemaStrings));
        }
        if (this.properties != null) {
            // Applied last so explicit -X properties override everything above.
            props.putAll(this.properties);
        }
        return new KGiraffeConfig(props);
    }

    /** Renders a topic-to-serde map as one property string, using each serde's toString(). */
    private static String serdesAsString(Map<String, KGiraffeConfig.Serde> serdes) {
        Map<String, String> serdeStrings = serdes.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()));
        return mapPropertyParser.asString(serdeStrings);
    }

    /**
     * Process entry point: registers the custom option converters with picocli,
     * runs the command, and exits with the code it returns.
     *
     * @param strArr the command-line arguments
     */
    public static void main(String[] strArr) {
        KGiraffeMain app = new KGiraffeMain();
        CommandLine cli = new CommandLine(app);
        cli.registerConverter(KafkaCacheConfig.Offset.class, new OffsetConverter());
        cli.registerConverter(KGiraffeConfig.Serde.class, new SerdeConverter());
        cli.setUsageHelpLongOptionsMaxWidth(30);
        int exitCode = cli.execute(strArr);
        System.exit(exitCode);
    }
}
