package org.bboxdb.network.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.bboxdb.commons.math.Hyperrectangle;
import org.bboxdb.distribution.partitioner.SpacePartitionerCache;
import org.bboxdb.distribution.partitioner.regionsplit.RangeQueryExecutor;
import org.bboxdb.distribution.zookeeper.ZookeeperException;
import org.bboxdb.misc.BBoxDBConfiguration;
import org.bboxdb.misc.BBoxDBConfigurationManager;
import org.bboxdb.misc.BBoxDBException;
import org.bboxdb.network.packages.PackageEncodeException;
import org.bboxdb.network.packages.response.ErrorResponse;
import org.bboxdb.network.packages.response.MultipleTupleEndResponse;
import org.bboxdb.network.packages.response.MultipleTupleStartResponse;
import org.bboxdb.network.packages.response.PageEndResponse;
import org.bboxdb.network.query.ContinuousQueryPlan;
import org.bboxdb.network.query.ContinuousRangeQueryPlan;
import org.bboxdb.network.query.ContinuousSpatialJoinQueryPlan;
import org.bboxdb.network.query.entity.TupleAndBoundingBox;
import org.bboxdb.network.query.filter.UserDefinedFilter;
import org.bboxdb.network.query.filter.UserDefinedFilterDefinition;
import org.bboxdb.network.query.transformation.TupleTransformation;
import org.bboxdb.network.server.connection.ClientConnectionHandler;
import org.bboxdb.storage.StorageManagerException;
import org.bboxdb.storage.entity.DeletedTuple;
import org.bboxdb.storage.entity.MultiTuple;
import org.bboxdb.storage.entity.Tuple;
import org.bboxdb.storage.entity.TupleStoreName;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManager;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/bboxdb/network/server/ContinuousClientQuery.class */
public class ContinuousClientQuery implements ClientQuery {
    /** Query range taken from the plan; used to look up the local tables to subscribe to. */
    private final Hyperrectangle boundingBox;

    /** Connection used to write result packages and to obtain the storage registry. */
    private final ClientConnectionHandler clientConnectionHandler;

    /** Sequence number of the query; only used for logging in this class. */
    private final short querySequence;

    /** Stream table of the plan, i.e. the table whose inserts are observed. */
    private final TupleStoreName requestTable;

    /** Number of tuples written to the client since the query was created. */
    private long totalSendTuples;

    /** Number of tuples written since the last page flush. */
    private long tuplesInPage;

    /**
     * Whether the query is still running. The flag is cleared by the insert
     * callbacks and by {@code close()} (which is also registered as a
     * connection-closed handler) and polled by the fetch loop, so it is
     * declared volatile to make a write by one thread visible to the others.
     */
    private volatile boolean queryActive;

    /** Bounded hand-over queue between the insert callbacks and the fetch loop. */
    private final BlockingQueue<MultiTuple> tupleQueue;

    /** Timestamp (ms) of the last page flush; initialised to the creation time. */
    private long lastFlushTime = System.currentTimeMillis();

    /** Callback registered with every local tuple store manager; null for unknown plan types. */
    private final Consumer<Tuple> tupleInsertCallback;

    /** Tuple store managers at which the insert callback was registered. */
    private final List<TupleStoreManager> storageManager;

    /** The plan this query executes. */
    private final ContinuousQueryPlan queryPlan;

    /** When true a full queue drops the tuple (offer), otherwise the producer blocks (put). */
    private final boolean allowDiscardTuples;

    /** Minimum time between two page flushes. */
    private static final long FLUSH_TIME_IN_MS = TimeUnit.SECONDS.toMillis(1);

    /** Sentinel (compared by identity) that tells the fetch loop to end the query. */
    private static final MultiTuple RED_PILL = new MultiTuple(new ArrayList<Tuple>(), new ArrayList<String>());

    /** Sentinel (compared by identity) that only wakes up the fetch loop. */
    private static final MultiTuple KEEP_ALIVE_PILL = new MultiTuple(new ArrayList<Tuple>(), new ArrayList<String>());

    private static final Logger logger = LoggerFactory.getLogger(ContinuousClientQuery.class);

    /**
     * Creates a continuous query for the given plan and subscribes it to the
     * local tables of the stream table.
     *
     * <p>If the plan is neither a range nor a spatial join plan, or if the
     * initialisation throws a {@link BBoxDBException}, the error is logged and
     * the query is left inactive instead of throwing.
     *
     * @param continuousQueryPlan the plan to execute
     * @param clientConnectionHandler the connection the results are written to
     * @param s the sequence number of the query
     */
    public ContinuousClientQuery(ContinuousQueryPlan continuousQueryPlan, ClientConnectionHandler clientConnectionHandler, short s) {
        this.queryActive = true;
        this.queryPlan = continuousQueryPlan;
        this.boundingBox = continuousQueryPlan.getQueryRange();
        this.requestTable = new TupleStoreName(continuousQueryPlan.getStreamTable());
        this.clientConnectionHandler = clientConnectionHandler;
        this.querySequence = s;

        final BBoxDBConfiguration config = BBoxDBConfigurationManager.getConfiguration();
        final int queueCapacity = config.getContinuousClientQueueSize();
        this.allowDiscardTuples = config.isAllowContinuousClientQueueDiscard();
        this.tupleQueue = new LinkedBlockingQueue<>(queueCapacity);

        this.totalSendTuples = 0L;
        this.tuplesInPage = 0L;
        this.storageManager = new ArrayList<>();

        // Pick the insert callback that matches the concrete plan type.
        if (continuousQueryPlan instanceof ContinuousRangeQueryPlan) {
            this.tupleInsertCallback = getCallbackForRangeQuery((ContinuousRangeQueryPlan) continuousQueryPlan);
        } else if (continuousQueryPlan instanceof ContinuousSpatialJoinQueryPlan) {
            this.tupleInsertCallback = getCallbackForSpatialJoinQuery((ContinuousSpatialJoinQueryPlan) continuousQueryPlan);
        } else {
            this.tupleInsertCallback = null;
            logger.error("Unknown query type: " + continuousQueryPlan);
            this.queryActive = false;
            return;
        }

        try {
            init();
        } catch (BBoxDBException initFailure) {
            logger.error("Got exception on init", initFailure);
            this.queryActive = false;
        }
    }

    /**
     * Builds the insert callback for a continuous spatial join.
     *
     * <p>For every inserted, non-deleted stream tuple that survives the stream
     * transformations, intersects the query range and passes the stream
     * filters, a range query on the join table is executed with the
     * transformed bounding box; matching stored tuples are handled by the
     * reader created in {@code getStoredTupleReader}.
     *
     * @param continuousSpatialJoinQueryPlan the join plan
     * @return the callback to register at the tuple store managers
     */
    private Consumer<Tuple> getCallbackForSpatialJoinQuery(ContinuousSpatialJoinQueryPlan continuousSpatialJoinQueryPlan) {
        final List<TupleTransformation> streamTransformations = continuousSpatialJoinQueryPlan.getStreamTransformation();
        final Map<UserDefinedFilter, byte[]> streamFilters = getUserDefinedFilter(continuousSpatialJoinQueryPlan.getStreamFilters());
        final Map<UserDefinedFilter, byte[]> afterJoinFilters = getUserDefinedFilter(continuousSpatialJoinQueryPlan.getAfterJoinFilter());

        return streamTuple -> {
            if (streamTuple instanceof DeletedTuple) {
                return;
            }

            final TupleStoreName joinTable = new TupleStoreName(continuousSpatialJoinQueryPlan.getJoinTable());
            final TupleAndBoundingBox transformed = applyStreamTupleTransformations(streamTransformations, streamTuple);

            if (transformed == null) {
                return;
            }
            if (!transformed.getBoundingBox().intersects(continuousSpatialJoinQueryPlan.getQueryRange())) {
                return;
            }
            if (!doUserDefinedFilterMatch(streamTuple, streamFilters)) {
                return;
            }

            final Consumer<Tuple> storedTupleReader = getStoredTupleReader(continuousSpatialJoinQueryPlan, afterJoinFilters, streamTuple, transformed);

            try {
                final TupleStoreManagerRegistry registry = this.clientConnectionHandler.getStorageRegistry();
                final BBoxDBConfiguration.ContinuousSpatialJoinFetchMode fetchMode = BBoxDBConfigurationManager.getConfiguration().getContinuousSpatialJoinFetchModeENUM();

                // Only the FETCH mode widens the lookup beyond the local node.
                final RangeQueryExecutor.ExecutionPolicy policy = (fetchMode == BBoxDBConfiguration.ContinuousSpatialJoinFetchMode.FETCH)
                        ? RangeQueryExecutor.ExecutionPolicy.ALL
                        : RangeQueryExecutor.ExecutionPolicy.LOCAL_ONLY;

                final RangeQueryExecutor executor = new RangeQueryExecutor(joinTable, transformed.getBoundingBox(), storedTupleReader, registry, policy);
                executor.performDataRead();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                // NOTE(review): the flag is cleared without calling close(), so the
                // insert callback stays registered - confirm this is intended.
                this.queryActive = false;
            } catch (BBoxDBException queryFailure) {
                logger.error("Got an exeeption while quering tuples", queryFailure);
                this.queryActive = false;
            }
        };
    }

    /**
     * Creates the consumer that receives the stored tuples found for one
     * stream tuple during a spatial join.
     *
     * <p>A stored tuple is joined with the stream tuple and queued for the
     * client when its transformed bounding box intersects the transformed
     * stream bounding box and the pair passes all after-join filters.
     *
     * @param continuousSpatialJoinQueryPlan the join plan
     * @param map the after-join filters and their values
     * @param tuple the stream tuple
     * @param tupleAndBoundingBox the transformed stream tuple
     * @return the consumer for stored tuples
     */
    private Consumer<Tuple> getStoredTupleReader(ContinuousSpatialJoinQueryPlan continuousSpatialJoinQueryPlan, Map<UserDefinedFilter, byte[]> map, Tuple tuple, TupleAndBoundingBox tupleAndBoundingBox) {
        return storedTuple -> {
            final TupleAndBoundingBox transformedStored = applyStreamTupleTransformations(continuousSpatialJoinQueryPlan.getTableTransformation(), storedTuple);

            if (transformedStored == null) {
                logger.error("Transformed tuple is null, please check filter");
                return;
            }

            final boolean boxesOverlap = tupleAndBoundingBox.getBoundingBox().intersects(transformedStored.getBoundingBox());

            if (boxesOverlap && doUserDefinedFilterMatch(tuple, storedTuple, map)) {
                final MultiTuple joined = buildJoinedTuple(continuousSpatialJoinQueryPlan, tuple, storedTuple);
                queueTupleForClientProcessing(joined);
            }
        };
    }

    /**
     * Combines a stream tuple and a stored tuple into one joined tuple.
     *
     * @param continuousSpatialJoinQueryPlan the plan that names both tables
     * @param tuple the stream tuple (first element)
     * @param tuple2 the stored tuple (second element)
     * @return a multi tuple holding both tuples and, in the same order, the
     *         stream table and join table names
     */
    private MultiTuple buildJoinedTuple(ContinuousSpatialJoinQueryPlan continuousSpatialJoinQueryPlan, Tuple tuple, Tuple tuple2) {
        final List<Tuple> joinedTuples = Arrays.asList(tuple, tuple2);
        final List<String> tableNames = Arrays.asList(
                continuousSpatialJoinQueryPlan.getStreamTable(),
                continuousSpatialJoinQueryPlan.getJoinTable());

        return new MultiTuple(joinedTuples, tableNames);
    }

    /**
     * Checks a single tuple against all user defined filters.
     *
     * @param tuple the tuple to test
     * @param map the filters together with their filter values
     * @return true if every filter accepts the tuple (also for an empty map),
     *         false as soon as one filter rejects it
     */
    private boolean doUserDefinedFilterMatch(Tuple tuple, Map<UserDefinedFilter, byte[]> map) {
        return map.entrySet()
                .stream()
                .allMatch(filterAndValue -> filterAndValue.getKey().filterTuple(tuple, filterAndValue.getValue()));
    }

    /**
     * Checks a join candidate (pair of tuples) against all user defined filters.
     *
     * @param tuple the first tuple of the candidate
     * @param tuple2 the second tuple of the candidate
     * @param map the filters together with their filter values
     * @return true if every filter accepts the pair (also for an empty map),
     *         false as soon as one filter rejects it
     */
    private boolean doUserDefinedFilterMatch(Tuple tuple, Tuple tuple2, Map<UserDefinedFilter, byte[]> map) {
        return map.entrySet()
                .stream()
                .allMatch(filterAndValue -> filterAndValue.getKey().filterJoinCandidate(tuple, tuple2, filterAndValue.getValue()));
    }

    /**
     * Instantiates the user defined filters described by the given definitions.
     *
     * <p>Each filter class is loaded by name and created through its no-arg
     * constructor. {@code Constructor.newInstance} is used instead of the
     * deprecated {@code Class.newInstance}, so an exception thrown by the
     * filter constructor arrives wrapped instead of escaping undeclared.
     *
     * @param list the filter definitions (class name and filter value)
     * @return a map from filter instance to the bytes of its filter value
     * @throws IllegalArgumentException if a filter class cannot be loaded or
     *         instantiated, or does not implement {@code UserDefinedFilter}
     */
    private Map<UserDefinedFilter, byte[]> getUserDefinedFilter(List<UserDefinedFilterDefinition> list) {
        final Map<UserDefinedFilter, byte[]> filters = new HashMap<>();

        for (final UserDefinedFilterDefinition definition : list) {
            final String className = definition.getUserDefinedFilterClass();

            try {
                final UserDefinedFilter filter = Class.forName(className)
                        .asSubclass(UserDefinedFilter.class)
                        .getDeclaredConstructor()
                        .newInstance();

                // NOTE(review): getBytes() uses the platform default charset; kept
                // unchanged because the filters decode these bytes elsewhere.
                filters.put(filter, definition.getUserDefinedFilterValue().getBytes());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Unable to find user defined filter class", e);
            } catch (ClassCastException e) {
                throw new IllegalArgumentException("User defined filter class does not implement UserDefinedFilter: " + className, e);
            }
        }

        return filters;
    }

    /**
     * Builds the insert callback for a continuous range query.
     *
     * <p>Deleted tuples, tuples removed by the stream transformations and
     * tuples rejected by the stream filters are ignored. The remaining tuples
     * are queued for the client when the intersection result of their
     * transformed bounding box with the compare rectangle equals the plan's
     * "report positive" setting: intersecting tuples for positive plans,
     * non-intersecting tuples for negative plans.
     *
     * <p>The plan passed as parameter is used throughout, rather than a
     * downcast of the {@code queryPlan} field.
     *
     * @param continuousRangeQueryPlan the range query plan
     * @return the callback to register at the tuple store managers
     */
    private Consumer<Tuple> getCallbackForRangeQuery(ContinuousRangeQueryPlan continuousRangeQueryPlan) {
        final Map<UserDefinedFilter, byte[]> streamFilters = getUserDefinedFilter(continuousRangeQueryPlan.getStreamFilters());

        return tuple -> {
            if (tuple instanceof DeletedTuple) {
                return;
            }

            final TupleAndBoundingBox transformed = applyStreamTupleTransformations(continuousRangeQueryPlan.getStreamTransformation(), tuple);

            if (transformed == null || !doUserDefinedFilterMatch(tuple, streamFilters)) {
                return;
            }

            final boolean intersects = transformed.getBoundingBox().intersects(continuousRangeQueryPlan.getCompareRectangle());

            // Report on intersection for positive plans, on non-intersection otherwise.
            if (intersects == continuousRangeQueryPlan.isReportPositive()) {
                queueTupleForClientProcessing(new MultiTuple(tuple, this.requestTable.getFullname()));
            }
        };
    }

    /**
     * Applies the transformations, in list order, to the tuple and its bounding box.
     *
     * @param list the transformations to apply
     * @param tuple the tuple to transform
     * @return the transformed tuple and bounding box, or null as soon as one
     *         transformation returns null
     */
    private TupleAndBoundingBox applyStreamTupleTransformations(List<TupleTransformation> list, Tuple tuple) {
        TupleAndBoundingBox current = new TupleAndBoundingBox(tuple, tuple.getBoundingBox());

        for (final TupleTransformation transformation : list) {
            current = transformation.apply(current);

            if (current == null) {
                return null;
            }
        }

        return current;
    }

    /**
     * Hands a result tuple over to the fetch loop.
     *
     * <p>When discarding is allowed the tuple is offered without blocking and
     * an error is logged if the queue is full. Otherwise the call blocks until
     * there is room; an interruption is logged and the interrupt flag restored.
     *
     * @param multiTuple the tuple to queue
     */
    private void queueTupleForClientProcessing(MultiTuple multiTuple) {
        if (!this.allowDiscardTuples) {
            try {
                this.tupleQueue.put(multiTuple);
            } catch (InterruptedException interrupted) {
                logger.debug("Wait was interrupted", interrupted);
                Thread.currentThread().interrupt();
            }
            return;
        }

        final boolean accepted = this.tupleQueue.offer(multiTuple);

        if (!accepted) {
            logger.error("Unable to add tuple to continuous query, queue is full (seq={} / size={})", this.querySequence, this.tupleQueue.size());
        }
    }

    /**
     * Registers the insert callback at every local table that covers the query
     * range and arranges for the query to be closed when the connection closes.
     *
     * <p>Zookeeper and storage manager errors are logged and answered with
     * {@code close()}; they are not rethrown.
     *
     * @throws BBoxDBException if one of the called lookups throws it
     */
    private void init() throws BBoxDBException {
        try {
            logger.info("Starting new continuous client query (seq={})", this.querySequence);

            final TupleStoreManagerRegistry registry = this.clientConnectionHandler.getStorageRegistry();

            for (final TupleStoreName localTable : SpacePartitionerCache.getInstance()
                    .getSpacePartitionerForGroupName(this.requestTable.getDistributionGroup())
                    .getDistributionRegionIdMapper()
                    .getLocalTablesForRegion(this.boundingBox, this.requestTable)) {

                final TupleStoreManager manager = QueryHelper.getTupleStoreManager(registry, localTable);
                manager.registerInsertCallback(this.tupleInsertCallback);
                // Remembered so that close() can remove the callback again.
                this.storageManager.add(manager);
            }

            this.clientConnectionHandler.addConnectionClosedHandler(closedConnection -> close());
        } catch (ZookeeperException | StorageManagerException initFailure) {
            logger.error("Got an exception during query init", initFailure);
            close();
        }
    }

    /**
     * Writes the next page of queued result tuples to the client.
     *
     * <p>After a start response the method drains the tuple queue until one of
     * the following happens:
     * <ul>
     *   <li>at least one tuple was written and the flush interval has elapsed:
     *       a page end is written and the method returns;</li>
     *   <li>the red pill is taken from the queue: the result is ended and the
     *       query closed;</li>
     *   <li>the thread is interrupted: the query is closed and an error
     *       response is written;</li>
     *   <li>the query becomes inactive: the result is ended.</li>
     * </ul>
     * Keep-alive pills only wake up the loop so that the flush condition is
     * evaluated again.
     *
     * @param s the sequence number used for all written packages
     * @throws IOException if a write to the client fails
     * @throws PackageEncodeException if a package cannot be encoded
     */
    @Override
    public void fetchAndSendNextTuples(short s) throws IOException, PackageEncodeException {
        this.clientConnectionHandler.writeResultPackage(new MultipleTupleStartResponse(s));

        while (this.queryActive) {
            try {
                final boolean flushDue = System.currentTimeMillis() >= this.lastFlushTime + FLUSH_TIME_IN_MS;

                if (flushDue && this.tuplesInPage > 0) {
                    logger.debug("Flushing page for continous query {}, old time {}, cur time {}, send tuples {}",
                            s, this.lastFlushTime, System.currentTimeMillis(), this.totalSendTuples);
                    this.clientConnectionHandler.writeResultPackage(new PageEndResponse(s));
                    this.clientConnectionHandler.flushPendingCompressionPackages();
                    this.lastFlushTime = System.currentTimeMillis();
                    this.tuplesInPage = 0L;
                    return;
                }

                final MultiTuple next = this.tupleQueue.take();

                if (next == RED_PILL) {
                    logger.info("Got the red pill from the queue, cancel query");
                    this.clientConnectionHandler.writeResultPackage(new MultipleTupleEndResponse(s));
                    this.clientConnectionHandler.flushPendingCompressionPackages();
                    close();
                    return;
                }

                if (next != KEEP_ALIVE_PILL) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Adding tuple to query {}", s);
                    }
                    this.clientConnectionHandler.writeResultTuple(s, next, true);
                    this.totalSendTuples++;
                    this.tuplesInPage++;
                }
            } catch (InterruptedException e) {
                logger.info("Thread was interrupted while waiting for new tuples");
                try {
                    // close() deregisters the insert callbacks and clears queryActive.
                    // Only clearing the flag would leave the callbacks registered
                    // (a later close() is a no-op) and feeding a queue nobody drains.
                    close();
                    this.clientConnectionHandler.writeResultPackage(new ErrorResponse(s));
                    this.clientConnectionHandler.flushPendingCompressionPackages();
                } finally {
                    // Restore the interrupt status for the caller; done last so the
                    // error response is written with the flag still cleared.
                    Thread.currentThread().interrupt();
                }
                return;
            }
        }

        this.clientConnectionHandler.writeResultPackage(new MultipleTupleEndResponse(s));
        this.clientConnectionHandler.flushPendingCompressionPackages();
    }

    /**
     * Puts a keep-alive pill into an empty queue so that a fetch loop blocked
     * in {@code take()} wakes up. Nothing is added when tuples are pending.
     */
    @Override
    public void maintenanceCallback() {
        if (!this.tupleQueue.isEmpty()) {
            return;
        }

        this.tupleQueue.offer(KEEP_ALIVE_PILL);
    }

    /**
     * Reports whether the query has finished.
     *
     * @return true once the query is no longer active
     */
    @Override
    public boolean isQueryDone() {
        final boolean stillRunning = this.queryActive;
        return !stillRunning;
    }

    /**
     * Closes the query if it is still active; otherwise does nothing.
     *
     * <p>The insert callback is removed from every tuple store manager it was
     * registered at, the query is marked inactive and a red pill is offered to
     * the queue so that the fetch loop terminates.
     */
    @Override
    public void close() {
        if (!this.queryActive) {
            return;
        }

        logger.info("Closing query {} (send {} result tuples)", this.querySequence, this.totalSendTuples);

        for (final TupleStoreManager manager : this.storageManager) {
            final boolean removed = manager.removeInsertCallback(this.tupleInsertCallback);

            if (!removed) {
                logger.error("Unable to remove insert callback, got bad remove callback");
            }
        }

        if (this.storageManager.isEmpty()) {
            logger.error("Unable to remove insert callback, storage manager is NULL");
        }

        this.queryActive = false;
        this.tupleQueue.offer(RED_PILL);
    }

    /**
     * Returns the number of result tuples written to the client so far.
     *
     * @return the total number of sent tuples
     */
    @Override
    public long getTotalSendTuples() {
        return totalSendTuples;
    }

    /**
     * Returns the plan this query was created with.
     *
     * @return the continuous query plan
     */
    public ContinuousQueryPlan getQueryPlan() {
        return queryPlan;
    }
}
