package org.bboxdb.network.server;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import org.bboxdb.commons.math.Hyperrectangle;
import org.bboxdb.distribution.partitioner.SpacePartitionerCache;
import org.bboxdb.distribution.zookeeper.ZookeeperException;
import org.bboxdb.misc.BBoxDBException;
import org.bboxdb.network.packages.PackageEncodeException;
import org.bboxdb.network.packages.response.MultipleTupleEndResponse;
import org.bboxdb.network.packages.response.MultipleTupleStartResponse;
import org.bboxdb.network.packages.response.PageEndResponse;
import org.bboxdb.network.server.connection.ClientConnectionHandler;
import org.bboxdb.storage.StorageManagerException;
import org.bboxdb.storage.entity.JoinedTuple;
import org.bboxdb.storage.entity.Tuple;
import org.bboxdb.storage.entity.TupleStoreName;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManager;
import org.bboxdb.storage.tuplestore.manager.TupleStoreManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/bboxdb/network/server/ContinuousBoundingBoxClientQuery.class */
public class ContinuousBoundingBoxClientQuery implements ClientQuery {
    protected final Hyperrectangle boundingBox;
    protected final ClientConnectionHandler clientConnectionHandler;
    protected final short querySequence;
    protected final TupleStoreName requestTable;
    protected boolean queryActive;
    protected static final int MAX_QUEUE_CAPACITY = 1024;
    protected final Consumer<Tuple> tupleInsertCallback;
    protected TupleStoreManager storageManager;
    private static final Logger logger = LoggerFactory.getLogger(ContinuousBoundingBoxClientQuery.class);
    protected final long tuplesPerPage = 1;
    protected final BlockingQueue<Tuple> tupleQueue = new ArrayBlockingQueue(MAX_QUEUE_CAPACITY);
    protected long totalSendTuples = 0;

    public ContinuousBoundingBoxClientQuery(Hyperrectangle hyperrectangle, ClientConnectionHandler clientConnectionHandler, short s, TupleStoreName tupleStoreName) {
        this.queryActive = true;
        this.boundingBox = hyperrectangle;
        this.clientConnectionHandler = clientConnectionHandler;
        this.querySequence = s;
        this.requestTable = tupleStoreName;
        this.tupleInsertCallback = tuple -> {
            if (tuple.getBoundingBox().intersects(hyperrectangle) && !this.tupleQueue.offer(tuple)) {
                logger.error("Unable to add tuple to continuous query, queue is full");
            }
        };
        try {
            init();
        } catch (BBoxDBException e) {
            logger.error("Got exception on init", e);
            this.queryActive = false;
        }
    }

    protected void init() throws BBoxDBException {
        try {
            TupleStoreManagerRegistry storageRegistry = this.clientConnectionHandler.getStorageRegistry();
            List<TupleStoreName> localTablesForRegion = SpacePartitionerCache.getInstance().getSpacePartitionerForGroupName(this.requestTable.getDistributionGroup()).getDistributionRegionIdMapper().getLocalTablesForRegion(this.boundingBox, this.requestTable);
            if (localTablesForRegion.size() != 1) {
                logger.error("Got more than one table for the continuous query {}", localTablesForRegion);
                close();
            } else {
                this.storageManager = QueryHelper.getTupleStoreManager(storageRegistry, localTablesForRegion.get(0));
                this.storageManager.registerInsertCallback(this.tupleInsertCallback);
                this.clientConnectionHandler.addConnectionClosedHandler(clientConnectionHandler -> {
                    close();
                });
            }
        } catch (ZookeeperException | StorageManagerException e) {
            logger.error("Got an exception during query init", e);
            close();
        }
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public void fetchAndSendNextTuples(short s) throws IOException, PackageEncodeException {
        try {
            long j = 0;
            this.clientConnectionHandler.writeResultPackage(new MultipleTupleStartResponse(s));
            while (this.queryActive) {
                if (j >= 1) {
                    this.clientConnectionHandler.writeResultPackage(new PageEndResponse(s));
                    this.clientConnectionHandler.flushPendingCompressionPackages();
                    return;
                } else {
                    this.clientConnectionHandler.writeResultTuple(s, new JoinedTuple(this.tupleQueue.take(), this.requestTable.getFullname()));
                    this.totalSendTuples++;
                    j++;
                }
            }
            this.clientConnectionHandler.writeResultPackage(new MultipleTupleEndResponse(s));
            this.clientConnectionHandler.flushPendingCompressionPackages();
        } catch (InterruptedException e) {
            logger.debug("Got interrupted excetion");
            close();
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public boolean isQueryDone() {
        return !this.queryActive;
    }

    @Override // org.bboxdb.network.server.ClientQuery, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.debug("Closing query {} (send {} result tuples)", Short.valueOf(this.querySequence), Long.valueOf(this.totalSendTuples));
        if (this.storageManager != null) {
            this.storageManager.removeInsertCallback(this.tupleInsertCallback);
        }
        this.queryActive = false;
    }

    @Override // org.bboxdb.network.server.ClientQuery
    public long getTotalSendTuples() {
        return this.totalSendTuples;
    }
}
