package org.apache.rya.indexing.pcj.fluo.app.batch;

import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.log4j.Logger;
import org.apache.rya.api.function.join.IterativeJoin;
import org.apache.rya.api.function.join.LazyJoiningIterator;
import org.apache.rya.api.function.join.LeftOuterJoin;
import org.apache.rya.api.function.join.NaturalJoin;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;

/* loaded from: input_file:WEB-INF/lib/rya.pcj.fluo.app-3.2.12-incubating.jar:org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.class */
/**
 * Batch updater for join nodes.
 *
 * <p>For one {@link JoinBatchInformation}, this updater reads up to a batch-size worth of
 * binding sets from the span/column named in the batch, joins them against the binding set
 * carried by the batch, and applies the batch's {@link BatchInformation.Task} to every join
 * result under {@link FluoQueryColumns#JOIN_BINDING_SET}. When the batch-size limit stops the
 * scan early, the batch is registered again with an updated span so the remainder can be
 * handled by a later invocation.
 */
public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
    private static final Logger log = Logger.getLogger(JoinBatchBindingSetUpdater.class);
    private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();

    /**
     * Processes a single join batch.
     *
     * @param transactionBase the Fluo transaction used for all reads and writes
     * @param bytes the batch row key; the join node id is extracted from it
     * @param batchInformation the batch to process; must be a {@link JoinBatchInformation}
     * @throws IllegalArgumentException if {@code batchInformation} is not a
     *         {@link JoinBatchInformation}
     * @throws RuntimeException if the batch names a join type other than natural or left outer
     * @throws Exception if the superclass processing, the scan, or (de)serialization fails
     */
    @Override
    public void processBatch(TransactionBase transactionBase, Bytes bytes, BatchInformation batchInformation) throws Exception {
        super.processBatch(transactionBase, bytes, batchInformation);
        final String nodeId = BatchRowKeyUtil.getNodeId(bytes);
        Preconditions.checkArgument(batchInformation instanceof JoinBatchInformation,
                "Expected a JoinBatchInformation but received: %s", batchInformation);
        final JoinBatchInformation joinBatch = (JoinBatchInformation) batchInformation;
        final BatchInformation.Task task = joinBatch.getTask();

        // Choose the join algorithm requested by the batch.
        final IterativeJoin joinAlgorithm;
        switch (joinBatch.getJoinType()) {
            case NATURAL_JOIN:
                joinAlgorithm = new NaturalJoin();
                break;
            case LEFT_OUTER_JOIN:
                joinAlgorithm = new LeftOuterJoin();
                break;
            default:
                throw new RuntimeException("Unsupported JoinType: " + joinBatch.getJoinType());
        }

        // Load the next batch of binding sets from the span described by the batch.
        final Set<VisibilityBindingSet> siblingBindingSets = new HashSet<>();
        final Optional<RowColumn> lastScanned = fillSiblingBatch(transactionBase, joinBatch, siblingBindingSets);

        // The batch's own binding set is joined on the side recorded in the batch.
        final VisibilityBindingSet bs = joinBatch.getBs();
        final Iterator<VisibilityBindingSet> joinResults;
        if (joinBatch.getSide() == LazyJoiningIterator.Side.LEFT) {
            joinResults = joinAlgorithm.newLeftResult(bs, siblingBindingSets.iterator());
        } else {
            joinResults = joinAlgorithm.newRightResult(siblingBindingSets.iterator(), bs);
        }

        // Apply the task to each join result, keyed by the sharded row built from the result.
        final VariableOrder joinVarOrder = CACHE.readJoinMetadata(transactionBase, nodeId).getVariableOrder();
        while (joinResults.hasNext()) {
            final VisibilityBindingSet joinResult = joinResults.next();
            final Bytes rowKey = BindingHashShardingFunction.addShard(nodeId, joinVarOrder, joinResult);
            final Bytes serializedResult = BS_SERDE.serialize(joinResult);
            processTask(transactionBase, task, rowKey, FluoQueryColumns.JOIN_BINDING_SET, serializedResult);
        }

        // The batch limit was reached, so entries remain: narrow the span and re-register the batch.
        if (lastScanned.isPresent()) {
            joinBatch.setSpan(getNewSpan(lastScanned.get(), joinBatch.getSpan()));
            BatchInformationDAO.addBatch(transactionBase, nodeId, joinBatch);
        }
    }

    /**
     * Applies a batch task to one row/column.
     *
     * <p>{@code Add} writes {@code value}; {@code Delete} removes the column from the row;
     * {@code Update} and any other task are only logged at trace level and otherwise ignored.
     *
     * @param transactionBase the Fluo transaction to write through
     * @param task the operation to perform
     * @param row the row to modify
     * @param column the column to modify
     * @param value the value written for {@code Add}; unused for the other tasks
     */
    private void processTask(TransactionBase transactionBase, BatchInformation.Task task, Bytes row, Column column, Bytes value) {
        switch (task) {
            case Add:
                transactionBase.set(row, column, value);
                return;
            case Delete:
                transactionBase.delete(row, column);
                return;
            case Update:
                log.trace("The Task Update is not supported for JoinBatchBindingSetUpdater.  Batch will not be processed.");
                return;
            default:
                log.trace("Invalid Task type.  Aborting batch operation.");
                return;
        }
    }

    /**
     * Scans the batch's span for the batch's column, row by row, deserializing each value into
     * {@code set} until either the scan is exhausted or {@code set} already holds the batch
     * size and at least one more value is available.
     *
     * @param transactionBase the Fluo transaction to scan with
     * @param joinBatchInformation supplies the span, column and batch size
     * @param set receives the deserialized binding sets; its size is what is compared against
     *        the batch size
     * @return the row (paired with the batch's column) at which the scan stopped because the
     *         limit was reached, or an empty {@code Optional} if the scan was exhausted
     * @throws Exception if a value cannot be deserialized
     */
    private Optional<RowColumn> fillSiblingBatch(TransactionBase transactionBase, JoinBatchInformation joinBatchInformation, Set<VisibilityBindingSet> set) throws Exception {
        final Span span = joinBatchInformation.getSpan();
        final Column column = joinBatchInformation.getColumn();
        final int batchSize = joinBatchInformation.getBatchSize();

        final Iterator<ColumnScanner> rowIter = transactionBase.scanner().over(span).fetch(column).byRow().build().iterator();

        boolean batchLimitMet = false;
        // Defaults to the span's start row when the scan yields no rows.
        Bytes row = span.getStart().getRow();

        while (rowIter.hasNext() && !batchLimitMet) {
            final ColumnScanner columnScanner = rowIter.next();
            row = columnScanner.getRow();
            final Iterator<ColumnValue> valueIter = columnScanner.iterator();
            while (valueIter.hasNext()) {
                // The limit is checked before consuming, so the pending value stays unread.
                // NOTE(review): 'row' is therefore the row of the first value NOT consumed;
                // whether that value is picked up again depends on how getNewSpan treats the
                // start bound - confirm against AbstractBatchBindingSetUpdater.
                if (set.size() >= batchSize) {
                    batchLimitMet = true;
                    break;
                }
                set.add(BS_SERDE.deserialize(valueIter.next().getValue()));
            }
        }

        if (batchLimitMet) {
            return Optional.of(new RowColumn(row, column));
        }
        return Optional.empty();
    }
}
