/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.tools;

import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Transformer {
    private static final Logger log = LoggerFactory.getLogger(Transformer.class);

    public static void main(String[] args) {
        String inputTopic;
        Properties properties = new Properties();
        for (String arg : args) {
            String[] split = arg.split("=");
            properties.put(split[0], split[1]);
        }
        String appId = properties.getProperty("application.id");
        if (appId == null) {
            properties.put("application.id", "apicurio-registry-transformer");
        }
        if ((inputTopic = properties.getProperty("input-topic")) == null) {
            throw new IllegalArgumentException("Missing input topic!");
        }
        String outputTopic = properties.getProperty("output-topic");
        if (outputTopic == null) {
            throw new IllegalArgumentException("Missing output topic!");
        }
        String fnType = properties.getProperty("type");
        if (fnType == null) {
            throw new IllegalArgumentException("Missing transformation type!");
        }
        final Type type = Type.valueOf(fnType);
        log.info(String.format("Transforming: %s --> %s [%s]", inputTopic, outputTopic, type));
        StreamsBuilder builder = new StreamsBuilder();
        KStream input = builder.stream(inputTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.ByteArray()));
        input.transformValues(() -> new ValueTransformer<byte[], byte[]>(){

            public void init(ProcessorContext context) {
            }

            public byte[] transform(byte[] value) {
                return type.apply(value);
            }

            public void close() {
            }
        }, new String[0]).to(outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.ByteArray()));
        Topology topology = builder.build(properties);
        KafkaStreams streams = new KafkaStreams(topology, properties);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> ((KafkaStreams)streams).close()));
        streams.start();
    }

    static enum Type implements Function<byte[], byte[]>
    {
        CONFLUENT_TO_APICURIO(bytes -> {
            ByteBuffer input = ByteBuffer.wrap(bytes);
            ByteBuffer output = ByteBuffer.allocate(((byte[])bytes).length + 4);
            output.put(input.get());
            output.putLong(input.getInt());
            output.put(input);
            return output.array();
        }),
        APICURIO_TO_CONFLUENT(bytes -> {
            ByteBuffer input = ByteBuffer.wrap(bytes);
            ByteBuffer output = ByteBuffer.allocate(((byte[])bytes).length - 4);
            output.put(input.get());
            output.putInt((int)input.getLong());
            output.put(input);
            return output.array();
        });

        private Function<byte[], byte[]> fn;

        private Type(Function<byte[], byte[]> fn) {
            this.fn = fn;
        }

        @Override
        public byte[] apply(byte[] bytes) {
            return this.fn.apply(bytes);
        }
    }
}

