package org.cg.eventbus.consumer.bytestream;

import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.cg.eventbus.message.MessageHandler;
import org.cg.eventbus.policy.CommitPolicy;

/* loaded from: input_file:org/cg/eventbus/consumer/bytestream/Dispatcher.class */
/**
 * Base thread that builds a {@link GenericFetcher}, lets the subclass set up its
 * handler, and then repeatedly calls {@link #poll()} until it returns {@code false}.
 *
 * <p>After every {@code poll()} call that returns {@code true}, the {@link #commit}
 * flag is checked and, when set, {@code fetcher.commit()} is invoked.
 */
public abstract class Dispatcher extends Thread {
    protected static Logger LOG = Logger.getLogger(Dispatcher.class);

    /** Configuration handed to the {@link GenericFetcher} constructor. */
    protected Configuration config;

    /** Second constructor argument; not read by this class (presumably for subclasses). */
    protected Configuration handlerConfig;

    /** Third constructor argument; not read by this class (presumably for subclasses). */
    protected Configuration parserConfig;

    /** Created on the dispatcher thread at the start of {@link #run()}. */
    protected GenericFetcher fetcher;

    /** Commit policy supplied at construction; not read by this class. */
    protected CommitPolicy policy;

    /**
     * When {@code true} after a successful {@link #poll()}, the run loop calls
     * {@code fetcher.commit()}. This class never assigns it — presumably subclasses do.
     */
    protected boolean commit;

    /** Not assigned by this class; presumably set by {@link #initHandler()} implementations. */
    protected MessageHandler handler;

    /**
     * Stores the supplied configurations and commit policy. No other work is done
     * here; the fetcher and handler are created later, inside {@link #run()}.
     *
     * @param configuration  stored in {@link #config}
     * @param configuration2 stored in {@link #handlerConfig}
     * @param configuration3 stored in {@link #parserConfig}
     * @param commitPolicy   stored in {@link #policy}
     */
    public Dispatcher(Configuration configuration, Configuration configuration2, Configuration configuration3, CommitPolicy commitPolicy) {
        this.config = configuration;
        this.handlerConfig = configuration2;
        this.parserConfig = configuration3;
        this.policy = commitPolicy;
    }

    /** Creates the fetcher from {@link #config}, then delegates to {@link #initHandler()}. */
    private void init() {
        this.fetcher = new GenericFetcher(this.config);
        initHandler();
    }

    /** Subclass hook invoked once from {@link #run()}, after the fetcher has been created. */
    public abstract void initHandler();

    /**
     * Subclass hook invoked repeatedly from {@link #run()}.
     *
     * @return {@code true} to keep the run loop going, {@code false} to end it
     */
    public abstract boolean poll();

    /**
     * Initializes the fetcher and handler, then loops on {@link #poll()}, committing
     * through the fetcher whenever {@link #commit} is set.
     *
     * @throws RuntimeException wrapping any {@link Exception} raised during
     *         initialization, polling, or committing; the original exception is
     *         attached as the cause
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            init();
            while (poll()) {
                if (this.commit) {
                    this.fetcher.commit();
                }
            }
        } catch (Exception e) {
            // Route the stack trace through the logger instead of stderr.
            LOG.error("consumer initialization failed. ", e);
            // Message text is unchanged; the cause is now chained so the original
            // stack trace is not lost to whoever handles the rethrown exception.
            throw new RuntimeException("Failed to initialize the consumer" + e, e);
        }
    }
}
