package org.apache.rya.accumulo.mr.merge;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.mr.AccumuloHDFSFileInputFormat;
import org.apache.rya.accumulo.mr.merge.mappers.MergeToolMapper;
import org.apache.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
import org.apache.rya.accumulo.mr.merge.util.TimeUtils;
import org.apache.rya.accumulo.mr.merge.util.ToolConfigUtils;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.path.PathUtils;

/* loaded from: input_file:org/apache/rya/accumulo/mr/merge/MergeTool.class */
public class MergeTool extends AbstractDualInstanceAccumuloMRTool {
    /** Suffix appended to a parent property key to form the corresponding child property key. */
    public static final String CHILD_SUFFIX = ".child";

    /** Suffix appended to a child table name to form the name of the temporary import table. */
    public static final String TEMP_SUFFIX = "_temp_child";

    /** Configuration key holding the start time string (see {@link #START_TIME_FORMATTER}). */
    public static final String START_TIME_PROP = "tool.start.time";

    /** Configuration key that {@code run} sets to the parent table currently being merged. */
    public static final String TABLE_NAME_PROP = "tool.table.name";

    /** Configuration key of the boolean flag that enables file-based merge input. */
    public static final String USE_MERGE_FILE_INPUT = "use.merge.file.input";

    /** Configuration key of the local directory that holds the child files to import. */
    public static final String MERGE_FILE_INPUT_PATH = "merge.file.input.path";

    // Values below are populated from the configuration in setup().
    private String startTime = null;
    private String tempDir = null;
    private boolean useMergeFileInput = false;
    private String localMergeFileImportDir = null;
    private String baseImportDir = null;

    // Comma-separated authorizations temporarily granted to the parent user; revoked again in run().
    private String tempChildAuths = null;

    // Parent tables to merge; filled in by setup().
    private final List<String> tables = new ArrayList<>();

    private static final Logger log = Logger.getLogger(MergeTool.class);

    /**
     * Formatter for start time strings.
     * NOTE(review): {@link SimpleDateFormat} is not thread-safe and this instance is shared;
     * confirm that it is never used from several threads at once.
     */
    public static final SimpleDateFormat START_TIME_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSSz");

    /**
     * Maps a primary configuration key to the alias keys that must carry the same value.
     * The explicit type arguments on {@code builder()} are required: without them the chained
     * calls are inferred as {@code Builder<Object, Object>} and the assignment does not compile.
     * NOTE(review): "query.auth" and "query.auth.child" are each listed twice; this is redundant
     * but harmless, and is kept so the public constant's value is unchanged.
     */
    public static final ImmutableMap<String, List<String>> DUPLICATE_KEY_MAP = ImmutableMap.<String, List<String>>builder()
            .put("ac.mock", ImmutableList.of(".useMockInstance"))
            .put("ac.instance", ImmutableList.of("sc.cloudbase.instancename"))
            .put("ac.username", ImmutableList.of("sc.cloudbase.username"))
            .put("ac.pwd", ImmutableList.of("sc.cloudbase.password"))
            .put("ac.auth", ImmutableList.of("query.auth", "query.auth"))
            .put("ac.zk", ImmutableList.of("sc.cloudbase.zookeepers"))
            .put("rdf.tablePrefix", ImmutableList.of("query.tblprefix"))
            .put("ac.mock.child", ImmutableList.of(".useMockInstance.child"))
            .put("ac.instance.child", ImmutableList.of("sc.cloudbase.instancename.child"))
            .put("ac.username.child", ImmutableList.of("sc.cloudbase.username.child"))
            .put("ac.pwd.child", ImmutableList.of("sc.cloudbase.password.child"))
            .put("ac.auth.child", ImmutableList.of("query.auth.child", "query.auth.child"))
            .put("ac.zk.child", ImmutableList.of("sc.cloudbase.zookeepers.child"))
            .put("rdf.tablePrefix.child", ImmutableList.of("query.tblprefix.child"))
            .build();

    /**
     * Copies the value of every primary key in {@link #DUPLICATE_KEY_MAP} that is present in the
     * configuration onto each of its alias keys. Primary keys without a value are skipped.
     *
     * @param configuration the configuration to read from and write to
     */
    public static void setDuplicateKeys(Configuration configuration) {
        for (final Map.Entry<String, List<String>> aliasEntry : DUPLICATE_KEY_MAP.entrySet()) {
            final String value = configuration.get(aliasEntry.getKey());
            if (value == null) {
                // Nothing configured under the primary key, so there is nothing to propagate.
                continue;
            }
            for (final String aliasKey : aliasEntry.getValue()) {
                configuration.set(aliasKey, value);
            }
        }
    }

    /**
     * Sets a property and mirrors the same value onto every alias registered for it in
     * {@link #DUPLICATE_KEY_MAP}. A key without registered aliases is simply set on its own.
     *
     * @param configuration the configuration to update
     * @param str the property key to set
     * @param str2 the value to store under the key and its aliases
     */
    public static void setDuplicateKeysForProperty(Configuration configuration, String str, String str2) {
        final List<String> aliasKeys = DUPLICATE_KEY_MAP.get(str);
        configuration.set(str, str2);
        if (aliasKeys == null) {
            return;
        }
        for (final String aliasKey : aliasKeys) {
            configuration.set(aliasKey, str2);
        }
    }

    /**
     * Initializes the tool from the job configuration: reads the temp directory, the file-input
     * settings and the start time, optionally stores the child machine's time offset relative to
     * an NTP server, propagates alias keys and registers the table to merge.
     * <p>
     * The table is registered only once, so calling this method (or {@code run}) repeatedly on
     * the same instance does not queue the same table several times.
     *
     * @throws Exception if "hadoop.tmp.dir" is not configured, the configured start time cannot
     *         be parsed, or the time difference to the NTP server cannot be determined
     */
    public void setup() throws Exception {
        super.init();

        tempDir = conf.get("hadoop.tmp.dir", null);
        if (tempDir == null) {
            throw new Exception("Invalid hadoop temp directory. \"hadoop.tmp.dir\" could not be found in the configuration.");
        }

        useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false);
        localMergeFileImportDir = conf.get(MERGE_FILE_INPUT_PATH, null);
        baseImportDir = tempDir + "/merge_tool_file_input/";
        startTime = conf.get(START_TIME_PROP, null);

        // The start time and NTP checks are only performed when not merging from files.
        if (!useMergeFileInput) {
            if (startTime != null) {
                try {
                    final Date parsedStartTime = START_TIME_FORMATTER.parse(startTime);
                    log.info("Will merge all data after " + parsedStartTime);
                } catch (final ParseException e) {
                    throw new Exception("Unable to parse the provided start time: " + startTime, e);
                }
            }

            if (conf.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false)) {
                final String tomcatUrl = conf.get(CopyTool.CHILD_TOMCAT_URL_PROP, null);
                final String ntpServerHost = conf.get(CopyTool.NTP_SERVER_HOST_PROP, null);
                try {
                    log.info("Comparing child machine's time to NTP server time...");
                    final Long timeOffset = TimeUtils.getNtpServerAndMachineTimeDifference(ntpServerHost, tomcatUrl);
                    if (timeOffset != null) {
                        conf.set(CopyTool.CHILD_TIME_OFFSET_PROP, "" + timeOffset);
                    }
                } catch (final IOException | ParseException e) {
                    throw new Exception("Unable to get time difference between machine and NTP server.", e);
                }
            }
        }

        setDuplicateKeys(conf);

        // Guard against duplicates: run() invokes setup() on every call.
        final String spoTable = tablePrefix + "spo";
        if (!tables.contains(spoTable)) {
            tables.add(spoTable);
        }
    }

    /**
     * Runs one merge job per registered parent table.
     * <p>
     * For each table the job is configured with the Accumulo input/output, {@link MergeToolMapper}
     * and the identity {@link Reducer}, and then executed synchronously. In file-input mode the
     * child files are first imported into a temporary table, and any authorizations temporarily
     * granted to the parent user for that import are revoked afterwards, including when the job
     * setup or execution throws.
     *
     * @param strArr the command line arguments (not used by this method)
     * @return {@code 0} if every job succeeded, otherwise {@code 1} from the first failed job
     * @throws Exception if setup, job configuration, job execution or the cleanup fails
     */
    @Override
    public int run(String[] strArr) throws Exception {
        useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false);

        log.info("Setting up Merge Tool...");
        setup();

        if (useMergeFileInput) {
            copyParentPropertiesToChild(conf);
        }

        for (final String table : tables) {
            final String childTable = toChildTableName(table);
            final String jobName = "Merge Tool, merging Child Table: " + childTable + ", into Parent Table: " + table + ", " + System.currentTimeMillis();
            log.info("Initializing job: " + jobName);
            conf.set("mapred.job.name", jobName);
            conf.set(TABLE_NAME_PROP, table);

            final Job job = Job.getInstance(conf);
            job.setJarByClass(MergeTool.class);

            final Date beginTime;
            int exitCode;
            try {
                if (useMergeFileInput) {
                    importChildFilesToTempParentTable(childTable);
                }

                setupAccumuloInput(job);
                // Overrides the input table chosen by setupAccumuloInput with the table being merged.
                InputFormatBase.setInputTableName(job, table);

                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Mutation.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Mutation.class);

                setupAccumuloOutput(job, table);

                job.setMapperClass(MergeToolMapper.class);
                job.setReducerClass(Reducer.class);

                beginTime = new Date();
                log.info("Job for table \"" + table + "\" started: " + beginTime);
                exitCode = job.waitForCompletion(true) ? 0 : 1;
            } catch (final Exception e) {
                // Do not leave the temporary authorizations on the parent user when the job blows up.
                try {
                    revokeTempChildAuths();
                } catch (final Exception cleanupError) {
                    e.addSuppressed(cleanupError);
                }
                throw e;
            }

            revokeTempChildAuths();

            if (exitCode != 0) {
                log.error("Job for table \"" + table + "\" Failed!!!");
                return exitCode;
            }

            final Date endTime = new Date();
            log.info("Job for table \"" + table + "\" finished: " + endTime);
            log.info("The job took " + ((endTime.getTime() - beginTime.getTime()) / 1000) + " seconds.");
        }
        return 0;
    }

    /**
     * Derives the child table name by swapping the first occurrence of the parent table prefix for
     * the child table prefix. Both prefixes are treated as literal text, so regex metacharacters
     * in a prefix (for example '.' or '$') cannot change the result.
     */
    private String toChildTableName(final String parentTable) {
        return parentTable.replaceFirst(
                java.util.regex.Pattern.quote(tablePrefix),
                java.util.regex.Matcher.quoteReplacement(childTablePrefix));
    }

    /** Revokes the authorizations that were temporarily granted to the parent user, if any. */
    private void revokeTempChildAuths() throws Exception {
        if (useMergeFileInput && StringUtils.isNotBlank(tempChildAuths)) {
            final AccumuloRdfConfiguration parentRdfConf = new AccumuloRdfConfiguration(conf);
            parentRdfConf.setTablePrefix(tablePrefix);
            AccumuloRyaUtils.removeUserAuths(userName, AccumuloRyaUtils.setupConnector(parentRdfConf).securityOperations(), tempChildAuths);
        }
    }

    /**
     * Creates the given table if it does not exist yet, grants the tool's user write permission on
     * it and, when the user's authorizations differ from the child authorizations, temporarily
     * adds the child authorizations the user is missing. The added authorizations are remembered
     * (comma-separated) in {@code tempChildAuths}.
     * <p>
     * Nothing is done when the table already exists.
     *
     * @param str the name of the temporary table
     * @throws IOException if the table cannot be created or the permissions/authorizations cannot
     *         be changed; the Accumulo exception is attached as the cause
     */
    public void createTempTableIfNeeded(String str) throws IOException {
        try {
            final AccumuloRdfConfiguration childRdfConf = new AccumuloRdfConfiguration(conf);
            childRdfConf.setTablePrefix(childTablePrefix);
            final Connector connector = AccumuloRyaUtils.setupConnector(childRdfConf);
            if (connector.tableOperations().exists(str)) {
                return;
            }

            log.info("Creating table: " + str);
            connector.tableOperations().create(str);
            log.info("Created table: " + str);

            log.info("Granting authorizations to table: " + str);
            final SecurityOperations secOps = connector.securityOperations();
            secOps.grantTablePermission(userName, str, TablePermission.WRITE);
            log.info("Granted authorizations to table: " + str);

            final Authorizations parentAuths = secOps.getUserAuthorizations(userName);
            if (parentAuths.equals(childAuthorizations)) {
                return;
            }

            // Keep only real (non-blank) authorizations that the parent user does not have yet.
            final List<String> missingAuths = new ArrayList<>();
            for (final String auth : findUniqueAuthsFromChild(parentAuths.toString(), childAuthorizations.toString())) {
                if (StringUtils.isNotBlank(auth)) {
                    missingAuths.add(auth);
                }
            }
            if (missingAuths.isEmpty()) {
                return;
            }

            tempChildAuths = Joiner.on(",").join(missingAuths);
            log.info("Adding the authorization, \"" + tempChildAuths + "\", to the parent user, \"" + userName + "\"");
            // Each authorization must be its own element; the comma-joined string is not a single valid authorization.
            final Authorizations authsToAdd = new Authorizations(missingAuths.toArray(new String[0]));
            final Authorizations newAuths = AccumuloRyaUtils.addUserAuths(userName, secOps, authsToAdd);
            secOps.changeUserAuthorizations(userName, newAuths);
        } catch (final TableExistsException | AccumuloException | AccumuloSecurityException e) {
            throw new IOException(e);
        }
    }

    /**
     * Determines which authorizations appear in the second auth string but not in the first.
     *
     * @param str the auth string whose entries are to be excluded (the parent's)
     * @param str2 the auth string to filter (the child's)
     * @return the entries of {@code str2} that are not contained in {@code str}
     */
    private static List<String> findUniqueAuthsFromChild(String str, String str2) {
        final List<String> parentAuthList = AccumuloRyaUtils.convertAuthStringToList(str);
        final List<String> childOnlyAuthList = AccumuloRyaUtils.convertAuthStringToList(str2);
        // Filter in place: whatever remains is unique to the child.
        childOnlyAuthList.removeAll(parentAuthList);
        return childOnlyAuthList;
    }

    /**
     * Imports the child table's files into a temporary table.
     * <p>
     * The temporary table (child table name plus {@link #TEMP_SUFFIX}) is created if needed, the
     * local files are copied to HDFS, the HDFS import directory is opened up to all users, a fresh
     * "failures" directory is prepared and the "files" directory is bulk-imported into the
     * temporary table. Finally the table is handed to {@code AccumuloRyaUtils.printTablePretty}.
     *
     * @param str the child table name
     * @throws Exception if the table setup, the file copy or the import fails
     */
    public void importChildFilesToTempParentTable(String str) throws Exception {
        final String tempTableName = str + TEMP_SUFFIX;
        createTempTableIfNeeded(tempTableName);

        final AccumuloRdfConfiguration childRdfConf = new AccumuloRdfConfiguration(conf);
        childRdfConf.setTablePrefix(childTablePrefix);
        final Connector connector = AccumuloRyaUtils.setupConnector(childRdfConf);
        final TableOperations tableOps = connector.tableOperations();

        // Stage the local child files in HDFS.
        final Path localSourceDir = CopyTool.getPath(localMergeFileImportDir, str);
        final Path hdfsImportDir = CopyTool.getPath(baseImportDir, str);
        CopyTool.copyLocalToHdfs(localSourceDir, hdfsImportDir, conf);

        final String hdfsImportDirName = hdfsImportDir.toString();
        final Path filesDir = CopyTool.getPath(hdfsImportDirName, "files");
        final Path failuresDir = CopyTool.getPath(hdfsImportDirName, "failures");

        final FileSystem fs = FileSystem.get(conf);
        final FsPermission allAccess = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
        fs.setPermission(hdfsImportDir, allAccess);

        // Start from an empty failures directory.
        if (fs.exists(failuresDir)) {
            fs.delete(failuresDir, true);
        }
        fs.mkdirs(failuresDir);

        tableOps.importDirectory(tempTableName, filesDir.toString(), failuresDir.toString(), false);

        AccumuloRyaUtils.printTablePretty(tempTableName, conf, new IteratorSetting[0]);
    }

    /**
     * Copies the parent's mock flag, instance name, user name, password and zookeeper settings to
     * the matching child keys and then refreshes the alias keys via {@link #setDuplicateKeys}.
     *
     * @param configuration the configuration to update
     */
    public static void copyParentPropertiesToChild(Configuration configuration) {
        final String[] parentKeys = {"ac.mock", "ac.instance", "ac.username", "ac.pwd", "ac.zk"};
        for (final String parentKey : parentKeys) {
            copyParentPropToChild(configuration, parentKey);
        }
        setDuplicateKeys(configuration);
    }

    /**
     * Stores the value of a parent property under the corresponding child key (the parent key
     * plus {@link #CHILD_SUFFIX}). An unset parent property is copied as the empty string.
     *
     * @param configuration the configuration to read from and write to
     * @param str the parent property key
     */
    public static void copyParentPropToChild(Configuration configuration, String str) {
        final String childKey = str + CHILD_SUFFIX;
        final String parentValue = configuration.get(str, "");
        configuration.set(childKey, parentValue);
    }

    /**
     * Configures the Accumulo input side of the job: input format, credentials, input table, scan
     * authorizations, instance (mock or ZooKeeper-backed), an optional age-off filter when a TTL
     * is set, and the common regex filter iterators.
     *
     * @param job the job to configure
     * @throws AccumuloSecurityException if the connector information cannot be set
     */
    protected void setupAccumuloInput(Job job) throws AccumuloSecurityException {
        // Choose between reading Accumulo files from HDFS and scanning the live table.
        if (!hdfsInput) {
            job.setInputFormatClass(AccumuloInputFormat.class);
        } else {
            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
        }

        final PasswordToken passwordToken = new PasswordToken(pwd);
        AbstractInputFormat.setConnectorInfo(job, userName, passwordToken);

        final String inputTable = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix);
        InputFormatBase.setInputTableName(job, inputTable);
        AbstractInputFormat.setScanAuthorizations(job, authorizations);

        if (!mock) {
            final ClientConfiguration clientConfig = new ClientConfiguration(new org.apache.commons.configuration.Configuration[0])
                    .withInstance(instance)
                    .withZkHosts(zk);
            AbstractInputFormat.setZooKeeperInstance(job, clientConfig);
        } else {
            AbstractInputFormat.setMockInstance(job, instance);
        }

        if (ttl != null) {
            final IteratorSetting ageOffSetting = new IteratorSetting(1, "fi", AgeOffFilter.class);
            AgeOffFilter.setTTL(ageOffSetting, Long.valueOf(ttl));
            InputFormatBase.addIterator(job, ageOffSetting);
        }

        for (final Object regexFilterSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) {
            InputFormatBase.addIterator(job, (IteratorSetting) regexFilterSetting);
        }
    }

    /**
     * Builds a fresh configuration, logs the user supplied arguments (if any) and runs a new
     * {@link MergeTool} through {@link ToolRunner}.
     *
     * @param strArr the command line arguments
     * @return the tool's exit code, or {@code -1} if an exception was thrown (it is logged)
     */
    public static int setupAndRun(String[] strArr) {
        try {
            final Configuration toolConf = new Configuration();
            final Set<String> toolArgs = ToolConfigUtils.getUserArguments(toolConf, strArr);
            if (!toolArgs.isEmpty()) {
                final String argList = Joiner.on("\r\n\t").join(toolArgs);
                log.info("Running Merge Tool with the following parameters...\r\n\t" + argList);
            }
            return ToolRunner.run(toolConf, new MergeTool(), strArr);
        } catch (final Exception e) {
            log.error("Error running merge tool", e);
            return -1;
        }
    }

    /**
     * Command line entry point. Configures log4j from the "log4j.configuration" system property
     * when it is set (falling back to the basic configuration if the referenced file does not
     * exist), installs a default handler that logs uncaught exceptions, runs the tool and exits
     * the JVM with the tool's exit code.
     *
     * @param strArr the command line arguments
     */
    public static void main(String[] strArr) {
        final String log4jSetting = System.getProperties().getProperty("log4j.configuration");
        if (StringUtils.isNotBlank(log4jSetting)) {
            // The property may be given as a "file:" URL; strip the scheme to get a plain path.
            final String configPath = PathUtils.clean(StringUtils.removeStart(log4jSetting, "file:"));
            final boolean configFileExists = new File(configPath).exists();
            if (!configFileExists) {
                BasicConfigurator.configure();
            } else {
                DOMConfigurator.configure(configPath);
            }
        }

        log.info("Starting Merge Tool");

        final Thread.UncaughtExceptionHandler loggingHandler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable th) {
                log.error("Uncaught exception in " + thread.getName(), th);
            }
        };
        Thread.setDefaultUncaughtExceptionHandler(loggingHandler);

        final int exitCode = setupAndRun(strArr);
        log.info("Finished running Merge Tool");
        System.exit(exitCode);
    }

    /**
     * Formats a date as a start time string using {@link #START_TIME_FORMATTER}.
     *
     * @param date the date to format
     * @return the formatted start time string
     */
    public static String getStartTimeString(Date date) {
        final String startTimeString = START_TIME_FORMATTER.format(date);
        return startTimeString;
    }

    /**
     * Converts a date into its start time string representation using
     * {@link #START_TIME_FORMATTER}.
     *
     * @param date the date to convert
     * @return the start time string for the date
     */
    public static String convertDateToStartTimeString(Date date) {
        final String formatted = START_TIME_FORMATTER.format(date);
        return formatted;
    }

    /**
     * Parses a start time string with {@link #START_TIME_FORMATTER}.
     *
     * @param str the start time string to parse
     * @return the parsed date, or {@code null} if the string cannot be parsed (the failure is
     *         logged)
     */
    public static Date convertStartTimeStringToDate(String str) {
        Date parsed = null;
        try {
            parsed = START_TIME_FORMATTER.parse(str);
        } catch (final ParseException e) {
            log.error("Could not parse date", e);
        }
        return parsed;
    }
}
