package fr.jetoile.hadoopunit.test.kafka;

import fr.jetoile.hadoopunit.exception.ConfigException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.class */
public enum KafkaConsumerUtils {
    INSTANCE;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUtils.class);
    private static final String KAFKA_HOSTNAME_KEY = "kafka.hostname";
    private static final String KAFKA_PORT_KEY = "kafka.port";
    private String kafkaHostname;
    private Integer kafkaPort;
    private Configuration configuration;
    long numRead = 0;

    KafkaConsumerUtils() {
        try {
            loadConfig();
        } catch (ConfigException e) {
            System.exit(-1);
        }
    }

    public void consumeMessagesWithOldApi(String str, int i) throws UnsupportedEncodingException {
        SimpleConsumer simpleConsumer = new SimpleConsumer(this.kafkaHostname, this.kafkaPort.intValue(), 30000, 2, "test");
        System.out.println("Testing single fetch");
        FetchRequest build = new FetchRequestBuilder().clientId("test").addFetch(str, 0, 0L, 100).build();
        while (this.numRead != i) {
            printMessages(simpleConsumer.fetch(build).messageSet(str, 0));
            this.numRead++;
        }
    }

    public void consumeMessagesWithNewApi(String str, int i) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaHostname + ":" + this.kafkaPort);
        properties.put("group.id", "test");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(str));
        while (this.numRead != i) {
            try {
                Iterator it = kafkaConsumer.poll(1000L).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    this.numRead++;
                    LOG.debug((String) consumerRecord.value());
                }
            } finally {
                kafkaConsumer.close();
            }
        }
    }

    public long getNumRead() {
        return this.numRead;
    }

    private void printMessages(ByteBufferMessageSet byteBufferMessageSet) throws UnsupportedEncodingException {
        Iterator it = byteBufferMessageSet.iterator();
        while (it.hasNext()) {
            ByteBuffer payload = ((MessageAndOffset) it.next()).message().payload();
            byte[] bArr = new byte[payload.limit()];
            payload.get(bArr);
            LOG.debug(new String(bArr, "UTF-8"));
        }
    }

    private void loadConfig() throws ConfigException {
        try {
            this.configuration = new PropertiesConfiguration("hadoop-unit-default.properties");
            this.kafkaHostname = this.configuration.getString(KAFKA_HOSTNAME_KEY);
            this.kafkaPort = Integer.valueOf(this.configuration.getInt(KAFKA_PORT_KEY));
        } catch (ConfigurationException e) {
            throw new ConfigException("bad config", e);
        }
    }
}
