/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.blob;

import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.blob.BlobBase;
import cz.o2.proxima.direct.blob.BlobPath;
import cz.o2.proxima.direct.blob.BlobStorageAccessor;
import cz.o2.proxima.direct.bulk.FileFormat;
import cz.o2.proxima.direct.bulk.FileSystem;
import cz.o2.proxima.direct.bulk.NamingConvention;
import cz.o2.proxima.direct.bulk.Path;
import cz.o2.proxima.direct.bulk.Reader;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BlobLogObservable<BlobT extends BlobBase, BlobPathT extends BlobPath<BlobT>>
implements BatchLogObservable {
    private static final Logger log = LoggerFactory.getLogger(BlobLogObservable.class);
    private static final long serialVersionUID = 1L;
    private final EntityDescriptor entity;
    private final FileSystem fs;
    private final FileFormat fileFormat;
    private final NamingConvention namingConvention;
    private final long partitionMinSize;
    private final int partitionMaxNumBlobs;
    private final Factory<Executor> executorFactory;
    @Nullable
    private transient Executor executor = null;

    public BlobLogObservable(BlobStorageAccessor accessor, Context context) {
        this.entity = accessor.getEntityDescriptor();
        this.fs = accessor.getTargetFileSystem();
        this.fileFormat = accessor.getFileFormat();
        this.namingConvention = accessor.getNamingConvention();
        this.partitionMinSize = accessor.getPartitionMinSize();
        this.partitionMaxNumBlobs = accessor.getPartitionMaxNumBlobs();
        this.executorFactory = () -> ((Context)context).getExecutorService();
    }

    public List<Partition> getPartitions(long startStamp, long endStamp) {
        ArrayList<Partition> ret = new ArrayList<Partition>();
        AtomicInteger id = new AtomicInteger();
        AtomicReference current = new AtomicReference();
        Stream<Path> paths = this.fs.list(startStamp, endStamp);
        paths.forEach(blob -> this.considerBlobForPartitionInclusion(((BlobPath)blob).getBlob(), id, current, ret));
        if (current.get() != null) {
            ret.add((Partition)current.get());
        }
        return ret;
    }

    private void considerBlobForPartitionInclusion(BlobT b, AtomicInteger partitionId, AtomicReference<BulkStoragePartition<BlobT>> currentPartition, List<Partition> resultingPartitions) {
        log.trace("Considering blob {} for partition inclusion", (Object)b.getName());
        Pair<Long, Long> minMaxStamp = this.namingConvention.parseMinMaxTimestamp(b.getName());
        if (currentPartition.get() == null) {
            currentPartition.set(new BulkStoragePartition(partitionId.getAndIncrement(), (Long)minMaxStamp.getFirst(), (Long)minMaxStamp.getSecond()));
        }
        currentPartition.get().add(b, (Long)minMaxStamp.getFirst(), (Long)minMaxStamp.getSecond());
        log.trace("Blob {} added to partition {}", (Object)b.getName(), (Object)currentPartition.get());
        if (currentPartition.get().size() >= this.partitionMinSize || currentPartition.get().getNumBlobs() >= this.partitionMaxNumBlobs) {
            resultingPartitions.add(currentPartition.getAndSet(null));
        }
    }

    public void observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        this.executor().execute(() -> {
            block2: {
                try {
                    HashSet attrs = new HashSet(attributes);
                    partitions.forEach(p -> {
                        BulkStoragePartition part = (BulkStoragePartition)p;
                        part.getBlobs().forEach(blob -> {
                            try {
                                this.runHandlingErrors(blob, () -> {
                                    log.info("Starting to observe partition {}", p);
                                    try (Reader reader = this.fileFormat.openReader(this.createPath(blob), this.entity);){
                                        reader.forEach(e -> {
                                            if (attrs.contains(e.getAttributeDescriptor())) {
                                                observer.onNext(e, p);
                                            }
                                        });
                                    }
                                });
                            }
                            catch (Exception ex) {
                                throw new IllegalStateException(String.format("Failed to read from %s", blob), ex);
                            }
                        });
                    });
                    observer.onCompleted();
                }
                catch (Exception ex) {
                    log.error("Failed to observe partitions {}", (Object)partitions, (Object)ex);
                    if (!observer.onError((Throwable)ex)) break block2;
                    log.info("Restarting processing by request");
                    this.observe(partitions, attributes, observer);
                }
            }
        });
    }

    protected abstract void runHandlingErrors(BlobT var1, ThrowingRunnable var2) throws Exception;

    protected abstract BlobPath<BlobT> createPath(BlobT var1);

    private Executor executor() {
        if (this.executor == null) {
            this.executor = (Executor)this.executorFactory.apply();
        }
        return this.executor;
    }

    @FunctionalInterface
    public static interface ThrowingRunnable
    extends Serializable {
        public void run() throws Exception;
    }

    private static class BulkStoragePartition<BlobT extends BlobBase>
    implements Partition {
        private static final long serialVersionUID = 1L;
        private final List<BlobT> blobs = new ArrayList<BlobT>();
        private final int id;
        private long minStamp;
        private long maxStamp;
        private long size;

        BulkStoragePartition(int id, long minStamp, long maxStamp) {
            this.id = id;
            this.minStamp = minStamp;
            this.maxStamp = maxStamp;
        }

        void add(BlobT b, long minStamp, long maxStamp) {
            this.blobs.add(b);
            this.size += this.getSize(b);
            this.minStamp = Math.min(this.minStamp, minStamp);
            this.maxStamp = Math.max(this.maxStamp, maxStamp);
        }

        private long getSize(BlobT b) {
            return b.getSize();
        }

        public int getId() {
            return this.id;
        }

        public boolean isBounded() {
            return true;
        }

        public long size() {
            return this.size;
        }

        public int getNumBlobs() {
            return this.blobs.size();
        }

        public long getMinTimestamp() {
            return this.minStamp;
        }

        public long getMaxTimestamp() {
            return this.maxStamp;
        }

        public List<BlobT> getBlobs() {
            return this.blobs;
        }
    }
}

