/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.blob;

import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogObservers;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.direct.batch.Offset;
import cz.o2.proxima.direct.batch.TerminationContext;
import cz.o2.proxima.direct.blob.BlobBase;
import cz.o2.proxima.direct.blob.BlobPath;
import cz.o2.proxima.direct.blob.BlobStorageAccessor;
import cz.o2.proxima.direct.bulk.FileFormat;
import cz.o2.proxima.direct.bulk.FileSystem;
import cz.o2.proxima.direct.bulk.NamingConvention;
import cz.o2.proxima.direct.bulk.Path;
import cz.o2.proxima.direct.bulk.Reader;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BlobLogReader<BlobT extends BlobBase, BlobPathT extends BlobPath<BlobT>>
implements BatchLogReader {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BlobLogReader.class);
    // Entity descriptor taken from the accessor; passed to the file format when opening a reader.
    private final EntityDescriptor entity;
    // Target file system taken from the accessor; used to list paths in a timestamp range.
    private final FileSystem fs;
    // File format taken from the accessor; used to open readers over individual blobs.
    private final FileFormat fileFormat;
    // Naming convention taken from the accessor; used to parse min/max timestamps from blob names.
    private final NamingConvention namingConvention;
    // A partition is closed once its accumulated blob size reaches this value.
    private final long partitionMinSize;
    // A partition is closed once it holds this many blobs.
    private final int partitionMaxNumBlobs;
    // Maximal time span of a partition (read from getPartitionMaxTimeSpanMs()); only enforced when > 0.
    private final long partitionMaxTimeSpan;
    // Executor obtained from the context; observation tasks are submitted to it.
    private final ExecutorService executor;
    // Accessor this reader was created from; exposed via getAccessor().
    private final BlobStorageAccessor accessor;
    // Context this reader was created with; exposed via getContext().
    private final Context context;

    /**
     * Create the reader, caching everything it needs from the accessor and the context.
     *
     * @param accessor accessor supplying the entity descriptor, file system, file format,
     *     naming convention and partitioning limits
     * @param context context supplying the executor service used for observing
     */
    protected BlobLogReader(BlobStorageAccessor accessor, Context context) {
        // Storage-related collaborators.
        entity = accessor.getEntityDescriptor();
        fs = accessor.getTargetFileSystem();
        fileFormat = accessor.getFileFormat();
        namingConvention = accessor.getNamingConvention();

        // Limits that drive how blobs are grouped into partitions.
        partitionMinSize = accessor.getPartitionMinSize();
        partitionMaxNumBlobs = accessor.getPartitionMaxNumBlobs();
        partitionMaxTimeSpan = accessor.getPartitionMaxTimeSpanMs();

        executor = context.getExecutorService();
        this.context = context;
        this.accessor = accessor;
    }

    /**
     * Group the blobs found between the given timestamps into partitions.
     *
     * <p>Paths listed by the file system are visited in sorted order and each blob is offered
     * to {@code considerBlobForPartitionInclusion}, which decides whether the blob joins the
     * partition currently being built or whether that partition gets closed. A partition that
     * is still open after the last blob is appended to the result as well.
     *
     * @param startStamp start of the listed range, passed unchanged to the file system
     * @param endStamp end of the listed range, passed unchanged to the file system
     * @return the resulting partitions, possibly empty
     */
    public List<Partition> getPartitions(long startStamp, long endStamp) {
        List<Partition> ret = new ArrayList<>();
        AtomicInteger id = new AtomicInteger();
        BulkStoragePartition<BlobT> current = null;
        // The listing may be backed by a resource that needs releasing, so the stream is
        // closed as soon as it has been consumed.
        try (Stream<Path> paths = this.fs.list(startStamp, endStamp).sorted()) {
            Iterator<Path> it = paths.iterator();
            while (it.hasNext()) {
                // Paths produced by the blob file system are expected to be blob paths;
                // the cast fails fast at runtime otherwise, same as before.
                @SuppressWarnings("unchecked")
                BlobPath<BlobT> path = (BlobPath<BlobT>) it.next();
                current = this.considerBlobForPartitionInclusion(path.getBlob(), id, current, ret);
            }
        }
        if (current != null) {
            ret.add(current);
        }
        return ret;
    }

    /**
     * Add a blob to the partition being built, closing partitions when limits are hit.
     *
     * <p>When a maximal time span is configured (greater than zero) and adding the blob would
     * stretch the current partition beyond it, the current partition is moved to the result
     * and a fresh one is started for the blob. After the blob is added, the partition is
     * closed when its size reaches {@code partitionMinSize} or its blob count reaches
     * {@code partitionMaxNumBlobs}.
     *
     * @param b blob to include
     * @param partitionId source of ids for newly created partitions
     * @param currentPartition partition under construction, or {@code null} when there is none
     * @param resultingPartitions list receiving closed partitions
     * @return the partition still under construction, or {@code null} when it was just closed
     */
    @Nullable
    private BulkStoragePartition<BlobT> considerBlobForPartitionInclusion(BlobT b, AtomicInteger partitionId, @Nullable BulkStoragePartition<BlobT> currentPartition, List<Partition> resultingPartitions) {
        log.trace("Considering blob {} for partition inclusion", b.getName());
        Pair<Long, Long> stamps = this.namingConvention.parseMinMaxTimestamp(b.getName());
        long blobMinStamp = stamps.getFirst();
        long blobMaxStamp = stamps.getSecond();

        // Would the partition cover too long a time range if this blob joined it?
        boolean spanExceeded = false;
        if (this.partitionMaxTimeSpan > 0L && currentPartition != null) {
            long spanWithBlob = Math.max(
                blobMaxStamp - currentPartition.getMinTimestamp(),
                currentPartition.getMaxTimestamp() - blobMinStamp);
            spanExceeded = spanWithBlob > this.partitionMaxTimeSpan;
        }

        BulkStoragePartition<BlobT> target = currentPartition;
        if (spanExceeded) {
            log.debug("Closing partition {} due to max time span {} reached", currentPartition, this.partitionMaxTimeSpan);
            resultingPartitions.add(currentPartition);
            target = null;
        }
        if (target == null) {
            target = new BulkStoragePartition<>(partitionId.getAndIncrement(), blobMinStamp, blobMaxStamp);
        }

        target.add(b, blobMinStamp, blobMaxStamp);
        log.trace("Blob {} added to partition {}", b.getName(), target);

        boolean limitReached = target.size() >= this.partitionMinSize
            || target.getNumBlobs() >= this.partitionMaxNumBlobs;
        if (limitReached) {
            resultingPartitions.add(target);
            return null;
        }
        return target;
    }

    /**
     * Start observing the given partitions.
     *
     * <p>A new {@link TerminationContext} is created for the observer, the processing is
     * handed over to {@code observeInternal}, and the context is returned as the handle.
     *
     * @param partitions partitions to read
     * @param attributes attributes whose elements should be passed to the observer
     * @param observer observer receiving the elements
     * @return handle obtained from the termination context
     */
    public ObserveHandle observe(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        final TerminationContext ctx = new TerminationContext(observer);
        observeInternal(partitions, attributes, observer, ctx);
        return ctx.asObserveHandle();
    }

    /**
     * Validate the partitions and submit the reading task to the executor.
     *
     * <p>The task walks the partitions in order until they are exhausted, the termination
     * context reports cancellation, or the observer asked to stop. Anything thrown while
     * processing is handed to the termination context together with a callback that starts
     * the observation again with the same arguments.
     *
     * @param partitions partitions to read; their ids must be unique
     * @param attributes attributes whose elements should be passed to the observer
     * @param observer observer receiving the elements
     * @param terminationContext context tracking cancellation, completion and errors
     */
    private void observeInternal(List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer, TerminationContext terminationContext) {
        long distinctIds = partitions.stream().map(Partition::getId).distinct().count();
        Preconditions.checkArgument(distinctIds == (long) partitions.size(), "Passed partitions must be unique, got partitions %s", partitions);

        this.executor.submit(() -> {
            terminationContext.setRunningThread();
            try {
                Set<AttributeDescriptor<?>> wanted = new HashSet<>(attributes);
                AtomicBoolean stopProcessing = new AtomicBoolean(false);
                Iterator<Partition> remaining = partitions.iterator();
                while (remaining.hasNext()) {
                    // Stop before touching the next partition once cancelled or asked to stop.
                    if (terminationContext.isCancelled() || stopProcessing.get()) {
                        break;
                    }
                    this.processSinglePartition(remaining.next(), wanted, terminationContext, stopProcessing, observer);
                }
                terminationContext.finished();
            } catch (Throwable ex) {
                terminationContext.handleErrorCaught(ex, () -> {
                    log.info("Restarting processing by request");
                    this.observeInternal(partitions, attributes, observer, terminationContext);
                });
            }
        });
    }

    /**
     * Read the blobs of one partition and feed matching elements to the observer.
     *
     * <p>Blobs are read one by one through {@code runHandlingErrors}. Every element read gets
     * an offset built from the partition, the element's index within the blob and a flag
     * telling whether it is the last element of that blob; only elements whose attribute is
     * in {@code attrs} reach the observer. Reading stops when the termination context is
     * cancelled or when the observer returns {@code false}, which also sets
     * {@code stopProcessing}.
     *
     * @param partition partition to read; must be a {@code BulkStoragePartition}
     * @param attrs attributes whose elements should be passed to the observer
     * @param terminationContext context consulted for cancellation
     * @param stopProcessing flag shared with the caller, set when the observer asks to stop
     * @param observer observer receiving the elements
     * @throws IllegalStateException wrapping any exception raised while reading a blob
     */
    private void processSinglePartition(Partition partition, Set<AttributeDescriptor<?>> attrs, TerminationContext terminationContext, AtomicBoolean stopProcessing, BatchLogObserver observer) {
        @SuppressWarnings("unchecked")
        BulkStoragePartition<BlobT> blobPartition = (BulkStoragePartition<BlobT>) partition;
        for (BlobT blob : blobPartition.getBlobs()) {
            if (terminationContext.isCancelled() || stopProcessing.get()) {
                break;
            }
            ThrowingRunnable readBlob = () -> {
                log.info("Starting to observe {} from partition {}", blob, partition);
                try (Reader reader = this.fileFormat.openReader(this.createPath(blob), this.entity)) {
                    long nextIndex = 0L;
                    Iterator<StreamElement> elements = reader.iterator();
                    while (!(terminationContext.isCancelled() || stopProcessing.get()) && elements.hasNext()) {
                        StreamElement element = elements.next();
                        // The offset records whether this is the last element of the blob.
                        boolean lastInBlob = !elements.hasNext();
                        Offset offset = Offset.of(partition, nextIndex, lastInBlob);
                        nextIndex++;
                        if (attrs.contains(element.getAttributeDescriptor())) {
                            boolean keepGoing = observer.onNext(element, BatchLogObservers.withWatermarkSupplier(partition, offset, () -> partition.getMinTimestamp()));
                            if (!keepGoing) {
                                stopProcessing.set(true);
                            }
                        }
                    }
                }
            };
            try {
                this.runHandlingErrors(blob, readBlob);
            } catch (Exception ex) {
                throw new IllegalStateException(String.format("Failed to read from %s", blob), ex);
            }
        }
    }

    /**
     * Run the given action for the given blob.
     *
     * <p>{@code processSinglePartition} calls this once per blob with an action that opens a
     * reader over the blob and passes its elements to the observer. An {@link Exception}
     * thrown from here is rethrown by that caller wrapped in an {@link IllegalStateException}.
     *
     * <p>NOTE(review): judging by the name, implementations are expected to deal with
     * storage-specific errors raised by the action - confirm against the implementations.
     *
     * @param var1 blob the action works with
     * @param var2 action to run
     * @throws Exception when the action, or its error handling, fails
     */
    protected abstract void runHandlingErrors(BlobT var1, ThrowingRunnable var2) throws Exception;

    /**
     * Create a path for the given blob.
     *
     * <p>The returned path is handed to the file format's {@code openReader} when the blob
     * is read in {@code processSinglePartition}.
     *
     * @param var1 blob to create the path for
     * @return path wrapping the blob
     */
    protected abstract BlobPath<BlobT> createPath(BlobT var1);

    /**
     * @return the accessor this reader was constructed with
     */
    @Generated
    public BlobStorageAccessor getAccessor() {
        return accessor;
    }

    /**
     * @return the context this reader was constructed with
     */
    @Generated
    public Context getContext() {
        return context;
    }

    /**
     * Serializable counterpart of {@link Runnable} whose body may throw a checked exception.
     */
    @FunctionalInterface
    public static interface ThrowingRunnable extends Serializable {
        /**
         * Execute the action.
         *
         * @throws Exception when the action fails
         */
        void run() throws Exception;
    }

    /**
     * Partition made of a group of blobs.
     *
     * <p>Keeps the blobs in insertion order together with the sum of their sizes and the
     * smallest and largest timestamp seen so far.
     */
    private static class BulkStoragePartition<BlobT extends BlobBase> implements Partition {
        private static final long serialVersionUID = 1L;

        private final List<BlobT> blobs = new ArrayList<>();
        private final int id;
        private long minStamp;
        private long maxStamp;
        private long size;

        /**
         * @param id id of the partition
         * @param minStamp initial minimal timestamp
         * @param maxStamp initial maximal timestamp
         */
        BulkStoragePartition(int id, long minStamp, long maxStamp) {
            this.id = id;
            this.minStamp = minStamp;
            this.maxStamp = maxStamp;
        }

        /**
         * Append a blob, growing the total size and widening the timestamp range as needed.
         *
         * @param b blob to append
         * @param minStamp minimal timestamp of the blob
         * @param maxStamp maximal timestamp of the blob
         */
        void add(BlobT b, long minStamp, long maxStamp) {
            blobs.add(b);
            size += b.getSize();
            if (minStamp < this.minStamp) {
                this.minStamp = minStamp;
            }
            if (maxStamp > this.maxStamp) {
                this.maxStamp = maxStamp;
            }
        }

        /** @return id given at construction */
        public int getId() {
            return id;
        }

        /** @return always {@code true} */
        public boolean isBounded() {
            return true;
        }

        /** @return sum of sizes of all blobs added so far */
        public long size() {
            return size;
        }

        /** @return number of blobs added so far */
        public int getNumBlobs() {
            return blobs.size();
        }

        /** @return smallest timestamp among the initial value and all added blobs */
        public long getMinTimestamp() {
            return minStamp;
        }

        /** @return largest timestamp among the initial value and all added blobs */
        public long getMaxTimestamp() {
            return maxStamp;
        }

        public String toString() {
            return MoreObjects.toStringHelper(BulkStoragePartition.class)
                .add("id", id)
                .add("size", size)
                .add("minTimestamp", minStamp)
                .add("maxTimestamp", maxStamp)
                .add("blobs.size()", blobs.size())
                .toString();
        }

        /** @return the backing list of blobs, in insertion order */
        @Generated
        public List<BlobT> getBlobs() {
            return blobs;
        }
    }
}

