package org.cg.eventbus.consumer.bytestream;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.cg.eventbus.message.Message;

/* loaded from: input_file:org/cg/eventbus/consumer/bytestream/GenericFetcher.class */
/**
 * Pulls raw {@code byte[]} key/value messages from Kafka through a single
 * high-level consumer stream and hands them out as {@link Message} objects.
 *
 * <p>The topic selection is read from the {@code consumer.topic} configuration
 * key and passed to a Kafka {@link Whitelist} filter. Exactly one stream (the
 * first one returned by the connector) is consumed.
 *
 * <p>NOTE(review): this class holds a single consumer iterator and performs no
 * synchronization of its own; it is presumably meant to be used from one
 * thread at a time — confirm against callers.
 */
public class GenericFetcher {
    private static final Logger LOG = Logger.getLogger(GenericFetcher.class);

    /** Configuration key holding the topic filter to subscribe to. */
    private static final String TOPIC_KEY = "consumer.topic";

    private final Configuration config;
    private final ConsumerConnector consumerConnector;
    private final ConsumerIterator<byte[], byte[]> consumerIter;

    /**
     * Creates the Kafka consumer connector and opens the message stream.
     *
     * @param configuration source of the consumer settings; must define
     *                      {@code consumer.topic}
     * @throws IllegalArgumentException if {@code consumer.topic} is not configured
     */
    public GenericFetcher(Configuration configuration) {
        this.config = configuration;

        // Validate the topic before any connector exists, so a bad configuration
        // cannot leave a live connector behind.
        String topic = this.config.getString(TOPIC_KEY);
        if (topic == null) {
            throw new IllegalArgumentException("Missing required configuration key: " + TOPIC_KEY);
        }

        this.consumerConnector = Consumer.createJavaConsumerConnector(
                new ConsumerConfigFactory().setConfig(this.config).build());
        LOG.info(String.format("Thread get topic: %s", topic));
        this.consumerIter = openIterator(topic);
    }

    /**
     * Opens the message streams matching {@code topic} and returns the iterator
     * of the first one. If that fails, the connector is shut down before the
     * exception propagates, because the caller never receives an instance it
     * could {@link #close()}.
     */
    private ConsumerIterator<byte[], byte[]> openIterator(String topic) {
        try {
            KafkaStream<byte[], byte[]> stream =
                    this.consumerConnector.createMessageStreamsByFilter(new Whitelist(topic)).get(0);
            return stream.iterator();
        } catch (RuntimeException e) {
            this.consumerConnector.shutdown();
            throw e;
        }
    }

    /**
     * Reports whether another message can be read; delegates to the underlying
     * Kafka consumer iterator.
     *
     * <p>NOTE(review): the Kafka iterator presumably blocks here until a message
     * arrives or the configured consumer timeout elapses — confirm against the
     * Kafka version in use.
     *
     * @return {@code true} if {@link #read()} can return a message
     */
    public boolean hasNext() {
        return this.consumerIter.hasNext();
    }

    /**
     * Reads the next message from the stream.
     *
     * <p>When JVM assertions are enabled, this first asserts that
     * {@link #hasNext()} is true; with assertions disabled no such check is made.
     *
     * @return the next message with its topic, offset, key, payload and partition
     */
    public Message read() {
        assert hasNext();
        MessageAndMetadata<byte[], byte[]> record = this.consumerIter.next();
        return new Message(record.topic(), record.offset(), record.key(), record.message(), record.partition());
    }

    /** Commits the offsets consumed so far through the consumer connector. */
    public void commit() {
        this.consumerConnector.commitOffsets();
    }

    /** Shuts down the underlying consumer connector. */
    public void close() {
        this.consumerConnector.shutdown();
    }
}
