/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tools.ThroughputThrottler;

/**
 * Command-line tool that produces increasing integers to a topic and prints one JSON
 * line to stdout per event (startup, each successful or failed send, shutdown and a
 * final summary), so that an external observer can tell which messages were acked.
 *
 * <p>Threading: {@link #run(ThroughputThrottler)} executes on the caller's thread, while
 * {@link #main(String[])} registers a shutdown hook that runs on a separate thread. Send
 * callbacks may also run on a thread other than the one calling {@link #send(String, String)}
 * (see the Kafka producer documentation). State shared between those threads is therefore
 * declared {@code volatile}, and JSON lines emitted from callbacks are serialised on the
 * {@code System.out} monitor.
 */
public class VerifiableProducer
implements AutoCloseable {
    private final ObjectMapper mapper = new ObjectMapper();
    private final String topic;
    private final Producer<String, String> producer;
    /** Number of messages to produce; a negative value means "produce until stopped". */
    private final long maxMessages;
    /** Acked sends. Incremented in the send callback (under the System.out lock), read by the shutdown hook. */
    private volatile long numAcked = 0L;
    /** Send attempts. Written only by the thread calling {@link #send}, read by the shutdown hook. */
    private volatile long numSent = 0L;
    /** Target throughput handed to the {@link ThroughputThrottler} created in {@link #main}. */
    private final long throughput;
    /** Set by the shutdown hook to make the loop in {@link #run} exit; volatile so the loop sees it. */
    private volatile boolean stopProducing = false;
    /** Optional prefix prepended (with a dot) to every produced value; {@code null} for none. */
    private final Integer valuePrefix;
    /** Optional key cycle length; {@code null} means records are sent with a {@code null} key. */
    private final Integer repeatingKeys;
    /** Next key to hand out when {@link #repeatingKeys} is set. */
    private int keyCounter;
    /** Timestamp for the next record, or {@code null} to let the producer assign one. */
    private Long createTime;
    /** Wall-clock time at construction, used to advance {@link #createTime}. */
    private final long startTime;

    /**
     * Creates a verifiable producer around an already configured Kafka producer.
     *
     * @param producer      producer used for all sends; closed by {@link #close()}
     * @param topic         topic every record is sent to
     * @param throughput    target throughput, exposed to the throttler built in {@link #main}
     * @param maxMessages   number of messages to send; negative for unbounded
     * @param valuePrefix   optional value prefix, may be {@code null}
     * @param createTime    optional initial record timestamp, may be {@code null}
     * @param repeatingKeys optional key cycle length, may be {@code null}
     */
    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix, Long createTime, Integer repeatingKeys) {
        this.topic = topic;
        this.throughput = throughput;
        this.maxMessages = maxMessages;
        this.producer = producer;
        this.valuePrefix = valuePrefix;
        this.createTime = createTime;
        this.startTime = System.currentTimeMillis();
        this.repeatingKeys = repeatingKeys;
    }

    /** Builds the argparse4j parser describing every command-line option of the tool. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("verifiable-producer")
            .defaultHelp(true)
            .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");

        parser.addArgument("--topic")
            .action(Arguments.store())
            .required(true)
            .type(String.class)
            .metavar("TOPIC")
            .help("Produce messages to this topic.");

        // Exactly one of --bootstrap-server / --broker-list must be supplied.
        MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
            .description("Group of arguments for connection to brokers")
            .required(true);
        connectionGroup.addArgument("--bootstrap-server")
            .action(Arguments.store())
            .required(false)
            .type(String.class)
            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
            .dest("bootstrapServer")
            .help("REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
        connectionGroup.addArgument("--broker-list")
            .action(Arguments.store())
            .required(false)
            .type(String.class)
            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
            .dest("brokerList")
            .help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        parser.addArgument("--max-messages")
            .action(Arguments.store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .metavar("MAX-MESSAGES")
            .dest("maxMessages")
            .help("Produce this many messages. If -1, produce messages until the process is killed externally.");
        parser.addArgument("--throughput")
            .action(Arguments.store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .metavar("THROUGHPUT")
            .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
        parser.addArgument("--acks")
            .action(Arguments.store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .choices(0, 1, -1)
            .metavar("ACKS")
            .help("Acks required on each produced message. See Kafka docs on acks for details.");
        parser.addArgument("--producer.config")
            .action(Arguments.store())
            .required(false)
            .type(String.class)
            .metavar("CONFIG_FILE")
            .help("Producer config properties file.");
        parser.addArgument("--message-create-time")
            .action(Arguments.store())
            .required(false)
            .setDefault(-1L)
            .type(Long.class)
            .metavar("CREATETIME")
            .dest("createTime")
            .help("Send messages with creation time starting at the arguments value, in milliseconds since epoch");
        parser.addArgument("--value-prefix")
            .action(Arguments.store())
            .required(false)
            .type(Integer.class)
            .metavar("VALUE-PREFIX")
            .dest("valuePrefix")
            .help("If specified, each produced value will have this prefix with a dot separator");
        parser.addArgument("--repeating-keys")
            .action(Arguments.store())
            .required(false)
            .type(Integer.class)
            .metavar("REPEATING-KEYS")
            .dest("repeatingKeys")
            .help("If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again");
        return parser;
    }

    /**
     * Reads a Java properties file.
     *
     * @param filename path of the properties file
     * @return the properties loaded from {@code filename}
     * @throws IOException if the file cannot be opened or read
     */
    public static Properties loadProps(String filename) throws IOException {
        Properties props = new Properties();
        try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
            props.load(propStream);
        }
        return props;
    }

    /**
     * Parses the command line and builds a fully configured {@code VerifiableProducer}.
     *
     * <p>Settings from the {@code --producer.config} file are applied last and therefore
     * override the defaults set here (serializers, acks, retries, bootstrap servers).
     *
     * @param parser parser returned by {@link #argParser()}
     * @param args   raw command-line arguments
     * @return a producer ready for {@link #run(ThroughputThrottler)}
     * @throws ArgumentParserException if the arguments are invalid or the producer config
     *                                 file cannot be read (the I/O error is kept as the cause)
     */
    public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
        Namespace res = parser.parseArgs(args);
        int maxMessages = res.getInt("maxMessages");
        String topic = res.getString("topic");
        int throughput = res.getInt("throughput");
        String configFile = res.getString("producer.config");
        Integer valuePrefix = res.getInt("valuePrefix");
        Long createTime = res.getLong("createTime");
        Integer repeatingKeys = res.getInt("repeatingKeys");
        // -1 is the parser default and means "no explicit create time".
        if (createTime != null && createTime == -1L) {
            createTime = null;
        }

        Properties producerProps = new Properties();
        if (res.get("bootstrapServer") != null) {
            producerProps.put("bootstrap.servers", res.getString("bootstrapServer"));
        } else if (res.getString("brokerList") != null) {
            producerProps.put("bootstrap.servers", res.getString("brokerList"));
        } else {
            parser.printHelp();
            System.exit(0);
        }
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("acks", Integer.toString(res.getInt("acks")));
        producerProps.put("retries", "0");
        if (configFile != null) {
            try {
                producerProps.putAll(loadProps(configFile));
            } catch (IOException e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new ArgumentParserException(e.getMessage(), e, parser);
            }
        }

        StringSerializer serializer = new StringSerializer();
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);
        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime, repeatingKeys);
    }

    /**
     * Sends one record and arranges for its outcome to be printed as JSON.
     *
     * <p>When a create time is configured it is used as the record timestamp and then
     * advanced by the time elapsed since this object was constructed. An exception thrown
     * synchronously by the producer is reported as a {@code producer_send_error} event
     * instead of being propagated.
     *
     * @param key   record key, may be {@code null}
     * @param value record value
     */
    public void send(String key, String value) {
        ProducerRecord<String, String> record;
        if (this.createTime != null) {
            record = new ProducerRecord<>(this.topic, null, this.createTime, key, value);
            this.createTime = this.createTime + (System.currentTimeMillis() - this.startTime);
        } else {
            record = new ProducerRecord<>(this.topic, key, value);
        }
        // Single writer (the producing thread), so a plain increment of the volatile is safe.
        this.numSent++;
        try {
            this.producer.send(record, new PrintInfoCallback(key, value));
        } catch (Exception e) {
            // Same lock as the callback so JSON lines are never interleaved.
            synchronized (System.out) {
                this.printJson(new FailedSend(key, value, this.topic, e));
            }
        }
    }

    /**
     * Formats the value for message number {@code val}.
     *
     * @param val sequence number of the message
     * @return {@code "<prefix>.<val>"} when a value prefix is configured, otherwise {@code "<val>"}
     */
    public String getValue(long val) {
        if (this.valuePrefix != null) {
            return String.format("%d.%d", this.valuePrefix, val);
        }
        return String.format("%d", val);
    }

    /**
     * Returns the key for the next record.
     *
     * @return {@code null} when repeating keys are not configured; otherwise the current
     *         counter as a string, after which the counter is incremented and reset to 0
     *         once it equals the configured cycle length
     */
    public String getKey() {
        if (this.repeatingKeys == null) {
            return null;
        }
        String key = Integer.toString(this.keyCounter++);
        if (this.keyCounter == this.repeatingKeys) {
            this.keyCounter = 0;
        }
        return key;
    }

    /** Closes the underlying producer and then prints the {@code shutdown_complete} event. */
    @Override
    public void close() {
        this.producer.close();
        this.printJson(new ShutdownComplete());
    }

    /**
     * Serialises {@code data} with the shared {@link ObjectMapper} and prints it as one
     * line on stdout; a serialisation failure is reported on stdout as plain text.
     */
    private void printJson(Object data) {
        try {
            System.out.println(this.mapper.writeValueAsString(data));
        } catch (JsonProcessingException e) {
            System.out.println("Bad data can't be written as json: " + e.getMessage());
        }
    }

    /**
     * Prints the {@code startup_complete} event and then sends messages {@code 0, 1, 2, ...}
     * until the configured maximum is reached or a stop has been requested.
     *
     * @param throttler consulted after every send to decide whether to pause
     */
    public void run(ThroughputThrottler throttler) {
        this.printJson(new StartupComplete());
        // A negative maximum means "unbounded".
        long limit = this.maxMessages < 0L ? Long.MAX_VALUE : this.maxMessages;
        for (long i = 0L; i < limit && !this.stopProducing; ++i) {
            long sendStartMs = System.currentTimeMillis();
            this.send(this.getKey(), this.getValue(i));
            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
        }
    }

    /**
     * Tool entry point: parses arguments, installs a shutdown hook that stops production,
     * closes the producer and prints the {@code tool_data} summary, then runs the send loop.
     * Prints help and exits with status 0 when called without arguments; exits with status 1
     * on an argument error.
     */
    public static void main(String[] args) {
        ArgumentParser parser = argParser();
        if (args.length == 0) {
            parser.printHelp();
            System.exit(0);
        }
        try {
            VerifiableProducer producer = createFromArgs(parser, args);
            long startMs = System.currentTimeMillis();
            ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // Ask the send loop to stop before closing the producer.
                producer.stopProducing = true;
                producer.close();
                // Clamp to 1 ms so the average is never NaN or infinite.
                long elapsedMs = Math.max(1L, System.currentTimeMillis() - startMs);
                double avgThroughput = 1000.0 * ((double) producer.numAcked / (double) elapsedMs);
                producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
            }, "verifiable-producer-shutdown-hook"));
            producer.run(throttler);
        } catch (ArgumentParserException e) {
            parser.handleError(e);
            System.exit(1);
        }
    }

    /** Send callback that prints a success or failure event for one record. */
    private class PrintInfoCallback
    implements Callback {
        private final String key;
        private final String value;

        PrintInfoCallback(String key, String value) {
            this.key = key;
            this.value = value;
        }

        /**
         * Prints {@code producer_send_success} (and counts the ack) when {@code e} is
         * {@code null}, otherwise {@code producer_send_error}. Runs under the
         * {@code System.out} lock so the counter update and the output are atomic with
         * respect to other callbacks and to failures reported by {@code send}.
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            synchronized (System.out) {
                if (e == null) {
                    VerifiableProducer.this.numAcked++;
                    VerifiableProducer.this.printJson(new SuccessfulSend(this.key, this.value, recordMetadata));
                } else {
                    VerifiableProducer.this.printJson(new FailedSend(this.key, this.value, VerifiableProducer.this.topic, e));
                }
            }
        }
    }

    /** Final {@code tool_data} summary printed by the shutdown hook. */
    private static class ToolData
    extends ProducerEvent {
        private final long sent;
        private final long acked;
        private final long targetThroughput;
        private final double avgThroughput;

        public ToolData(long sent, long acked, long targetThroughput, double avgThroughput) {
            this.sent = sent;
            this.acked = acked;
            this.targetThroughput = targetThroughput;
            this.avgThroughput = avgThroughput;
        }

        @Override
        public String name() {
            return "tool_data";
        }

        @JsonProperty
        public long sent() {
            return this.sent;
        }

        @JsonProperty
        public long acked() {
            return this.acked;
        }

        @JsonProperty(value="target_throughput")
        public long targetThroughput() {
            return this.targetThroughput;
        }

        @JsonProperty(value="avg_throughput")
        public double avgThroughput() {
            return this.avgThroughput;
        }
    }

    /** {@code producer_send_error} event describing a record that could not be sent. */
    private static class FailedSend
    extends ProducerEvent {
        private final String topic;
        private final String key;
        private final String value;
        private final Exception exception;

        public FailedSend(String key, String value, String topic, Exception exception) {
            assert (exception != null) : "Expected non-null exception.";
            this.key = key;
            this.value = value;
            this.topic = topic;
            this.exception = exception;
        }

        @Override
        public String name() {
            return "producer_send_error";
        }

        @JsonProperty
        public String key() {
            return this.key;
        }

        @JsonProperty
        public String value() {
            return this.value;
        }

        @JsonProperty
        public String topic() {
            return this.topic;
        }

        /** Returns {@code Class.toString()} of the failure, i.e. including the "class " prefix. */
        @JsonProperty
        public String exception() {
            return this.exception.getClass().toString();
        }

        @JsonProperty
        public String message() {
            return this.exception.getMessage();
        }
    }

    /** {@code producer_send_success} event describing an acknowledged record. */
    private static class SuccessfulSend
    extends ProducerEvent {
        private final String key;
        private final String value;
        private final RecordMetadata recordMetadata;

        public SuccessfulSend(String key, String value, RecordMetadata recordMetadata) {
            assert (recordMetadata != null) : "Expected non-null recordMetadata object.";
            this.key = key;
            this.value = value;
            this.recordMetadata = recordMetadata;
        }

        @Override
        public String name() {
            return "producer_send_success";
        }

        @JsonProperty
        public String key() {
            return this.key;
        }

        @JsonProperty
        public String value() {
            return this.value;
        }

        @JsonProperty
        public String topic() {
            return this.recordMetadata.topic();
        }

        @JsonProperty
        public int partition() {
            return this.recordMetadata.partition();
        }

        @JsonProperty
        public long offset() {
            return this.recordMetadata.offset();
        }
    }

    /** {@code shutdown_complete} event printed after the producer has been closed. */
    private static class ShutdownComplete
    extends ProducerEvent {
        private ShutdownComplete() {
        }

        @Override
        public String name() {
            return "shutdown_complete";
        }
    }

    /** {@code startup_complete} event printed before the first send. */
    private static class StartupComplete
    extends ProducerEvent {
        private StartupComplete() {
        }

        @Override
        public String name() {
            return "startup_complete";
        }
    }

    /**
     * Base class of every JSON event; carries the creation timestamp and the event name,
     * which are requested to be serialised first via {@link JsonPropertyOrder}.
     */
    @JsonPropertyOrder(value={"timestamp", "name"})
    private static abstract class ProducerEvent {
        private final long timestamp = System.currentTimeMillis();

        private ProducerEvent() {
        }

        @JsonProperty
        public abstract String name();

        @JsonProperty
        public long timestamp() {
            return this.timestamp;
        }
    }
}

