package org.apache.pulsar.io.flume.sink;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/flume/sink/SourceOfFlume.class */
public class SourceOfFlume extends AbstractPollableSource implements BatchSizeSupported {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SourceOfFlume.class);
    public static final String BATCH_DURATION_MS = "batchDurationMillis";
    private long batchSize;
    private int maxBatchDurationMillis;
    private SourceCounter counter;
    private final List<Event> eventList = new ArrayList();

    @Override // org.apache.flume.source.BasicSourceSemantics
    public synchronized void doStart() {
        log.info("start source of flume ...");
        this.counter = new SourceCounter("flume-source");
        this.counter.start();
    }

    @Override // org.apache.flume.source.BasicSourceSemantics
    public void doStop() {
        log.info("stop source of flume ...");
        this.counter.stop();
    }

    @Override // org.apache.flume.source.BasicSourceSemantics
    public void doConfigure(Context context) {
        this.batchSize = context.getInteger("batchSize", 1000).intValue();
        this.maxBatchDurationMillis = context.getInteger(BATCH_DURATION_MS, 1000).intValue();
        log.info("context: {}", context);
    }

    @Override // org.apache.flume.source.AbstractPollableSource
    public PollableSource.Status doProcess() {
        try {
            long currentTimeMillis = System.currentTimeMillis() + this.maxBatchDurationMillis;
            while (this.eventList.size() < getBatchSize() && System.currentTimeMillis() < currentTimeMillis) {
                BlockingQueue<Object> queue = StringSink.getQueue();
                while (queue != null && !queue.isEmpty()) {
                    this.eventList.add(EventBuilder.withBody(queue.take().toString().getBytes()));
                }
            }
            if (this.eventList.size() <= 0) {
                return PollableSource.Status.BACKOFF;
            }
            this.counter.addToEventReceivedCount(this.eventList.size());
            getChannelProcessor().processEventBatch(this.eventList);
            this.eventList.clear();
            return PollableSource.Status.READY;
        } catch (Exception e) {
            log.error("Flume Source EXCEPTION", (Throwable) e);
            this.counter.incrementEventReadOrChannelFail(e);
            return PollableSource.Status.BACKOFF;
        }
    }

    @Override // org.apache.flume.conf.BatchSizeSupported
    public long getBatchSize() {
        return this.batchSize;
    }
}
