/*
 * Decompiled with CFR 0.152.
 */
package org.bboxdb.tools.gui.views.query;

import java.awt.Color;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import org.bboxdb.misc.BBoxDBException;
import org.bboxdb.network.client.BBoxDBCluster;
import org.bboxdb.network.client.ContinuousQueryState;
import org.bboxdb.network.client.future.client.JoinedTupleListFuture;
import org.bboxdb.query.ContinuousQueryPlan;
import org.bboxdb.storage.entity.IdleQueryStateRemovedTuple;
import org.bboxdb.storage.entity.InvalidationTuple;
import org.bboxdb.storage.entity.MultiTuple;
import org.bboxdb.storage.entity.Tuple;
import org.bboxdb.storage.entity.TupleStoreName;
import org.bboxdb.storage.entity.WatermarkTuple;
import org.bboxdb.tools.gui.views.query.AbstractContinuousQueryRunable;
import org.bboxdb.tools.gui.views.query.ElementOverlayPainter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContinuousQueryRunable
extends AbstractContinuousQueryRunable {
    private final List<Color> colors;
    private JoinedTupleListFuture queryResult;
    private final Set<Long> seenWatermarks;
    private final TransferQueue<MultiTuple> watermarkQueue;
    private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryRunable.class);

    public ContinuousQueryRunable(List<Color> colors, ContinuousQueryPlan qp, BBoxDBCluster connection, ElementOverlayPainter painter) {
        super(qp, connection, painter);
        this.colors = colors;
        this.seenWatermarks = new HashSet<Long>();
        this.watermarkQueue = new LinkedTransferQueue<MultiTuple>();
    }

    protected void runThread() throws Exception {
        logger.info("New worker thread for a continuous query has started");
        this.queryResult = this.connection.queryContinuous(this.qp);
        try {
            this.queryResult.waitForCompletion();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        for (MultiTuple joinedTuple : this.queryResult) {
            if (this.queryResult.isFailed()) {
                logger.error("Got an error" + this.queryResult.getAllMessages());
                return;
            }
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            Tuple firstTuple = joinedTuple.getTuple(0);
            if (firstTuple instanceof WatermarkTuple) {
                this.handleWatermark(firstTuple);
                continue;
            }
            if (!this.qp.isReceiveWatermarks()) {
                if (firstTuple instanceof InvalidationTuple || firstTuple instanceof IdleQueryStateRemovedTuple) {
                    this.removeTupleFromView(joinedTuple);
                    continue;
                }
                this.updateTupleOnGui(joinedTuple, this.colors, true);
                this.removeStaleTupleIfNeeded();
                continue;
            }
            boolean queueResult = this.watermarkQueue.offer(joinedTuple);
            if (queueResult) continue;
            logger.warn("Unable to queue tuple {}", (Object)joinedTuple);
        }
    }

    private void handleWatermark(Tuple tuple) {
        logger.info("Handle watermark {}", (Object)tuple);
        String tupleStoreString = tuple.getKey().replace("WATERMARK_", "");
        TupleStoreName tupleStore = new TupleStoreName(tupleStoreString);
        long regionId = tupleStore.getRegionId().getAsLong();
        this.seenWatermarks.add(regionId);
        String queryUUID = this.qp.getQueryUUID();
        Optional queryStateOptional = this.connection.getContinousQueryState(queryUUID);
        if (!queryStateOptional.isPresent()) {
            logger.error("Query state is not present, unable to handle watermark");
            return;
        }
        ContinuousQueryState queryState = (ContinuousQueryState)queryStateOptional.get();
        if (this.seenWatermarks.size() == queryState.getRegisteredRegions().size()) {
            this.handleWatermarkDone();
        }
    }

    private void handleWatermarkDone() {
        logger.debug("Watermark complete: {}", this.seenWatermarks);
        LinkedList queue = new LinkedList();
        this.watermarkQueue.drainTo(queue);
        logger.info("Processing {} waiting elements", (Object)queue.size());
        for (MultiTuple multiTuple : queue) {
            Tuple firstTuple = multiTuple.getTuple(0);
            if (firstTuple instanceof InvalidationTuple) {
                this.removeTupleFromView(multiTuple);
                continue;
            }
            this.updateTupleOnGui(multiTuple, this.colors, false);
        }
        this.seenWatermarks.clear();
        this.removeStaleTupleIfNeeded();
        this.painter.repaintAll();
    }

    protected void endHook() {
        if (this.queryResult != null) {
            logger.info("Canceling continuous query");
            boolean interruptedFlag = Thread.interrupted();
            try {
                logger.info("Canceling continous query {}", (Object)this.qp.getQueryUUID());
                this.connection.cancelContinousQuery(this.qp.getQueryUUID());
            }
            catch (BBoxDBException e) {
                logger.error("Got exception", (Throwable)e);
            }
            catch (InterruptedException e) {
                logger.info("Got interrupted exception");
                interruptedFlag = true;
            }
            finally {
                if (interruptedFlag) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        logger.info("Worker for continuous query exited");
    }
}

