package org.cg.eventbus.stream;

import kafka.message.MessageAndMetadata;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.cg.eventbus.policy.IPolicy;

/* loaded from: input_file:org/cg/eventbus/stream/Dispatcher.class */
/**
 * Thread that pulls messages from a {@link Fetcher} and passes each one to an
 * {@link IHandler}, committing the fetcher whenever the {@link IPolicy} reports
 * that a piece is done.
 *
 * @param <K> type of the message key exposed by {@link MessageAndMetadata}
 * @param <V> type of the message value exposed by {@link MessageAndMetadata}
 */
public class Dispatcher<K, V> extends Thread {
    private static final Logger LOG = Logger.getLogger(Dispatcher.class);

    /** Configuration used to build the fetcher. */
    private final Configuration config;
    /** Source of messages; created by {@link #init()} during construction. */
    private Fetcher fetcher;
    /** Receives every fetched message together with the reporter. */
    private final IHandler handler;
    /** Passed through to the handler on every call. */
    private final IReporter reporter;
    /** Consulted after every handled message to decide whether to commit. */
    private final IPolicy policy;

    /**
     * Creates a dispatcher and immediately builds its fetcher from the given configuration.
     *
     * @param configuration configuration handed to the {@link Fetcher} constructor
     * @param iHandler      handler invoked for every fetched message
     * @param iReporter     reporter forwarded to the handler
     * @param iPolicy       policy asked, after each message, whether to commit
     * @throws Exception if creating the fetcher fails
     */
    public Dispatcher(Configuration configuration, IHandler iHandler, IReporter iReporter, IPolicy iPolicy) throws Exception {
        this.config = configuration;
        this.handler = iHandler;
        this.reporter = iReporter;
        this.policy = iPolicy;
        init();
    }

    /**
     * Builds the fetcher from the stored configuration.
     *
     * @throws Exception if the {@link Fetcher} constructor fails
     */
    private void init() throws Exception {
        this.fetcher = new Fetcher(this.config);
        LOG.debug("Fetcher initialization finished.");
    }

    /**
     * Processes messages for as long as the fetcher reports that more are available.
     * Each message is handed to the handler; the fetcher is committed whenever the
     * policy reports that a piece is done.
     *
     * <p>An unchecked exception raised inside the loop is logged and then rethrown,
     * so the thread still terminates exactly as it would without the logging.
     */
    @Override
    public void run() {
        try {
            while (this.fetcher.hasNext()) {
                MessageAndMetadata<K, V> nextMessage = this.fetcher.nextMessage();
                // Only read the key/value and build the log string when DEBUG is on;
                // otherwise that work is thrown away for every single message.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Coming Data, key: " + nextMessage.key() + ", value: " + nextMessage.message());
                }
                this.handler.handle(nextMessage, this.reporter);
                if (this.policy.pieceDone()) {
                    this.fetcher.commit();
                }
            }
        } catch (RuntimeException e) {
            // Record the failure in the application log before the thread dies.
            LOG.error("Dispatcher terminated by an unexpected error.", e);
            throw e;
        }
    }

    /**
     * Closes the underlying fetcher.
     * NOTE(review): presumably this also causes the {@link #run()} loop to end — confirm against Fetcher.
     */
    public void close() {
        this.fetcher.close();
    }
}
