package org.bboxdb.network.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.bboxdb.commons.math.Hyperrectangle;
import org.bboxdb.distribution.partitioner.SpacePartitionerCache;
import org.bboxdb.distribution.partitioner.regionsplit.RangeQueryExecutor;
import org.bboxdb.distribution.zookeeper.ZookeeperException;
import org.bboxdb.misc.BBoxDBException;
import org.bboxdb.network.packages.PackageEncodeException;
import org.bboxdb.network.packages.response.ErrorResponse;
import org.bboxdb.network.packages.response.MultipleTupleEndResponse;
import org.bboxdb.network.packages.response.MultipleTupleStartResponse;
import org.bboxdb.network.packages.response.PageEndResponse;
import org.bboxdb.network.query.ContinuousConstQueryPlan;
import org.bboxdb.network.query.ContinuousQueryPlan;
import org.bboxdb.network.query.ContinuousTableQueryPlan;
import org.bboxdb.network.query.entity.TupleAndBoundingBox;
import org.bboxdb.network.query.filter.UserDefinedFilter;
import org.bboxdb.network.query.filter.UserDefinedFilterDefinition;
import org.bboxdb.network.query.transformation.TupleTransformation;
import org.bboxdb.network.server.connection.ClientConnectionHandler;
import org.bboxdb.storage.StorageManagerException;
import org.bboxdb.storage.entity.JoinedTuple;
import org.bboxdb.storage.entity.Tuple;
import org.bboxdb.storage.entity.TupleStoreName;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManager;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/bboxdb/network/server/ContinuousClientQuery.class */
public class ContinuousClientQuery implements ClientQuery {
    private final Hyperrectangle boundingBox;
    private final ClientConnectionHandler clientConnectionHandler;
    private final short querySequence;
    private final TupleStoreName requestTable;
    private boolean queryActive;
    private static final int MAX_QUEUE_CAPACITY = 1024;
    private final Consumer<Tuple> tupleInsertCallback;
    private TupleStoreManager storageManager;
    private final ContinuousQueryPlan queryPlan;
    private static final long FLUSH_TIME_IN_MS = TimeUnit.SECONDS.toMillis(1);
    private static final JoinedTuple RED_PILL = new JoinedTuple(new ArrayList(), new ArrayList());
    private static final JoinedTuple KEEP_ALIVE_PILL = new JoinedTuple(new ArrayList(), new ArrayList());
    private static final Logger logger = LoggerFactory.getLogger(ContinuousClientQuery.class);
    private long lastFlushTime = System.currentTimeMillis();
    private final BlockingQueue<JoinedTuple> tupleQueue = new ArrayBlockingQueue(MAX_QUEUE_CAPACITY);
    private long totalSendTuples = 0;

    public ContinuousClientQuery(ContinuousQueryPlan continuousQueryPlan, ClientConnectionHandler clientConnectionHandler, short s) {
        this.queryActive = true;
        this.queryPlan = continuousQueryPlan;
        this.boundingBox = continuousQueryPlan.getQueryRange();
        this.requestTable = new TupleStoreName(continuousQueryPlan.getStreamTable());
        this.clientConnectionHandler = clientConnectionHandler;
        this.querySequence = s;
        if (continuousQueryPlan instanceof ContinuousConstQueryPlan) {
            this.tupleInsertCallback = getCallbackForConstQuery();
        } else {
            if (!(continuousQueryPlan instanceof ContinuousTableQueryPlan)) {
                this.tupleInsertCallback = null;
                logger.error("Unknown query type: " + continuousQueryPlan);
                this.queryActive = false;
                return;
            }
            this.tupleInsertCallback = getCallbackForTableQuery();
        }
        try {
            init();
        } catch (BBoxDBException e) {
            logger.error("Got exception on init", e);
            this.queryActive = false;
        }
    }

    private Consumer<Tuple> getCallbackForTableQuery() {
        ContinuousTableQueryPlan continuousTableQueryPlan = (ContinuousTableQueryPlan) this.queryPlan;
        return tuple -> {
            TupleAndBoundingBox applyStreamTupleTransformations = applyStreamTupleTransformations(continuousTableQueryPlan.getStreamTransformation(), tuple);
            if (applyStreamTupleTransformations == null) {
                return;
            }
            Map<UserDefinedFilter, byte[]> userDefinedFilter = getUserDefinedFilter(continuousTableQueryPlan);
            try {
                new RangeQueryExecutor(new TupleStoreName(continuousTableQueryPlan.getJoinTable()), applyStreamTupleTransformations.getBoundingBox(), tuple -> {
                    TupleAndBoundingBox applyStreamTupleTransformations2 = applyStreamTupleTransformations(continuousTableQueryPlan.getTableTransformation(), tuple);
                    if (applyStreamTupleTransformations2 == null) {
                        logger.error("Transformed tuple is null, please check filter");
                        return;
                    }
                    boolean doUserDefinedFilterMatch = doUserDefinedFilterMatch(tuple, userDefinedFilter, applyStreamTupleTransformations2);
                    if (applyStreamTupleTransformations.getBoundingBox().intersects(applyStreamTupleTransformations2.getTuple().getBoundingBox())) {
                        if (this.queryPlan.isReportPositive() && doUserDefinedFilterMatch) {
                            queueTupleForClientProcessing(new JoinedTuple((List<Tuple>) Arrays.asList(tuple, applyStreamTupleTransformations2.getTuple()), (List<String>) Arrays.asList(this.requestTable.getFullname(), this.requestTable.getFullname())));
                            return;
                        }
                        return;
                    }
                    if (this.queryPlan.isReportPositive() || doUserDefinedFilterMatch) {
                        return;
                    }
                    queueTupleForClientProcessing(new JoinedTuple((List<Tuple>) Arrays.asList(tuple, applyStreamTupleTransformations2.getTuple()), (List<String>) Arrays.asList(this.requestTable.getFullname(), this.requestTable.getFullname())));
                }, this.clientConnectionHandler.getStorageRegistry(), RangeQueryExecutor.ExecutionPolicy.ALL).performDataRead();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.queryActive = false;
            } catch (BBoxDBException e2) {
                logger.error("Got an exeeption while quering tuples", e2);
                this.queryActive = false;
            }
        };
    }

    private boolean doUserDefinedFilterMatch(Tuple tuple, Map<UserDefinedFilter, byte[]> map, TupleAndBoundingBox tupleAndBoundingBox) {
        boolean z = true;
        for (Map.Entry<UserDefinedFilter, byte[]> entry : map.entrySet()) {
            if (!entry.getKey().filterJoinCandidate(tuple, tupleAndBoundingBox.getTuple(), entry.getValue())) {
                z = false;
            }
        }
        return z;
    }

    private Map<UserDefinedFilter, byte[]> getUserDefinedFilter(ContinuousTableQueryPlan continuousTableQueryPlan) {
        HashMap hashMap = new HashMap();
        for (UserDefinedFilterDefinition userDefinedFilterDefinition : continuousTableQueryPlan.getAfterJoinFilter()) {
            try {
                hashMap.put((UserDefinedFilter) Class.forName(userDefinedFilterDefinition.getUserDefinedFilterClass()).newInstance(), userDefinedFilterDefinition.getUserDefinedFilterValue().getBytes());
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                throw new IllegalArgumentException("Unable to find user defined filter class", e);
            }
        }
        return hashMap;
    }

    private Consumer<Tuple> getCallbackForConstQuery() {
        ContinuousConstQueryPlan continuousConstQueryPlan = (ContinuousConstQueryPlan) this.queryPlan;
        return tuple -> {
            TupleAndBoundingBox applyStreamTupleTransformations = applyStreamTupleTransformations(continuousConstQueryPlan.getStreamTransformation(), tuple);
            if (applyStreamTupleTransformations == null) {
                return;
            }
            if (applyStreamTupleTransformations.getBoundingBox().intersects(continuousConstQueryPlan.getCompareRectangle())) {
                if (this.queryPlan.isReportPositive()) {
                    queueTupleForClientProcessing(new JoinedTuple(tuple, this.requestTable.getFullname()));
                }
            } else {
                if (this.queryPlan.isReportPositive()) {
                    return;
                }
                queueTupleForClientProcessing(new JoinedTuple(tuple, this.requestTable.getFullname()));
            }
        };
    }

    private TupleAndBoundingBox applyStreamTupleTransformations(List<TupleTransformation> list, Tuple tuple) {
        TupleAndBoundingBox tupleAndBoundingBox = new TupleAndBoundingBox(tuple, tuple.getBoundingBox());
        Iterator<TupleTransformation> it = list.iterator();
        while (it.hasNext()) {
            tupleAndBoundingBox = it.next().apply(tupleAndBoundingBox);
            if (tupleAndBoundingBox == null) {
                break;
            }
        }
        return tupleAndBoundingBox;
    }

    private void queueTupleForClientProcessing(JoinedTuple joinedTuple) {
        if (this.tupleQueue.offer(joinedTuple)) {
            return;
        }
        logger.error("Unable to add tuple to continuous query, queue is full");
    }

    private void init() throws BBoxDBException {
        try {
            TupleStoreManagerRegistry storageRegistry = this.clientConnectionHandler.getStorageRegistry();
            List<TupleStoreName> localTablesForRegion = SpacePartitionerCache.getInstance().getSpacePartitionerForGroupName(this.requestTable.getDistributionGroup()).getDistributionRegionIdMapper().getLocalTablesForRegion(this.boundingBox, this.requestTable);
            if (localTablesForRegion.size() != 1) {
                logger.error("Got more than one table for the continuous query {}", localTablesForRegion);
                close();
            } else {
                this.storageManager = QueryHelper.getTupleStoreManager(storageRegistry, localTablesForRegion.get(0));
                this.storageManager.registerInsertCallback(this.tupleInsertCallback);
                this.clientConnectionHandler.addConnectionClosedHandler(clientConnectionHandler -> {
                    close();
                });
            }
        } catch (ZookeeperException | StorageManagerException e) {
            logger.error("Got an exception during query init", e);
            close();
        }
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public void fetchAndSendNextTuples(short s) throws IOException, PackageEncodeException {
        this.clientConnectionHandler.writeResultPackage(new MultipleTupleStartResponse(s));
        while (this.queryActive) {
            try {
                if (System.currentTimeMillis() >= this.lastFlushTime + FLUSH_TIME_IN_MS) {
                    this.clientConnectionHandler.writeResultPackage(new PageEndResponse(s));
                    this.clientConnectionHandler.flushPendingCompressionPackages();
                    this.lastFlushTime = System.currentTimeMillis();
                    return;
                }
                JoinedTuple take = this.tupleQueue.take();
                if (take == RED_PILL) {
                    logger.info("Got the red pill from the queue, cancel query");
                    this.clientConnectionHandler.writeResultPackage(new MultipleTupleEndResponse(s));
                    this.clientConnectionHandler.flushPendingCompressionPackages();
                    this.queryActive = false;
                    return;
                }
                if (take != KEEP_ALIVE_PILL) {
                    this.clientConnectionHandler.writeResultTuple(s, take, true);
                    this.totalSendTuples++;
                }
            } catch (InterruptedException e) {
                logger.info("Thread was interrupted while waiting for new tuples");
                this.queryActive = false;
                this.clientConnectionHandler.writeResultPackage(new ErrorResponse(s));
                this.clientConnectionHandler.flushPendingCompressionPackages();
                return;
            }
        }
        this.clientConnectionHandler.writeResultPackage(new MultipleTupleEndResponse(s));
        this.clientConnectionHandler.flushPendingCompressionPackages();
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public void maintenanceCallback() {
        if (this.tupleQueue.isEmpty()) {
            this.tupleQueue.offer(KEEP_ALIVE_PILL);
        }
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public boolean isQueryDone() {
        return !this.queryActive;
    }

    @Override // org.bboxdb.network.server.ClientQuery, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.debug("Closing query {} (send {} result tuples)", Short.valueOf(this.querySequence), Long.valueOf(this.totalSendTuples));
        if (this.storageManager != null) {
            this.storageManager.removeInsertCallback(this.tupleInsertCallback);
        }
        this.queryActive = false;
        this.tupleQueue.offer(RED_PILL);
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public long getTotalSendTuples() {
        return this.totalSendTuples;
    }
}
