package org.apache.flume.client.avro;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.derby.impl.sql.execute.xplain.XPLAINUtil;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.PollableSourceConstants;
import org.apache.flume.source.SyslogSourceConfigurationConstants;
import org.apache.flume.util.SSLUtil;
import org.jboss.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.9.0.jar:org/apache/flume/client/avro/AvroCLIClient.class */
public class AvroCLIClient {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AvroCLIClient.class);
    private static final int BATCH_SIZE = 5;
    private static final int MAX_LINE_LENGTH = 2000;
    private String hostname;
    private int port;
    private String fileName;
    private String rpcClientPropsFile;
    private String dirName;
    private Map<String, String> headers = new HashMap();
    private int sent;

    public static void main(String[] strArr) {
        SSLUtil.initGlobalSSLParameters();
        AvroCLIClient avroCLIClient = new AvroCLIClient();
        try {
            if (avroCLIClient.parseCommandLine(strArr)) {
                avroCLIClient.run();
            }
        } catch (IOException e) {
            logger.error("Unable to send data to Flume. Exception follows.", (Throwable) e);
        } catch (ParseException e2) {
            logger.error("Unable to parse command line options - {}", e2.getMessage());
        } catch (EventDeliveryException e3) {
            logger.error("Unable to deliver events to Flume. Exception follows.", (Throwable) e3);
        } catch (FlumeException e4) {
            logger.error("Unable to open connection to Flume. Exception follows.", (Throwable) e4);
        }
        logger.debug("Exiting");
    }

    private void parseHeaders(CommandLine commandLine) {
        String optionValue = commandLine.getOptionValue("headerFile");
        FileInputStream fileInputStream = null;
        if (optionValue != null) {
            try {
                try {
                    fileInputStream = new FileInputStream(optionValue);
                    Properties properties = new Properties();
                    properties.load(fileInputStream);
                    for (Map.Entry entry : properties.entrySet()) {
                        String str = (String) entry.getKey();
                        String str2 = (String) entry.getValue();
                        logger.debug("Inserting Header Key [" + str + "] header value [" + str2 + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
                        this.headers.put(str, str2);
                    }
                } catch (Exception e) {
                    logger.error("Unable to load headerFile", optionValue, e);
                    if (fileInputStream != null) {
                        try {
                            fileInputStream.close();
                            return;
                        } catch (Exception e2) {
                            logger.error("Unable to close headerFile", (Throwable) e2);
                            return;
                        }
                    }
                    return;
                }
            } catch (Throwable th) {
                if (fileInputStream != null) {
                    try {
                        fileInputStream.close();
                    } catch (Exception e3) {
                        logger.error("Unable to close headerFile", (Throwable) e3);
                        return;
                    }
                }
                throw th;
            }
        }
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (Exception e4) {
                logger.error("Unable to close headerFile", (Throwable) e4);
            }
        }
    }

    private boolean parseCommandLine(String[] strArr) throws ParseException {
        Options options = new Options();
        options.addOption("P", "rpcProps", true, "RPC client properties file with server connection params").addOption("p", "port", true, "port of the avro source").addOption("H", SyslogSourceConfigurationConstants.CONFIG_HOST, true, "hostname of the avro source").addOption("F", HttpPostBodyUtil.FILENAME, true, "file to stream to avro source").addOption(null, "dirname", true, "directory to stream to avro source").addOption(XPLAINUtil.LOCK_GRANULARITY_ROW, "headerFile", true, "file containing headers as key/value pairs on each new line").addOption("h", "help", false, "display help text");
        CommandLine parse = new GnuParser().parse(options, strArr);
        if (parse.hasOption('h')) {
            new HelpFormatter().printHelp("flume-ng avro-client", "", options, "The --dirname option assumes that a spooling directory exists where immutable log files are dropped.", true);
            return false;
        }
        if (parse.hasOption(HttpPostBodyUtil.FILENAME) && parse.hasOption("dirname")) {
            throw new ParseException("--filename and --dirname options cannot be used simultaneously");
        }
        if (!parse.hasOption("port") && !parse.hasOption(SyslogSourceConfigurationConstants.CONFIG_HOST) && !parse.hasOption("rpcProps")) {
            throw new ParseException("Either --rpcProps or both --host and --port must be specified.");
        }
        if (parse.hasOption("rpcProps")) {
            this.rpcClientPropsFile = parse.getOptionValue("rpcProps");
            Preconditions.checkNotNull(this.rpcClientPropsFile, "RPC client properties file must be specified after --rpcProps argument.");
            Preconditions.checkArgument(new File(this.rpcClientPropsFile).exists(), "RPC client properties file %s does not exist!", this.rpcClientPropsFile);
        }
        if (this.rpcClientPropsFile == null) {
            if (!parse.hasOption("port")) {
                throw new ParseException("You must specify a port to connect to with --port");
            }
            this.port = Integer.parseInt(parse.getOptionValue("port"));
            if (!parse.hasOption(SyslogSourceConfigurationConstants.CONFIG_HOST)) {
                throw new ParseException("You must specify a hostname to connect to with --host");
            }
            this.hostname = parse.getOptionValue(SyslogSourceConfigurationConstants.CONFIG_HOST);
        }
        this.fileName = parse.getOptionValue(HttpPostBodyUtil.FILENAME);
        this.dirName = parse.getOptionValue("dirname");
        if (!parse.hasOption("headerFile")) {
            return true;
        }
        parseHeaders(parse);
        return true;
    }

    private void run() throws IOException, FlumeException, EventDeliveryException {
        EventReader eventReader = null;
        RpcClient rpcClientFactory = this.rpcClientPropsFile != null ? RpcClientFactory.getInstance(new File(this.rpcClientPropsFile)) : RpcClientFactory.getDefaultInstance(this.hostname, Integer.valueOf(this.port), 5);
        try {
            eventReader = this.fileName != null ? new SimpleTextLineEventReader(new FileReader(new File(this.fileName))) : this.dirName != null ? new ReliableSpoolingFileEventReader.Builder().spoolDirectory(new File(this.dirName)).sourceCounter(new SourceCounter("avrocli")).build() : new SimpleTextLineEventReader(new InputStreamReader(System.in));
            long currentTimeMillis = System.currentTimeMillis();
            long j = 0;
            int batchSize = rpcClientFactory.getBatchSize();
            while (true) {
                List<Event> readEvents = eventReader.readEvents(batchSize);
                if (readEvents.isEmpty()) {
                    break;
                }
                Iterator<Event> it = readEvents.iterator();
                while (it.hasNext()) {
                    it.next().setHeaders(this.headers);
                    j += r0.getBody().length;
                    this.sent++;
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 >= currentTimeMillis + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP) {
                        logger.debug("Packed {} bytes, {} events", Long.valueOf(j), Integer.valueOf(this.sent));
                        currentTimeMillis = currentTimeMillis2;
                    }
                }
                rpcClientFactory.appendBatch(readEvents);
                if (eventReader instanceof ReliableEventReader) {
                    ((ReliableEventReader) eventReader).commit();
                }
            }
            logger.debug("Finished");
            if (eventReader != null) {
                logger.debug("Closing reader");
                eventReader.close();
            }
            logger.debug("Closing RPC client");
            rpcClientFactory.close();
        } catch (Throwable th) {
            if (eventReader != null) {
                logger.debug("Closing reader");
                eventReader.close();
            }
            logger.debug("Closing RPC client");
            rpcClientFactory.close();
            throw th;
        }
    }
}
