/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.hadoop;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObservers;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.TerminationContext;
import cz.o2.proxima.direct.bulk.Path;
import cz.o2.proxima.direct.bulk.Reader;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.hadoop.HadoopDataAccessor;
import cz.o2.proxima.direct.hadoop.HadoopFileSystem;
import cz.o2.proxima.direct.hadoop.HadoopPartition;
import cz.o2.proxima.direct.hadoop.HadoopPath;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopBatchLogReader
implements BatchLogReader {
    private static final Logger log = LoggerFactory.getLogger(HadoopBatchLogReader.class);
    private final HadoopDataAccessor accessor;
    private final Context context;
    private final ExecutorService executor;

    public HadoopBatchLogReader(HadoopDataAccessor accessor, Context context) {
        this.accessor = accessor;
        this.context = context;
        this.executor = context.getExecutorService();
    }

    public List<Partition> getPartitions(long startStamp, long endStamp) {
        ArrayList<Partition> partitions = new ArrayList<Partition>();
        HadoopFileSystem fs = this.accessor.getHadoopFs();
        long batchProcessSize = this.accessor.getBatchProcessSize();
        Stream<Path> paths = fs.list(startStamp, endStamp);
        AtomicReference current = new AtomicReference();
        paths.filter(p -> !p.isTmpPath()).forEach(p -> {
            if (current.get() == null) {
                current.set(new HadoopPartition(partitions.size()));
                partitions.add((Partition)current.get());
            }
            HadoopPartition part = (HadoopPartition)((Object)((Object)current.get()));
            part.add((HadoopPath)p);
            if (part.size() > batchProcessSize) {
                current.set(null);
            }
        });
        return partitions;
    }

    public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        TerminationContext terminationContext = new TerminationContext(observer);
        this.observeInternal(partitions, attributes, observer, terminationContext);
        return terminationContext.asObserveHandle();
    }

    private void observeInternal(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer, TerminationContext terminationContext) {
        this.executor.submit(() -> {
            terminationContext.setRunningThread();
            try {
                block2: for (HadoopPartition p : partitions) {
                    for (HadoopPath path : p.getPaths()) {
                        if (this.processPath(observer, p.getMinTimestamp(), p, path, terminationContext)) continue;
                        break block2;
                    }
                }
                terminationContext.finished();
            }
            catch (Throwable ex) {
                terminationContext.handleErrorCaught(ex, () -> {
                    log.info("Restarting processing by request");
                    this.observeInternal(partitions, attributes, observer, terminationContext);
                });
            }
        });
    }

    public BatchLogReader.Factory<?> asFactory() {
        HadoopDataAccessor accessor = this.accessor;
        Context context = this.context;
        return (BatchLogReader.Factory & Serializable)repo -> new HadoopBatchLogReader(accessor, context);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private boolean processPath(BatchLogObserver observer, long watermark, HadoopPartition p, HadoopPath path, TerminationContext terminationContext) {
        try (Reader reader = this.accessor.getFormat().openReader((Path)path, this.accessor.getEntityDesc());){
            StreamElement elem;
            Iterator iterator = reader.iterator();
            do {
                if (!iterator.hasNext()) return true;
                elem = (StreamElement)iterator.next();
            } while (!terminationContext.isCancelled() && observer.onNext(elem, BatchLogObservers.withWatermark((Partition)p, (long)watermark)));
            boolean bl = false;
            return bl;
        }
        catch (IOException ex) {
            throw new RuntimeException("Failed to read file " + (Object)((Object)p), ex);
        }
    }

    public HadoopDataAccessor getAccessor() {
        return this.accessor;
    }
}

