package org.vertexium.accumulo.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.hadoop.fs.FileSystem;
import org.vertexium.Graph;
import org.vertexium.GraphConfiguration;
import org.vertexium.Property;
import org.vertexium.VertexiumException;
import org.vertexium.accumulo.AccumuloGraph;
import org.vertexium.accumulo.AccumuloGraphConfiguration;
import org.vertexium.accumulo.ElementMutationBuilder;
import org.vertexium.accumulo.StreamingPropertyValueHdfsRef;
import org.vertexium.accumulo.StreamingPropertyValueTable;
import org.vertexium.accumulo.StreamingPropertyValueTableRef;
import org.vertexium.accumulo.keys.DataTableRowKey;
import org.vertexium.property.StreamingPropertyValue;
import org.vertexium.property.StreamingPropertyValueRef;
import org.vertexium.util.IOUtils;
import org.vertexium.util.VertexiumLogger;
import org.vertexium.util.VertexiumLoggerFactory;

/* loaded from: input_file:org/vertexium/accumulo/util/OverflowIntoHdfsStreamingPropertyValueStorageStrategy.class */
public class OverflowIntoHdfsStreamingPropertyValueStorageStrategy implements StreamingPropertyValueStorageStrategy {
    private static final VertexiumLogger LOGGER = VertexiumLoggerFactory.getLogger(ElementMutationBuilder.class);
    private final FileSystem fileSystem;
    private final long maxStreamingPropertyValueTableDataSize;
    private final String dataDir;
    private final AccumuloGraph graph;

    public OverflowIntoHdfsStreamingPropertyValueStorageStrategy(Graph graph, GraphConfiguration graphConfiguration) throws Exception {
        if (!(graphConfiguration instanceof AccumuloGraphConfiguration)) {
            throw new VertexiumException("Expected " + AccumuloGraphConfiguration.class.getName() + " found " + graphConfiguration.getClass().getName());
        }
        if (!(graph instanceof AccumuloGraph)) {
            throw new VertexiumException("Expected " + AccumuloGraph.class.getName() + " found " + graph.getClass().getName());
        }
        this.graph = (AccumuloGraph) graph;
        AccumuloGraphConfiguration accumuloGraphConfiguration = (AccumuloGraphConfiguration) graphConfiguration;
        this.fileSystem = accumuloGraphConfiguration.createFileSystem();
        this.maxStreamingPropertyValueTableDataSize = accumuloGraphConfiguration.getMaxStreamingPropertyValueTableDataSize();
        this.dataDir = accumuloGraphConfiguration.getDataDir();
    }

    @Override // org.vertexium.accumulo.util.StreamingPropertyValueStorageStrategy
    public StreamingPropertyValueRef saveStreamingPropertyValue(ElementMutationBuilder elementMutationBuilder, String str, Property property, StreamingPropertyValue streamingPropertyValue) {
        try {
            HdfsLargeDataStore hdfsLargeDataStore = new HdfsLargeDataStore(this.fileSystem, this.dataDir, str, property);
            LimitOutputStream limitOutputStream = new LimitOutputStream(hdfsLargeDataStore, this.maxStreamingPropertyValueTableDataSize);
            try {
                IOUtils.copy(streamingPropertyValue.getInputStream(), limitOutputStream);
                limitOutputStream.close();
                if (!limitOutputStream.hasExceededSizeLimit()) {
                    return saveStreamingPropertyValueSmall(elementMutationBuilder, str, property, limitOutputStream.getSmall(), streamingPropertyValue);
                }
                LOGGER.debug("saved large file to \"%s\" (length: %d)", new Object[]{hdfsLargeDataStore.getFullHdfsPath(), Long.valueOf(limitOutputStream.getLength())});
                return new StreamingPropertyValueHdfsRef(hdfsLargeDataStore.getRelativeFileName(), streamingPropertyValue);
            } catch (Throwable th) {
                limitOutputStream.close();
                throw th;
            }
        } catch (IOException e) {
            throw new VertexiumException(e);
        }
    }

    @Override // org.vertexium.accumulo.util.StreamingPropertyValueStorageStrategy
    public void close() {
        try {
            this.fileSystem.close();
        } catch (IOException e) {
            throw new VertexiumException("Could not close filesystem", e);
        }
    }

    @Override // org.vertexium.accumulo.util.StreamingPropertyValueStorageStrategy
    public List<InputStream> getInputStreams(List<StreamingPropertyValue> list) {
        List list2 = (List) list.stream().filter(streamingPropertyValue -> {
            return streamingPropertyValue instanceof StreamingPropertyValueTable;
        }).map(streamingPropertyValue2 -> {
            return (StreamingPropertyValueTable) streamingPropertyValue2;
        }).filter(streamingPropertyValueTable -> {
            return !streamingPropertyValueTable.isDataLoaded();
        }).collect(Collectors.toList());
        Map<String, byte[]> streamingPropertyValueTableDatas = streamingPropertyValueTableDatas((List) list2.stream().map((v0) -> {
            return v0.getDataRowKey();
        }).collect(Collectors.toList()));
        list2.forEach(streamingPropertyValueTable2 -> {
            String dataRowKey = streamingPropertyValueTable2.getDataRowKey();
            byte[] bArr = (byte[]) streamingPropertyValueTableDatas.get(dataRowKey);
            if (bArr == null) {
                throw new VertexiumException("Could not find StreamingPropertyValue data: " + dataRowKey);
            }
            streamingPropertyValueTable2.setData(bArr);
        });
        return (List) list.stream().map((v0) -> {
            return v0.getInputStream();
        }).collect(Collectors.toList());
    }

    private Map<String, byte[]> streamingPropertyValueTableDatas(List<String> list) {
        try {
            if (list.size() == 0) {
                return Collections.emptyMap();
            }
            List list2 = (List) list.stream().map(RangeUtils::createRangeFromString).collect(Collectors.toList());
            long currentTimeMillis = System.currentTimeMillis();
            ScannerBase<Map.Entry> createBatchScanner = this.graph.createBatchScanner(this.graph.getDataTableName(), list2, new Authorizations());
            this.graph.getGraphLogger().logStartIterator(this.graph.getDataTableName(), createBatchScanner);
            Span start = Trace.start("streamingPropertyValueTableData");
            start.data("dataRowKeyCount", Integer.toString(list.size()));
            try {
                HashMap hashMap = new HashMap();
                for (Map.Entry entry : createBatchScanner) {
                    hashMap.put(((Key) entry.getKey()).getRow().toString(), ((Value) entry.getValue()).get());
                }
                return hashMap;
            } finally {
                createBatchScanner.close();
                start.stop();
                this.graph.getGraphLogger().logEndIterator(System.currentTimeMillis() - currentTimeMillis);
            }
        } catch (Exception e) {
            throw new VertexiumException(e);
        }
    }

    private StreamingPropertyValueRef saveStreamingPropertyValueSmall(ElementMutationBuilder elementMutationBuilder, String str, Property property, byte[] bArr, StreamingPropertyValue streamingPropertyValue) {
        String rowKey = new DataTableRowKey(str, property).getRowKey();
        Mutation mutation = new Mutation(rowKey);
        mutation.put(ElementMutationBuilder.EMPTY_TEXT, ElementMutationBuilder.EMPTY_TEXT, property.getTimestamp(), new Value(bArr));
        elementMutationBuilder.saveDataMutation(mutation);
        return new StreamingPropertyValueTableRef(rowKey, streamingPropertyValue, bArr);
    }

    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    public String getDataDir() {
        return this.dataDir;
    }
}
