package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.client.avro.SpoolingFileLineReader;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/SpoolDirectorySource.class */
public class SpoolDirectorySource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class);
    private static int POLL_DELAY_MS = 500;
    private String completedSuffix;
    private String spoolDirectory;
    private boolean fileHeader;
    private String fileHeaderKey;
    private int batchSize;
    private int bufferMaxLines;
    private int bufferMaxLineLength;
    private ScheduledExecutorService executor;
    private CounterGroup counterGroup;
    private Runnable runner;
    SpoolingFileLineReader reader;

    /* loaded from: input_file:org/apache/flume/source/SpoolDirectorySource$SpoolDirectoryRunnable.class */
    private class SpoolDirectoryRunnable implements Runnable {
        private SpoolingFileLineReader reader;
        private CounterGroup counterGroup;

        public SpoolDirectoryRunnable(SpoolingFileLineReader spoolingFileLineReader, CounterGroup counterGroup) {
            this.reader = spoolingFileLineReader;
            this.counterGroup = counterGroup;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z;
            Error error;
            while (true) {
                try {
                    List<String> readLines = this.reader.readLines(SpoolDirectorySource.this.batchSize);
                    if (readLines.size() == 0) {
                        return;
                    }
                    String lastFileRead = this.reader.getLastFileRead();
                    ArrayList newArrayList = Lists.newArrayList();
                    for (String str : readLines) {
                        this.counterGroup.incrementAndGet("spooler.lines.read");
                        newArrayList.add(SpoolDirectorySource.this.createEvent(str, lastFileRead));
                    }
                    SpoolDirectorySource.this.getChannelProcessor().processEventBatch(newArrayList);
                    this.reader.commit();
                } finally {
                    if (z) {
                    }
                }
            }
        }
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("SpoolDirectorySource source starting with directory:{}", this.spoolDirectory);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.counterGroup = new CounterGroup();
        this.reader = new SpoolingFileLineReader(new File(this.spoolDirectory), this.completedSuffix, this.bufferMaxLines, this.bufferMaxLineLength);
        this.runner = new SpoolDirectoryRunnable(this.reader, this.counterGroup);
        this.executor.scheduleWithFixedDelay(this.runner, 0L, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
        super.start();
        logger.debug("SpoolDirectorySource source started");
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        super.stop();
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.spoolDirectory = context.getString(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY);
        Preconditions.checkState(this.spoolDirectory != null, "Configuration must specify a spooling directory");
        this.completedSuffix = context.getString(SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX, SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX);
        this.fileHeader = context.getBoolean(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, false).booleanValue();
        this.fileHeaderKey = context.getString(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY, SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);
        this.batchSize = context.getInteger("batchSize", 10).intValue();
        this.bufferMaxLines = context.getInteger(SpoolDirectorySourceConfigurationConstants.BUFFER_MAX_LINES, 100).intValue();
        this.bufferMaxLineLength = context.getInteger(SpoolDirectorySourceConfigurationConstants.BUFFER_MAX_LINE_LENGTH, Integer.valueOf(SpoolDirectorySourceConfigurationConstants.DEFAULT_BUFFER_MAX_LINE_LENGTH)).intValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Event createEvent(String str, String str2) {
        Event withBody = EventBuilder.withBody(str.getBytes());
        if (this.fileHeader) {
            withBody.getHeaders().put(this.fileHeaderKey, str2);
        }
        return withBody;
    }
}
