package org.locationtech.geowave.datastore.redis.operations;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.google.common.primitives.UnsignedBytes;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.locationtech.geowave.core.index.ByteArray;
import org.locationtech.geowave.core.index.SinglePartitionQueryRanges;
import org.locationtech.geowave.core.store.CloseableIterator;
import org.locationtech.geowave.core.store.CloseableIteratorWrapper;
import org.locationtech.geowave.core.store.entities.GeoWaveRow;
import org.locationtech.geowave.core.store.entities.GeoWaveRowIteratorTransformer;
import org.locationtech.geowave.core.store.entities.GeoWaveRowMergingIterator;
import org.locationtech.geowave.core.store.util.RowConsumer;
import org.locationtech.geowave.datastore.redis.config.RedisOptions;
import org.locationtech.geowave.datastore.redis.util.GeoWaveRedisPersistedRow;
import org.locationtech.geowave.datastore.redis.util.GeoWaveRedisRow;
import org.locationtech.geowave.datastore.redis.util.RedisScoredSetWrapper;
import org.locationtech.geowave.datastore.redis.util.RedisUtils;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.protocol.ScoredEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/locationtech/geowave/datastore/redis/operations/BatchedRangeRead.class */
public class BatchedRangeRead<T> {
    /** Number of permits in {@link #readSemaphore}; one permit is acquired before each asynchronous range read is issued. */
    private static final int MAX_CONCURRENT_READ = 100;
    /** Capacity of the bounded queue that buffers rows produced by asynchronous reads. */
    private static final int MAX_BOUNDED_READS_ENQUEUED = 1000000;
    /** Query ranges grouped by partition; {@code results()} expands them into individual score-range reads. */
    private final Collection<SinglePartitionQueryRanges> ranges;
    /** Adapter id attached to every {@link GeoWaveRedisRow} built from a result entry. */
    private final short adapterId;
    /** Name prefix handed to {@link RedisUtils#getRowSet} when a partition's scored set is looked up. */
    private final String setNamePrefix;
    /** Redisson client used to obtain the scored sets. */
    private final RedissonClient client;
    /** Converts the final iterator of rows into the iterator of results returned to the caller. */
    private final GeoWaveRowIteratorTransformer<T> rowTransformer;
    /** Row-level filter applied to each row before merging, sorting and transformation. */
    private final Predicate<GeoWaveRow> filter;
    /** True only when asynchronous reads were requested and final results do not need sorting by sort key. */
    private final boolean async;
    /**
     * Left: whether entries are grouped with {@link RedisUtils#groupByRow}. Right: flag forwarded to
     * {@code groupByRow} and {@code getRowSet} (by its name, presumably "sort by time" -- confirm in RedisUtils).
     */
    private final Pair<Boolean, Boolean> groupByRowAndSortByTimePair;
    /** Whether the final rows must be ordered with {@link RedisUtils#sortBySortKey}. */
    private final boolean isSortFinalResultsBySortKey;
    /** Compression setting forwarded to {@link RedisUtils#getRowSet}. */
    private final RedisOptions.Compression compression;
    /** Whether rows are passed through a {@link GeoWaveRowMergingIterator}. */
    private final boolean rowMerging;
    /** Visibility flag forwarded to {@link RedisUtils#getRowSet}. */
    private final boolean visibilityEnabled;
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchedRangeRead.class);
    /** Shared cache key used for ranges whose partition key is null or empty; final so it can never be reassigned. */
    private static final ByteArray EMPTY_PARTITION_KEY = new ByteArray();
    /** Caches one scored-set wrapper per partition key; entries are loaded on demand through {@code getSet}. */
    private final LoadingCache<ByteArray, RedisScoredSetWrapper<GeoWaveRedisPersistedRow>> setCache =
            Caffeine.newBuilder().build(partitionKey -> getSet(partitionKey.getBytes()));
    /** Limits concurrently outstanding asynchronous reads; permits are returned by {@code checkFinalize}. */
    private final Semaphore readSemaphore = new Semaphore(MAX_CONCURRENT_READ);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/locationtech/geowave/datastore/redis/operations/BatchedRangeRead$PartitionIteratorWrapper.class */
    /**
     * Iterator decorator that stamps a fixed partition key onto the persisted row of every
     * non-null entry it hands out.
     */
    public static class PartitionIteratorWrapper implements Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> {
        private final byte[] partitionKey;
        private final Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> iteratorDelegate;

        /**
         * @param it the iterator whose entries are decorated
         * @param bArr the partition key to set on each entry's row
         */
        private PartitionIteratorWrapper(Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> it, byte[] bArr) {
            this.iteratorDelegate = it;
            this.partitionKey = bArr;
        }

        @Override
        public boolean hasNext() {
            return iteratorDelegate.hasNext();
        }

        /**
         * Returns the delegate's next entry after setting the partition key on its row; a null
         * entry is passed through untouched.
         */
        @Override
        public ScoredEntry<GeoWaveRedisPersistedRow> next() {
            final ScoredEntry<GeoWaveRedisPersistedRow> entry = iteratorDelegate.next();
            if (entry == null) {
                return null;
            }
            entry.getValue().setPartitionKey(partitionKey);
            return entry;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/locationtech/geowave/datastore/redis/operations/BatchedRangeRead$ScoreOrderComparator.class */
    /**
     * Orders range reads by start score, then end score, then by partition key compared as
     * unsigned bytes (a null key is treated as an empty key).
     */
    public static class ScoreOrderComparator implements Comparator<RangeReadInfo>, Serializable {
        private static final long serialVersionUID = 1L;
        private static final ScoreOrderComparator SINGLETON = new ScoreOrderComparator();

        private ScoreOrderComparator() {
        }

        @Override
        public int compare(RangeReadInfo rangeReadInfo, RangeReadInfo rangeReadInfo2) {
            int order = Double.compare(rangeReadInfo.startScore, rangeReadInfo2.startScore);
            if (order == 0) {
                order = Double.compare(rangeReadInfo.endScore, rangeReadInfo2.endScore);
            }
            if (order == 0) {
                // Scores tie completely, so fall back to the partition key.
                order = UnsignedBytes.lexicographicalComparator().compare(
                        emptyIfNull(rangeReadInfo.partitionKey),
                        emptyIfNull(rangeReadInfo2.partitionKey));
            }
            return order;
        }

        /** Substitutes a zero-length array for a missing partition key. */
        private static byte[] emptyIfNull(byte[] key) {
            return key != null ? key : new byte[0];
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a batched reader over the given partition ranges.
     *
     * @param redissonClient client used to look up the scored sets
     * @param compression compression setting forwarded when a scored set is obtained
     * @param str name prefix of the scored sets
     * @param s adapter id attached to every row that is produced
     * @param collection the query ranges, grouped by partition
     * @param geoWaveRowIteratorTransformer converts the final row iterator into results
     * @param predicate row-level filter
     * @param z whether rows are merged
     * @param z2 whether asynchronous reads are requested; ignored when {@code z3} is true
     * @param pair group-by-row (left) and sort-by-time (right) flags
     * @param z3 whether final results must be sorted by sort key
     * @param z4 whether visibility is enabled
     */
    public BatchedRangeRead(RedissonClient redissonClient, RedisOptions.Compression compression, String str, short s, Collection<SinglePartitionQueryRanges> collection, GeoWaveRowIteratorTransformer<T> geoWaveRowIteratorTransformer, Predicate<GeoWaveRow> predicate, boolean z, boolean z2, Pair<Boolean, Boolean> pair, boolean z3, boolean z4) {
        // Where the data lives.
        this.client = redissonClient;
        this.setNamePrefix = str;
        this.compression = compression;
        this.visibilityEnabled = z4;

        // What is read.
        this.adapterId = s;
        this.ranges = collection;

        // How rows are post-processed.
        this.filter = predicate;
        this.rowTransformer = geoWaveRowIteratorTransformer;
        this.groupByRowAndSortByTimePair = pair;
        this.rowMerging = z;
        this.isSortFinalResultsBySortKey = z3;

        // The asynchronous path is only taken when sorted output is not required.
        this.async = !z3 && z2;
    }

    /**
     * Looks up the scored set that stores the rows of one partition.
     *
     * @param bArr the partition key bytes
     * @return the scored-set wrapper returned by {@link RedisUtils#getRowSet}
     */
    private RedisScoredSetWrapper<GeoWaveRedisPersistedRow> getSet(byte[] bArr) {
        final boolean rightFlag = groupByRowAndSortByTimePair.getRight();
        return RedisUtils.getRowSet(client, compression, setNamePrefix, bArr, rightFlag, visibilityEnabled);
    }

    /**
     * Expands every sort-key range of every partition into score-range reads and executes them,
     * asynchronously when {@code async} is set and synchronously otherwise.
     *
     * @return a closeable iterator over the transformed results
     */
    public CloseableIterator<T> results() {
        final List<RangeReadInfo> reads = this.ranges.stream()
                .flatMap(partitionRanges -> partitionRanges.getSortKeyRanges().stream()
                        .flatMap(sortKeyRange -> RedisUtils.getScoreRangesFromByteArrays(sortKeyRange)
                                .map(scoreRange -> new RangeReadInfo(
                                        partitionRanges.getPartitionKey(),
                                        scoreRange.getMinimum(),
                                        scoreRange.getMaximum(),
                                        sortKeyRange))))
                .collect(Collectors.toCollection(ArrayList::new));
        if (this.async) {
            return executeQueryAsync(reads);
        }
        return executeQuery(reads);
    }

    /**
     * Executes the reads one after another on the calling thread. Each range is only opened when
     * the returned iterator reaches it.
     *
     * @param list the reads to execute; sorted in place by score when sorted results are required
     * @return a closeable iterator over the transformed, filtered results of all ranges
     */
    private CloseableIterator<T> executeQuery(List<RangeReadInfo> list) {
        if (this.isSortFinalResultsBySortKey) {
            list.sort(ScoreOrderComparator.SINGLETON);
        }
        final Iterator<Iterator<ScoredEntry<GeoWaveRedisPersistedRow>>> perRange =
                list.stream().map(this::openRangeIterator).iterator();
        return new CloseableIterator.Wrapper<>(transformAndFilter(Iterators.concat(perRange)));
    }

    /** Opens a synchronous iterator over one score range and tags every entry with the range's partition key. */
    private Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> openRangeIterator(final RangeReadInfo read) {
        // Null and empty partition keys share a single cache entry.
        final ByteArray cacheKey = (read.partitionKey == null || read.partitionKey.length == 0)
                ? EMPTY_PARTITION_KEY
                : new ByteArray(read.partitionKey);
        // The end bound is inclusive for degenerate ranges and whenever an explicit end check
        // exists; exact bounds are then enforced below by passesExplicitRowChecks.
        final boolean endInclusive = read.endScore <= read.startScore || read.explicitEndCheck != null;
        final Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> matching =
                Streams.stream(this.setCache.get(cacheKey).entryRange(read.startScore, true, read.endScore, endInclusive))
                        .filter(entry -> read.passesExplicitRowChecks(entry))
                        .iterator();
        return new PartitionIteratorWrapper(matching, read.partitionKey);
    }

    /**
     * Submits every read asynchronously from a dedicated thread and streams the rows to the
     * caller through a bounded queue.
     *
     * <p>A counter tracks outstanding work: it starts at one (held by the submitting thread) and
     * is raised once per submitted read. Each completed read -- successful, failed or cancelled --
     * lowers it exactly once through {@code checkFinalize}, which also returns the read's
     * semaphore permit. Whoever brings the counter to zero enqueues {@link RowConsumer#POISON} to
     * signal the end of the results.
     *
     * @param list the reads to execute
     * @return a closeable iterator over the queued results; closing it cancels all submitted reads
     */
    private CloseableIterator<T> executeQueryAsync(final List<RangeReadInfo> list) {
        final List<RFuture<Collection<ScoredEntry<GeoWaveRedisPersistedRow>>>> futures =
                Lists.newArrayListWithExpectedSize(list.size());
        final BlockingQueue<Object> results = new LinkedBlockingQueue<>(MAX_BOUNDED_READS_ENQUEUED);
        new Thread(() -> {
            // Starts at 1 so the count cannot reach zero before every read has been submitted.
            final AtomicInteger pending = new AtomicInteger(1);
            boolean interrupted = false;
            for (final RangeReadInfo read : list) {
                // Null and empty partition keys share a single cache entry.
                final ByteArray cacheKey = (read.partitionKey == null || read.partitionKey.length == 0)
                        ? EMPTY_PARTITION_KEY
                        : new ByteArray(read.partitionKey);
                try {
                    this.readSemaphore.acquire();
                } catch (final InterruptedException e) {
                    // acquire() did not obtain a permit, so there is nothing to release here.
                    LOGGER.warn("Exception while executing query", e);
                    interrupted = true;
                    continue;
                }
                final RFuture<Collection<ScoredEntry<GeoWaveRedisPersistedRow>>> future =
                        this.setCache.get(cacheKey).entryRangeAsync(
                                read.startScore,
                                true,
                                read.endScore,
                                // Inclusive end for degenerate ranges or when an explicit end check
                                // exists; exact bounds are enforced by passesExplicitRowChecks.
                                read.endScore <= read.startScore || read.explicitEndCheck != null);
                pending.incrementAndGet();
                future.handle((entries, failure) -> {
                    try {
                        if (!future.isSuccess()) {
                            if (!future.isCancelled()) {
                                LOGGER.warn("Async Redis query failed", failure);
                            }
                            return entries;
                        }
                        entries.forEach(entry -> entry.getValue().setPartitionKey(read.partitionKey));
                        transformAndFilter(
                                entries.stream().filter(entry -> read.passesExplicitRowChecks(entry)).iterator())
                                .forEachRemaining(row -> {
                                    try {
                                        results.put(row);
                                    } catch (final InterruptedException e) {
                                        LOGGER.warn("interrupted while waiting to enqueue a redis result", e);
                                    }
                                });
                        return entries;
                    } finally {
                        // Exactly once per submitted read, whatever the outcome: returns the
                        // permit and lets the consumer terminate once all reads are done.
                        checkFinalize(this.readSemaphore, results, pending);
                    }
                });
                synchronized (futures) {
                    futures.add(future);
                }
            }
            // Drop the submitter's own count; if no read is still outstanding, finish the queue here.
            if (pending.decrementAndGet() <= 0) {
                try {
                    results.put(RowConsumer.POISON);
                } catch (final InterruptedException e) {
                    LOGGER.error("Interrupted while finishing blocking queue, this may result in deadlock!");
                    interrupted = true;
                }
            }
            if (interrupted) {
                // Restore the flag only now so it cannot abort the submissions or the final put above.
                Thread.currentThread().interrupt();
            }
        }, "Redis Query Executor").start();
        return new CloseableIteratorWrapper<>(() -> {
            // Snapshot under the lock: the submitter thread may still be adding futures.
            final List<RFuture<Collection<ScoredEntry<GeoWaveRedisPersistedRow>>>> snapshot;
            synchronized (futures) {
                snapshot = new ArrayList<>(futures);
            }
            for (final RFuture<Collection<ScoredEntry<GeoWaveRedisPersistedRow>>> submitted : snapshot) {
                submitted.cancel(true);
            }
        }, new RowConsumer<>(results));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Turns raw scored entries into final results: optionally groups entries by row, wraps each
     * entry in a {@link GeoWaveRedisRow}, applies the row filter, optionally merges rows and sorts
     * them by sort key, and finally applies the row transformer.
     *
     * @param it the scored entries read from Redis
     * @return the transformed results
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Iterator<T> transformAndFilter(Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> it) {
        final Iterator<ScoredEntry<GeoWaveRedisPersistedRow>> entries = this.groupByRowAndSortByTimePair.getLeft()
                ? RedisUtils.groupByRow(it, this.groupByRowAndSortByTimePair.getRight())
                : it;
        // The stream yields GeoWaveRedisRow; the raw cast widens the element type to GeoWaveRow,
        // which is what the sorting and transformer stages accept.
        final Iterator<GeoWaveRow> rows = (Iterator) Streams.stream(entries)
                .map(entry -> new GeoWaveRedisRow(
                        entry.getValue(),
                        this.adapterId,
                        entry.getValue().getPartitionKey(),
                        RedisUtils.getFullSortKey(
                                entry.getScore().doubleValue(),
                                entry.getValue().getSortKeyPrecisionBeyondScore())))
                .filter(this.filter)
                .iterator();
        return this.rowTransformer.apply(
                sortByKeyIfRequired(
                        this.isSortFinalResultsBySortKey,
                        this.rowMerging ? new GeoWaveRowMergingIterator(rows) : rows));
    }

    /**
     * Returns the rows ordered with {@link RedisUtils#sortBySortKey} when sorting is requested,
     * otherwise returns the given iterator unchanged.
     *
     * @param z whether sorting is required
     * @param it the rows to return or sort
     */
    private static Iterator<GeoWaveRow> sortByKeyIfRequired(boolean z, Iterator<GeoWaveRow> it) {
        if (!z) {
            return it;
        }
        return RedisUtils.sortBySortKey(it);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks one unit of work as finished: returns a permit to the semaphore, decrements the
     * outstanding-work counter and, once the counter is no longer positive, enqueues
     * {@link RowConsumer#POISON} to signal the end of the results.
     *
     * @param semaphore the semaphore that receives the permit
     * @param blockingQueue the result queue that receives the end marker
     * @param atomicInteger the outstanding-work counter
     */
    public static void checkFinalize(Semaphore semaphore, BlockingQueue<Object> blockingQueue, AtomicInteger atomicInteger) {
        semaphore.release();
        if (atomicInteger.decrementAndGet() > 0) {
            return;
        }
        try {
            blockingQueue.put(RowConsumer.POISON);
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while finishing blocking queue, this may result in deadlock!", e);
            // Nothing else blocks after this point, so the interrupt can be handed back to the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}
