/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geowave.datastore.cassandra.operations;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TypeCodec;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.commons.lang3.ArrayUtils;
import org.locationtech.geowave.core.index.ByteArrayRange;
import org.locationtech.geowave.core.index.SinglePartitionQueryRanges;
import org.locationtech.geowave.core.store.CloseableIterator;
import org.locationtech.geowave.core.store.CloseableIteratorWrapper;
import org.locationtech.geowave.core.store.entities.GeoWaveRow;
import org.locationtech.geowave.core.store.entities.GeoWaveRowIteratorTransformer;
import org.locationtech.geowave.core.store.entities.GeoWaveRowMergingIterator;
import org.locationtech.geowave.core.store.util.RowConsumer;
import org.locationtech.geowave.datastore.cassandra.CassandraRow;
import org.locationtech.geowave.datastore.cassandra.operations.CassandraOperations;
import org.locationtech.geowave.datastore.cassandra.util.CassandraUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs one prepared Cassandra read per sort-key range of every requested partition and exposes
 * the rows produced by all of those queries through a single {@link CloseableIterator}.
 *
 * <p>Queries are submitted asynchronously from a dedicated thread. A semaphore caps the number of
 * queries in flight at {@link #MAX_CONCURRENT_READ}; a permit is taken before each submission and
 * handed back when that query's callback finishes. Rows are passed to the consumer through a
 * queue bounded at {@link #MAX_BOUNDED_READS_ENQUEUED} entries, terminated by
 * {@link RowConsumer#POISON}.
 *
 * @param <T> element type produced by the row transformer
 */
public class BatchedRangeRead<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchedRangeRead.class);
    private static final int MAX_CONCURRENT_READ = 100;
    private static final int MAX_BOUNDED_READS_ENQUEUED = 1000000;
    private final CassandraOperations operations;
    private final PreparedStatement preparedRead;
    private final Collection<SinglePartitionQueryRanges> ranges;
    private final short[] adapterIds;
    private final GeoWaveRowIteratorTransformer<T> rowTransformer;
    private final boolean rowMerging;
    Predicate<GeoWaveRow> filter;
    private final Semaphore readSemaphore = new Semaphore(MAX_CONCURRENT_READ);

    /**
     * Stores the collaborators used to build and execute the range reads.
     *
     * @param preparedRead prepared statement that every range read is bound against
     * @param operations source of the Cassandra session the queries run on
     * @param adapterIds adapter ids bound to every statement
     * @param ranges partitions and their sort-key ranges to read
     * @param rowMerging whether rows pass through a {@link GeoWaveRowMergingIterator} before
     *        being transformed
     * @param rowTransformer converts the iterator of rows into the iterator of results
     * @param filter predicate every row must satisfy to be returned
     */
    protected BatchedRangeRead(PreparedStatement preparedRead, CassandraOperations operations, short[] adapterIds, Collection<SinglePartitionQueryRanges> ranges, boolean rowMerging, GeoWaveRowIteratorTransformer<T> rowTransformer, Predicate<GeoWaveRow> filter) {
        this.preparedRead = preparedRead;
        this.operations = operations;
        this.adapterIds = adapterIds;
        this.ranges = ranges;
        this.rowMerging = rowMerging;
        this.rowTransformer = rowTransformer;
        this.filter = filter;
    }

    /**
     * Binds one statement per sort-key range of every configured partition and executes them all
     * asynchronously.
     *
     * <p>A null or empty partition key is replaced by {@link CassandraUtils#EMPTY_PARTITION_KEY}.
     * A range without a start is read from an empty lower bound; a range without an end is read
     * up to an upper bound of seven {@code 0xFF} bytes.
     *
     * @return iterator over the transformed rows of all range reads
     */
    public CloseableIterator<T> results() {
        final List<BoundStatement> statements = new ArrayList<>();
        // The codec does not depend on the range, so it is built once for all statements.
        final TypeCodec<List<Short>> adapterIdCodec = TypeCodec.list(TypeCodec.smallInt());
        for (final SinglePartitionQueryRanges partitionRanges : this.ranges) {
            final byte[] requestedKey = partitionRanges.getPartitionKey();
            final byte[] partitionKey = requestedKey == null || requestedKey.length == 0 ? CassandraUtils.EMPTY_PARTITION_KEY : requestedKey;
            for (final ByteArrayRange range : partitionRanges.getSortKeyRanges()) {
                final byte[] start = range.getStart() != null ? range.getStart() : new byte[0];
                final byte[] end = range.getEnd() != null
                    ? range.getEndAsNextPrefix()
                    : new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
                final BoundStatement boundRead = new BoundStatement(this.preparedRead);
                boundRead.set(CassandraRow.CassandraField.GW_SORT_KEY.getLowerBoundBindMarkerName(), ByteBuffer.wrap(start), ByteBuffer.class);
                boundRead.set(CassandraRow.CassandraField.GW_SORT_KEY.getUpperBoundBindMarkerName(), ByteBuffer.wrap(end), ByteBuffer.class);
                boundRead.set(CassandraRow.CassandraField.GW_PARTITION_ID_KEY.getBindMarkerName(), ByteBuffer.wrap(partitionKey), ByteBuffer.class);
                boundRead.set(CassandraRow.CassandraField.GW_ADAPTER_ID_KEY.getBindMarkerName(), Arrays.asList(ArrayUtils.toObject(this.adapterIds)), adapterIdCodec);
                statements.add(boundRead);
            }
        }
        return this.executeQueryAsync(statements.toArray(new BoundStatement[0]));
    }

    /**
     * Starts a "Cassandra Query Executor" thread that submits the given statements and returns
     * immediately with an iterator fed by the query callbacks.
     *
     * <p>Closing the returned iterator cancels the futures that have been submitted up to that
     * moment; it does not stop the submitting thread.
     *
     * @param statements statements to execute
     * @return iterator that yields rows until {@link RowConsumer#POISON} is dequeued
     */
    public CloseableIterator<T> executeQueryAsync(final Statement ... statements) {
        final List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(statements.length);
        final BlockingQueue<Object> results = new LinkedBlockingQueue<>(MAX_BOUNDED_READS_ENQUEUED);
        new Thread(() -> this.submitQueries(statements, futures, results), "Cassandra Query Executor").start();
        // The list is shared with the submitting thread, hence the lock on both sides.
        final Closeable cancelSubmitted = () -> {
            synchronized (futures) {
                for (final ResultSetFuture future : futures) {
                    future.cancel(true);
                }
            }
        };
        return new CloseableIteratorWrapper<T>(cancelSubmitted, new RowConsumer<T>(results));
    }

    /**
     * Submits every statement, taking one semaphore permit per query, and registers a
     * {@link QueryCallback} on each resulting future.
     *
     * @param statements statements to execute
     * @param futures shared list that records each submitted future; guarded by its own monitor
     * @param results queue the callbacks publish rows and the end-of-stream marker to
     */
    private void submitQueries(final Statement[] statements, final List<ResultSetFuture> futures, final BlockingQueue<Object> results) {
        // Starts at 1 so the count cannot reach zero while statements are still being submitted;
        // that extra unit is given back in the finally block below.
        final AtomicInteger queryCount = new AtomicInteger(1);
        boolean interrupted = false;
        try {
            for (final Statement statement : statements) {
                try {
                    this.readSemaphore.acquire();
                } catch (final InterruptedException e) {
                    // No permit was obtained, so none may be released here; doing so would raise
                    // the concurrency limit. Interruption is treated as a request to stop.
                    LOGGER.warn("Exception while executing query", e);
                    interrupted = true;
                    break;
                }
                final ResultSetFuture future;
                try {
                    future = this.operations.getSession().executeAsync(statement);
                } catch (final RuntimeException e) {
                    // No callback will ever return this permit, so return it here.
                    this.readSemaphore.release();
                    throw e;
                }
                synchronized (futures) {
                    futures.add(future);
                }
                Futures.addCallback(future, new QueryCallback<T>(queryCount, results, this.rowTransformer, this.filter, this.rowMerging, this.readSemaphore), CassandraOperations.READ_RESPONSE_THREADS);
            }
        } finally {
            // Runs on every exit path so the consumer is not left waiting for a marker that
            // never arrives. If callbacks are still pending, the last one enqueues it instead.
            if (queryCount.decrementAndGet() <= 0) {
                try {
                    results.put(RowConsumer.POISON);
                } catch (final InterruptedException e) {
                    LOGGER.error("Interrupted while finishing blocking queue, this may result in deadlock!");
                    interrupted = true;
                }
            }
            // Restored last: setting the flag earlier would make the put above fail immediately.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Callback for a single asynchronous query: filters, optionally merges and transforms the
     * returned rows, publishes them to the shared queue, and then releases the query's semaphore
     * permit. The callback that brings the shared query count to zero enqueues
     * {@link RowConsumer#POISON}.
     *
     * @param <T> element type produced by the row transformer
     */
    protected static class QueryCallback<T>
    implements FutureCallback<ResultSet> {
        private final Semaphore semaphore;
        private final BlockingQueue<Object> resultQueue;
        private final AtomicInteger queryCount;
        private final boolean rowMerging;
        private final GeoWaveRowIteratorTransformer<T> rowTransform;
        Predicate<GeoWaveRow> filter;

        /**
         * Registers this callback as one more outstanding query by incrementing
         * {@code queryCount}.
         *
         * @param queryCount shared count of outstanding queries
         * @param resultQueue queue that receives rows and the end-of-stream marker
         * @param rowTransform converts the iterator of rows into the iterator of results
         * @param filter predicate every row must satisfy to be published
         * @param rowMerging whether rows pass through a {@link GeoWaveRowMergingIterator} first
         * @param semaphore semaphore whose permit is released once this callback completes
         */
        public QueryCallback(AtomicInteger queryCount, BlockingQueue<Object> resultQueue, GeoWaveRowIteratorTransformer<T> rowTransform, Predicate<GeoWaveRow> filter, boolean rowMerging, Semaphore semaphore) {
            this.queryCount = queryCount;
            this.queryCount.incrementAndGet();
            this.resultQueue = resultQueue;
            this.rowTransform = rowTransform;
            this.filter = filter;
            this.rowMerging = rowMerging;
            this.semaphore = semaphore;
        }

        /**
         * Publishes the transformed rows of a completed query. A row whose enqueue is interrupted
         * is logged and skipped; the remaining rows are still attempted.
         *
         * @param result rows returned by the query; {@code null} publishes nothing
         */
        @Override
        public void onSuccess(ResultSet result) {
            boolean interrupted = false;
            try {
                if (result != null) {
                    final Iterator<GeoWaveRow> rows = Streams.stream(result.iterator()).<GeoWaveRow>map(row -> new CassandraRow(row)).filter(this.filter).iterator();
                    // The merging iterator is instantiated raw to mirror the original call; this
                    // is the only unchecked conversion in the class.
                    @SuppressWarnings({"unchecked", "rawtypes"})
                    final Iterator<GeoWaveRow> source = this.rowMerging ? new GeoWaveRowMergingIterator(rows) : rows;
                    final Iterator<T> transformed = this.rowTransform.apply(source);
                    while (transformed.hasNext()) {
                        final T row = transformed.next();
                        try {
                            this.resultQueue.put(row);
                        } catch (final InterruptedException e) {
                            LOGGER.warn("interrupted while waiting to enqueue a cassandra result", e);
                            interrupted = true;
                        }
                    }
                }
            } finally {
                this.checkFinalize();
                // Restored only after the bookkeeping so the marker put is not aborted by it.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Completes the bookkeeping for a failed query. Cancellation is silent; any other
         * failure is logged and rethrown wrapped in a {@link RuntimeException}.
         *
         * @param t cause of the failure
         */
        @Override
        public void onFailure(Throwable t) {
            this.checkFinalize();
            if (!(t instanceof CancellationException)) {
                LOGGER.error("Failure from async query", t);
                throw new RuntimeException(t);
            }
        }

        /**
         * Returns this query's permit and, when no outstanding query remains, enqueues the
         * end-of-stream marker.
         */
        private void checkFinalize() {
            this.semaphore.release();
            if (this.queryCount.decrementAndGet() <= 0) {
                try {
                    this.resultQueue.put(RowConsumer.POISON);
                } catch (final InterruptedException e) {
                    LOGGER.error("Interrupted while finishing blocking queue, this may result in deadlock!");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

