/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.cassandra;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObservers;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.TerminationContext;
import cz.o2.proxima.direct.cassandra.CassandraDBAccessor;
import cz.o2.proxima.direct.cassandra.CassandraPartition;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import java.io.Serializable;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

class CassandraLogReader
implements BatchLogReader {
    private final CassandraDBAccessor accessor;
    private final int parallelism;
    private final Factory<ExecutorService> executorFactory;
    private final ExecutorService executor;

    CassandraLogReader(CassandraDBAccessor accessor, Factory<ExecutorService> executorFactory) {
        this.accessor = accessor;
        this.parallelism = accessor.getBatchParallelism();
        this.executorFactory = executorFactory;
        this.executor = (ExecutorService)executorFactory.apply();
    }

    public List<Partition> getPartitions(long startStamp, long endStamp) {
        ArrayList<Partition> ret = new ArrayList<Partition>();
        double step = 1.8446744073709552E19 / (double)this.parallelism;
        double tokenStart = -9.223372036854776E18;
        double tokenEnd = tokenStart + step;
        for (int i = 0; i < this.parallelism; ++i) {
            ret.add((Partition)new CassandraPartition(i, startStamp, endStamp, (long)tokenStart, (long)tokenEnd, i == this.parallelism - 1));
            tokenStart = tokenEnd;
            tokenEnd += step;
            if (i != this.parallelism - 2) continue;
            tokenEnd = 9.223372036854776E18;
        }
        return ret;
    }

    public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        TerminationContext terminationContext = new TerminationContext(observer);
        this.observeInternal(partitions, attributes, observer, terminationContext);
        return terminationContext.asObserveHandle();
    }

    private void observeInternal(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer, TerminationContext terminationContext) {
        this.executor.submit(() -> {
            terminationContext.setRunningThread();
            try {
                Partition p;
                Iterator iterator = partitions.iterator();
                while (iterator.hasNext() && this.processSinglePartition((CassandraPartition)(p = (Partition)iterator.next()), attributes, terminationContext, observer)) {
                }
                terminationContext.finished();
            }
            catch (Throwable err) {
                terminationContext.handleErrorCaught(err, () -> this.observeInternal(partitions, attributes, observer, terminationContext));
            }
        });
    }

    private boolean processSinglePartition(CassandraPartition partition, List<AttributeDescriptor<?>> attributes, TerminationContext terminationContext, BatchLogObserver observer) {
        Session session = this.accessor.ensureSession();
        ResultSet result = this.accessor.execute(this.accessor.getCqlFactory().scanPartition(attributes, partition, session));
        AtomicLong position = new AtomicLong();
        for (Row row : result) {
            if (terminationContext.isCancelled()) {
                return false;
            }
            String key = row.getString(0);
            int field = 1;
            for (AttributeDescriptor<?> attribute : attributes) {
                ByteBuffer bytes;
                Object attributeName = attribute.getName();
                if (attribute.isWildcard()) {
                    String suffix = this.accessor.getConverter().asString(row.getObject(field++));
                    attributeName = attribute.toAttributePrefix() + suffix;
                }
                if ((bytes = row.getBytes(field++)) == null) continue;
                byte[] array = bytes.slice().array();
                if (observer.onNext(StreamElement.upsert((EntityDescriptor)this.accessor.getEntityDescriptor(), attribute, (String)("cql-" + this.accessor.getEntityDescriptor().getName() + "-part" + partition.getId() + position.incrementAndGet()), (String)key, (String)attributeName, (long)System.currentTimeMillis(), (byte[])array), BatchLogObservers.defaultContext((Partition)partition))) continue;
                return false;
            }
        }
        return true;
    }

    public URI getUri() {
        return this.accessor.getUri();
    }

    public BatchLogReader.Factory<?> asFactory() {
        CassandraDBAccessor accessor = this.accessor;
        Factory<ExecutorService> executorFactory = this.executorFactory;
        return (BatchLogReader.Factory & Serializable)repo -> new CassandraLogReader(accessor, executorFactory);
    }
}

