package org.apache.rya.accumulo.mr.merge.mappers;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRdfConstants;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.accumulo.RyaTableMutationsFactory;
import org.apache.rya.accumulo.mr.merge.CopyTool;
import org.apache.rya.accumulo.mr.merge.MergeTool;
import org.apache.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
import org.apache.rya.accumulo.mr.merge.util.TimeUtils;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.api.resolver.triple.TripleRowResolverException;

/* loaded from: input_file:org/apache/rya/accumulo/mr/merge/mappers/MergeToolMapper.class */
public class MergeToolMapper extends Mapper<Key, Value, Text, Mutation> {
    private static final Logger log = Logger.getLogger(MergeToolMapper.class);
    private boolean usesStartTime;
    private String startTimeString;
    private Date startTime;
    private String parentTableName;
    private String childTableName;
    private String parentTablePrefix;
    private String childTablePrefix;
    private Text spoTable;
    private Text poTable;
    private Text ospTable;
    private Mapper<Key, Value, Text, Mutation>.Context context;
    private Configuration parentConfig;
    private Configuration childConfig;
    private AccumuloRdfConfiguration parentAccumuloRdfConfiguration;
    private AccumuloRdfConfiguration childAccumuloRdfConfiguration;
    private RyaTripleContext parentRyaContext;
    private RyaTripleContext childRyaContext;
    private RyaTableMutationsFactory ryaTableMutationFactory;
    private Scanner childScanner;
    private Iterator<Map.Entry<Key, Value>> childIterator;
    private Connector childConnector;
    private AccumuloRyaDAO childDao;
    private Date copyToolInputTime;
    private Date copyToolRunTime;
    private Long parentTimeOffset = 0L;
    private Long childTimeOffset = 0L;
    private boolean useTimeSync = false;
    private boolean useMergeFileInput = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rya/accumulo/mr/merge/mappers/MergeToolMapper$CompareKeysResult.class */
    public enum CompareKeysResult {
        ADVANCE_CHILD,
        ADVANCE_CHILD_AND_ADD,
        ADVANCE_PARENT,
        ADVANCE_PARENT_AND_DELETE,
        ADVANCE_BOTH,
        FINISHED
    }

    public void run(Mapper<Key, Value, Text, Mutation>.Context context) throws IOException, InterruptedException {
        setup(context);
        this.context = context;
        try {
            try {
                RyaStatement nextParentRyaStatement = nextParentRyaStatement();
                RyaStatement nextChildRyaStatement = nextChildRyaStatement();
                CompareKeysResult compareKeysResult = null;
                while (compareKeysResult != CompareKeysResult.FINISHED) {
                    compareKeysResult = compareKeys(nextParentRyaStatement, nextChildRyaStatement);
                    switch (compareKeysResult) {
                        case ADVANCE_CHILD:
                            nextChildRyaStatement = nextChildRyaStatement();
                            break;
                        case ADVANCE_PARENT:
                            nextParentRyaStatement = nextParentRyaStatement();
                            break;
                        case ADVANCE_CHILD_AND_ADD:
                            RyaStatement ryaStatement = nextChildRyaStatement;
                            nextChildRyaStatement = nextChildRyaStatement();
                            addKey(ryaStatement, context);
                            break;
                        case ADVANCE_PARENT_AND_DELETE:
                            RyaStatement ryaStatement2 = nextParentRyaStatement;
                            nextParentRyaStatement = nextParentRyaStatement();
                            deleteKey(ryaStatement2, context);
                            break;
                        case ADVANCE_BOTH:
                            ColumnVisibility columnVisibility = new ColumnVisibility(nextParentRyaStatement.getColumnVisibility());
                            ColumnVisibility columnVisibility2 = new ColumnVisibility(nextChildRyaStatement.getColumnVisibility());
                            if (!columnVisibility.equals(columnVisibility2) && !columnVisibility2.equals(AccumuloRdfConstants.EMPTY_CV)) {
                                RyaStatement updateRyaStatementColumnVisibility = updateRyaStatementColumnVisibility(nextParentRyaStatement, combineColumnVisibilities(columnVisibility, columnVisibility2));
                                deleteKey(nextParentRyaStatement, context);
                                addKey(updateRyaStatementColumnVisibility, context);
                            }
                            nextParentRyaStatement = nextParentRyaStatement();
                            nextChildRyaStatement = nextChildRyaStatement();
                            break;
                        case FINISHED:
                            log.info("Finished scanning parent and child tables");
                            break;
                        default:
                            log.error("Unknown result: " + compareKeysResult);
                            break;
                    }
                }
            } catch (MutationsRejectedException | TripleRowResolverException e) {
                log.error("Error encountered while merging", e);
                cleanup(context);
            }
        } finally {
            cleanup(context);
        }
    }

    private RyaStatement nextParentRyaStatement() throws IOException, InterruptedException {
        return nextRyaStatement(this.context, this.parentRyaContext);
    }

    private RyaStatement nextChildRyaStatement() throws IOException, InterruptedException {
        return nextRyaStatement(this.childIterator, this.childRyaContext);
    }

    private static RyaStatement nextRyaStatement(Iterator<Map.Entry<Key, Value>> it, RyaTripleContext ryaTripleContext) {
        RyaStatement ryaStatement = null;
        if (it.hasNext()) {
            Map.Entry<Key, Value> next = it.next();
            try {
                ryaStatement = createRyaStatement(next.getKey(), next.getValue(), ryaTripleContext);
            } catch (TripleRowResolverException e) {
                log.error("TripleRowResolverException encountered while creating statement", e);
            }
        }
        return ryaStatement;
    }

    private static RyaStatement nextRyaStatement(Mapper<Key, Value, Text, Mutation>.Context context, RyaTripleContext ryaTripleContext) throws IOException, InterruptedException {
        RyaStatement ryaStatement = null;
        if (context.nextKeyValue()) {
            try {
                ryaStatement = createRyaStatement((Key) context.getCurrentKey(), (Value) context.getCurrentValue(), ryaTripleContext);
            } catch (TripleRowResolverException e) {
                log.error("TripleRowResolverException encountered while creating statement", e);
            }
        }
        return ryaStatement;
    }

    private static RyaStatement createRyaStatement(Key key, Value value, RyaTripleContext ryaTripleContext) throws TripleRowResolverException {
        return ryaTripleContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, new TripleRow((key.getRowData() == null || key.getRowData().toArray().length <= 0) ? null : key.getRowData().toArray(), (key.getColumnFamilyData() == null || key.getColumnFamilyData().toArray().length <= 0) ? null : key.getColumnFamilyData().toArray(), (key.getColumnQualifierData() == null || key.getColumnQualifierData().toArray().length <= 0) ? null : key.getColumnQualifierData().toArray(), Long.valueOf(key.getTimestamp()), (key.getColumnVisibilityData() == null || key.getColumnVisibilityData().toArray().length <= 0) ? null : key.getColumnVisibilityData().toArray(), (value == null || value.get().length <= 0) ? null : value.get()));
    }

    protected void setup(Mapper<Key, Value, Text, Mutation>.Context context) throws IOException, InterruptedException {
        super.setup(context);
        log.info("Setting up mapper");
        this.parentConfig = context.getConfiguration();
        this.childConfig = getChildConfig(this.parentConfig);
        this.startTimeString = this.parentConfig.get(MergeTool.START_TIME_PROP, (String) null);
        if (this.startTimeString != null) {
            this.startTime = MergeTool.convertStartTimeStringToDate(this.startTimeString);
        }
        this.usesStartTime = this.startTime != null;
        this.useTimeSync = this.parentConfig.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false);
        this.useMergeFileInput = this.parentConfig.getBoolean(MergeTool.USE_MERGE_FILE_INPUT, false);
        this.parentTableName = this.parentConfig.get(MergeTool.TABLE_NAME_PROP, (String) null);
        this.parentTablePrefix = this.parentConfig.get("rdf.tablePrefix", (String) null);
        this.childTablePrefix = this.childConfig.get("rdf.tablePrefix", (String) null);
        if (this.useMergeFileInput) {
            this.childTableName = this.parentTableName.replaceFirst(this.parentTablePrefix, this.childTablePrefix) + MergeTool.TEMP_SUFFIX;
        } else {
            this.childTableName = this.parentTableName.replaceFirst(this.parentTablePrefix, this.childTablePrefix);
        }
        this.spoTable = new Text(this.parentTablePrefix + "spo");
        this.poTable = new Text(this.parentTablePrefix + "po");
        this.ospTable = new Text(this.parentTablePrefix + "osp");
        this.childScanner = setupChildScanner(context);
        this.childIterator = this.childScanner.iterator();
        this.parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(this.parentConfig);
        this.parentAccumuloRdfConfiguration.setTablePrefix(this.parentTablePrefix);
        this.parentRyaContext = RyaTripleContext.getInstance(this.parentAccumuloRdfConfiguration);
        this.ryaTableMutationFactory = new RyaTableMutationsFactory(this.parentRyaContext);
        this.childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(this.childConfig);
        this.childAccumuloRdfConfiguration.setTablePrefix(this.childTablePrefix);
        this.childRyaContext = RyaTripleContext.getInstance(this.childAccumuloRdfConfiguration);
        this.childConnector = AccumuloRyaUtils.setupConnector(this.childAccumuloRdfConfiguration);
        this.childDao = AccumuloRyaUtils.setupDao(this.childConnector, this.childAccumuloRdfConfiguration);
        if (this.startTime != null && this.useTimeSync) {
            try {
                this.copyToolInputTime = AccumuloRyaUtils.getCopyToolSplitDate(this.childDao);
                this.copyToolRunTime = AccumuloRyaUtils.getCopyToolRunDate(this.childDao);
                this.parentTimeOffset = AccumuloRyaUtils.getTimeOffset(this.childDao);
                log.info("The table " + this.parentTableName + " has a time offset of: " + TimeUtils.getDurationBreakdown(this.parentTimeOffset.longValue()));
                this.childTimeOffset = Long.valueOf(this.childConfig.get(CopyTool.CHILD_TIME_OFFSET_PROP, (String) null));
                Date date = new Date(this.startTime.getTime() - this.parentTimeOffset.longValue());
                Date date2 = new Date(this.startTime.getTime() - this.childTimeOffset.longValue());
                log.info("Adjusted parent start time: " + date);
                log.info("Adjusted child start time: " + date2);
            } catch (RyaDAOException e) {
                log.error("Error getting time offset", e);
            }
        }
        log.info("Finished setting up mapper");
    }

    public static Configuration getChildConfig(Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        convertChildPropToParentProp(configuration2, configuration, "ac.mock");
        convertChildPropToParentProp(configuration2, configuration, "ac.instance");
        convertChildPropToParentProp(configuration2, configuration, "ac.username");
        convertChildPropToParentProp(configuration2, configuration, "ac.pwd");
        convertChildPropToParentProp(configuration2, configuration, "rdf.tablePrefix");
        convertChildPropToParentProp(configuration2, configuration, "ac.auth");
        convertChildPropToParentProp(configuration2, configuration, "query.auth");
        convertChildPropToParentProp(configuration2, configuration, "ac.zk");
        MergeTool.setDuplicateKeys(configuration2);
        return configuration2;
    }

    public static void convertChildPropToParentProp(Configuration configuration, Configuration configuration2, String str) {
        configuration.set(str, configuration2.get(str + MergeTool.CHILD_SUFFIX, ""));
    }

    public static ColumnVisibility combineColumnVisibilities(ColumnVisibility columnVisibility, ColumnVisibility columnVisibility2) {
        return new ColumnVisibility(new ColumnVisibility(new Text(columnVisibility.equals(AccumuloRdfConstants.EMPTY_CV) ? new String(columnVisibility2.getExpression(), Charsets.UTF_8) : "(" + new String(columnVisibility.getExpression(), Charsets.UTF_8) + ")|(" + new String(columnVisibility2.getExpression(), Charsets.UTF_8) + ")")).flatten());
    }

    private Scanner setupChildScanner(Mapper<Key, Value, Text, Mutation>.Context context) throws IOException {
        return setupScanner(context, this.childTableName, this.childConfig);
    }

    private static Scanner setupScanner(Mapper<Key, Value, Text, Mutation>.Context context, String str, Configuration configuration) throws IOException {
        Range range = context.getInputSplit().getRange();
        Scanner scanner = AccumuloRyaUtils.getScanner(str, configuration);
        scanner.setRange(range);
        return scanner;
    }

    private void writeRyaMutations(RyaStatement ryaStatement, Mapper<Key, Value, Text, Mutation>.Context context, boolean z) throws IOException, InterruptedException {
        if (ryaStatement.getColumnVisibility() == null) {
            ryaStatement.setColumnVisibility(AccumuloRdfConstants.EMPTY_CV.getExpression());
        }
        Map serialize = this.ryaTableMutationFactory.serialize(ryaStatement);
        Collection collection = (Collection) serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
        Collection collection2 = (Collection) serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
        Collection collection3 = (Collection) serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            writeMutation(this.spoTable, (Mutation) it.next(), context, z);
        }
        Iterator it2 = collection2.iterator();
        while (it2.hasNext()) {
            writeMutation(this.poTable, (Mutation) it2.next(), context, z);
        }
        Iterator it3 = collection3.iterator();
        while (it3.hasNext()) {
            writeMutation(this.ospTable, (Mutation) it3.next(), context, z);
        }
    }

    private void addKey(RyaStatement ryaStatement, Mapper<Key, Value, Text, Mutation>.Context context) throws IOException, InterruptedException {
        writeRyaMutations(ryaStatement, context, false);
    }

    private void deleteKey(RyaStatement ryaStatement, Mapper<Key, Value, Text, Mutation>.Context context) throws IOException, InterruptedException {
        writeRyaMutations(ryaStatement, context, true);
    }

    private static void writeMutation(Text text, Mutation mutation, Mapper<Key, Value, Text, Mutation>.Context context, boolean z) throws IOException, InterruptedException {
        if (!z) {
            context.write(text, mutation);
            return;
        }
        ColumnUpdate columnUpdate = (ColumnUpdate) mutation.getUpdates().get(0);
        ColumnVisibility columnVisibility = columnUpdate.getColumnVisibility() != null ? new ColumnVisibility(columnUpdate.getColumnVisibility()) : null;
        Mutation mutation2 = new Mutation(new Text(mutation.getRow()));
        mutation2.putDelete(columnUpdate.getColumnFamily(), columnUpdate.getColumnQualifier(), columnVisibility, columnUpdate.getTimestamp());
        context.write(text, mutation2);
    }

    private Date normalizeDate(Date date, boolean z) {
        Date date2 = date;
        if (this.useTimeSync) {
            date2 = z ? new Date(date.getTime() - this.parentTimeOffset.longValue()) : TimeUtils.dateBeforeInclusive(date, this.copyToolRunTime) ? new Date(date.getTime() - this.parentTimeOffset.longValue()) : new Date(date.getTime() - this.childTimeOffset.longValue());
        }
        return date2;
    }

    private CompareKeysResult compareKeys(RyaStatement ryaStatement, RyaStatement ryaStatement2) throws MutationsRejectedException, IOException, InterruptedException, TripleRowResolverException {
        log.trace("key1 = " + ryaStatement);
        log.trace("key2 = " + ryaStatement2);
        if (ryaStatement == null && ryaStatement2 == null) {
            return CompareKeysResult.FINISHED;
        }
        if (ryaStatement == null) {
            return this.usesStartTime && normalizeDate(new Date(ryaStatement2.getTimestamp().longValue()), false).before(this.startTime) ? CompareKeysResult.ADVANCE_CHILD : CompareKeysResult.ADVANCE_CHILD_AND_ADD;
        }
        if (ryaStatement2 == null) {
            Date normalizeDate = normalizeDate(new Date(ryaStatement.getTimestamp().longValue()), true);
            return this.usesStartTime && ((this.copyToolInputTime != null && (normalizeDate.before(this.copyToolInputTime) || (normalizeDate.after(this.copyToolInputTime) && normalizeDate.after(this.startTime)))) || (this.copyToolInputTime == null && normalizeDate.after(this.startTime))) ? CompareKeysResult.ADVANCE_PARENT : CompareKeysResult.ADVANCE_PARENT_AND_DELETE;
        }
        Text text = new Text(((TripleRow) this.parentRyaContext.serializeTriple(ryaStatement).get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO)).getRow());
        Text text2 = new Text(((TripleRow) this.childRyaContext.serializeTriple(ryaStatement2).get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO)).getRow());
        Date normalizeDate2 = normalizeDate(new Date(ryaStatement.getTimestamp().longValue()), true);
        Date normalizeDate3 = normalizeDate(new Date(ryaStatement2.getTimestamp().longValue()), false);
        if (text.compareTo(text2) < 0) {
            return this.usesStartTime && ((this.copyToolInputTime != null && (normalizeDate2.before(this.copyToolInputTime) || (normalizeDate2.after(this.copyToolInputTime) && normalizeDate2.after(this.startTime)))) || (this.copyToolInputTime == null && normalizeDate2.after(this.startTime))) ? CompareKeysResult.ADVANCE_PARENT : CompareKeysResult.ADVANCE_PARENT_AND_DELETE;
        }
        if (text.compareTo(text2) > 0) {
            return this.usesStartTime && normalizeDate3.before(this.startTime) ? CompareKeysResult.ADVANCE_CHILD : CompareKeysResult.ADVANCE_CHILD_AND_ADD;
        }
        return CompareKeysResult.ADVANCE_BOTH;
    }

    private static RyaStatement updateRyaStatementColumnVisibility(RyaStatement ryaStatement, ColumnVisibility columnVisibility) {
        return new RyaStatement(ryaStatement.getSubject(), ryaStatement.getPredicate(), ryaStatement.getObject(), ryaStatement.getContext(), ryaStatement.getQualifer(), columnVisibility.getExpression(), ryaStatement.getValue(), ryaStatement.getTimestamp());
    }

    protected void cleanup(Mapper<Key, Value, Text, Mutation>.Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        log.info("Cleaning up mapper...");
        if (this.childScanner != null) {
            this.childScanner.close();
        }
        try {
            if (this.childDao != null) {
                this.childDao.destroy();
            }
        } catch (RyaDAOException e) {
            log.error("Error destroying child DAO", e);
        }
        log.info("Cleaned up mapper");
    }
}
