/*
 * Decompiled with CFR 0.152.
 */
package org.bboxdb.tools.network;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;
import org.bboxdb.commons.math.GeoJsonPolygon;
import org.bboxdb.misc.BBoxDBException;
import org.bboxdb.misc.Const;
import org.bboxdb.network.client.BBoxDBCluster;
import org.bboxdb.network.client.future.client.EmptyResultFuture;
import org.bboxdb.network.client.future.client.OperationFuture;
import org.bboxdb.network.client.tools.FixedSizeFutureStore;
import org.bboxdb.storage.entity.Tuple;
import org.bboxdb.storage.entity.WatermarkTuple;
import org.bboxdb.tools.converter.tuple.TupleBuilder;
import org.bboxdb.tools.converter.tuple.TupleBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads lines from a TCP feed at {@code data.adsbhub.org:5002}, converts them
 * into tuples with the "adsb" tuple builder and inserts them into a BBoxDB table.
 *
 * <p>Insert operations are asynchronous; their futures are tracked in a
 * {@link FixedSizeFutureStore} and awaited before the cluster connection is closed.
 */
public class CaptureADSB
implements Runnable {
    /** Capacity handed to the future store that tracks outstanding inserts. */
    private static final int MAX_PENDING_FUTURES = 1000;

    /** Host of the feed that is read line by line. */
    private static final String ADSB_HOST = "data.adsbhub.org";

    /** TCP port of the feed. */
    private static final int ADSB_PORT = 5002;

    /** Format name passed to the tuple builder factory. */
    private static final String ADSB_FORMAT = "adsb";

    /** A progress dot is written to stdout each time this many tuples were inserted. */
    private static final long PROGRESS_INTERVAL = 1000L;

    private static final Logger logger = LoggerFactory.getLogger(CaptureADSB.class);

    private final String connectionPoint;
    private final String clustername;
    private final String tablename;
    private final FixedSizeFutureStore pendingFutures;

    /**
     * Creates a new capture task. No connection is opened until {@link #run()} is called.
     *
     * @param connectionPoint the connection endpoint passed to {@link BBoxDBCluster}
     * @param clustername the cluster name passed to {@link BBoxDBCluster}
     * @param tablename the table that receives the tuples
     */
    public CaptureADSB(String connectionPoint, String clustername, String tablename) {
        this.connectionPoint = connectionPoint;
        this.clustername = clustername;
        this.tablename = tablename;
        this.pendingFutures = new FixedSizeFutureStore(MAX_PENDING_FUTURES, true);
    }

    /**
     * Connects to the cluster and to the feed, then inserts one tuple per
     * successfully parsed input until the stream ends or the current thread is
     * interrupted.
     *
     * <p>Whenever the polygon id of a tuple is lower than the id of the previous
     * tuple, a {@link WatermarkTuple} carrying the previous tuple's version
     * timestamp is inserted first (presumably the feed restarts a cycle of
     * ids at that point; confirm against the feed format).
     *
     * <p>Any exception that escapes the read loop is logged and ends the capture;
     * nothing is propagated to the caller.
     */
    @Override
    public void run() {
        try (BBoxDBCluster bboxdbClient = new BBoxDBCluster(this.connectionPoint, this.clustername);
             Socket socket = new Socket(ADSB_HOST, ADSB_PORT);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), Const.DEFAULT_CHARSET))) {
            bboxdbClient.connect();
            TupleBuilder builder = TupleBuilderFactory.getBuilderForFormat(ADSB_FORMAT);

            long lastId = 0L;
            long lastTimestamp = 0L;
            long readTuples = 0L;

            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (Thread.currentThread().isInterrupted()) {
                        logger.info("Thread is interrupted, returning");
                        return;
                    }

                    try {
                        Tuple tuple = builder.buildTuple(line);
                        if (tuple == null) {
                            // The builder produced no tuple for this line; nothing to insert.
                            continue;
                        }

                        // Decode with the same charset that is used for reading the feed
                        // instead of the platform default.
                        String geoJson = new String(tuple.getDataBytes(), Const.DEFAULT_CHARSET);
                        GeoJsonPolygon polygon = GeoJsonPolygon.fromGeoJson(geoJson);

                        // The id went backwards: emit a watermark for the previous timestamp.
                        if (lastId != 0L && lastId > polygon.getId()) {
                            WatermarkTuple watermarkTuple = new WatermarkTuple(lastTimestamp);
                            EmptyResultFuture watermarkFuture = bboxdbClient.put(this.tablename, watermarkTuple);
                            this.pendingFutures.put(watermarkFuture);
                        }

                        lastId = polygon.getId();
                        lastTimestamp = tuple.getVersionTimestamp();

                        EmptyResultFuture insertFuture = bboxdbClient.put(this.tablename, tuple);
                        this.pendingFutures.put(insertFuture);
                        ++readTuples;

                        // Report progress only when the counter actually advanced, so that
                        // lines without a tuple do not print a dot over and over.
                        if (readTuples % PROGRESS_INTERVAL == 0L) {
                            System.out.print(".");
                            System.out.flush();
                        }
                    }
                    catch (BBoxDBException e) {
                        logger.error("Got error while inserting tuple", e);
                    }
                }
            }
            finally {
                // Must happen inside the try-with-resources body: the resources are
                // closed before an outer catch/finally block would run.
                this.waitForPendingFutures();
            }
        }
        catch (Exception e) {
            logger.error("Got an exception", e);
        }
    }

    /**
     * Blocks until the future store reports completion. If the wait is
     * interrupted, the interrupt flag of the current thread is restored.
     */
    private void waitForPendingFutures() {
        try {
            this.pendingFutures.waitForCompletion();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Command line entry point. Runs the capture in the calling thread.
     *
     * @param args exactly three values: connection endpoint, cluster name, table name;
     *             otherwise a usage message is printed and the JVM exits with status -1
     * @throws Exception declared for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: <Class> <Connection Endpoint> <Clustername> <Table>");
            System.exit(-1);
        }
        String connectionPoint = args[0];
        String clustername = args[1];
        String tablename = args[2];
        CaptureADSB main = new CaptureADSB(connectionPoint, clustername, tablename);
        main.run();
    }
}

