/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.bulk;

import cz.o2.proxima.direct.bulk.FileFormat;
import cz.o2.proxima.direct.bulk.FileSystem;
import cz.o2.proxima.direct.bulk.NamingConvention;
import cz.o2.proxima.direct.bulk.Path;
import cz.o2.proxima.direct.bulk.Writer;
import cz.o2.proxima.direct.core.AbstractBulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Streams;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBulkFileSystemAttributeWriter
extends AbstractBulkAttributeWriter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractBulkFileSystemAttributeWriter.class);
    private static final long serialVersionUID = 2L;
    private final FileSystem fs;
    private final NamingConvention namingConvention;
    private final FileFormat format;
    private final long rollPeriodMs;
    private final long allowedLatenessMs;
    private final Factory<Executor> executorFactory;
    private final Context context;
    private final Map<Long, Bulk> writers = Collections.synchronizedMap(new HashMap());
    private final Map<String, LateBulk> lateWriters = Collections.synchronizedMap(new HashMap());
    private final AtomicInteger inFlightFlushes = new AtomicInteger();
    private long seqNo = 0L;
    @Nullable
    private transient Executor executor = null;

    protected AbstractBulkFileSystemAttributeWriter(EntityDescriptor entity, URI uri, FileSystem fs, NamingConvention namingConvention, FileFormat format, Context context, long rollPeriodMs, long allowedLatenessMs) {
        super(entity, uri);
        this.fs = fs;
        this.namingConvention = namingConvention;
        this.format = format;
        this.rollPeriodMs = rollPeriodMs;
        this.allowedLatenessMs = allowedLatenessMs;
        this.executorFactory = context.getExecutorFactory();
        this.context = context;
    }

    public void write(StreamElement data, long watermark, CommitCallback statusCallback) {
        long startStamp = data.getStamp() - data.getStamp() % this.rollPeriodMs;
        long maxStamp = Math.max(startStamp + this.rollPeriodMs, startStamp);
        if (maxStamp + this.allowedLatenessMs >= watermark) {
            Bulk bulk = this.writers.computeIfAbsent(startStamp, p -> (Bulk)ExceptionUtils.uncheckedFactory((ExceptionUtils.ThrowingFactory & Serializable)() -> new Bulk(this.fs.newPath(startStamp), startStamp, maxStamp)));
            this.writeToBulk(data, statusCallback, watermark, bulk);
        } else {
            this.handleLateData(data, watermark, statusCallback);
        }
        this.flushOnWatermark(watermark);
    }

    protected void handleLateData(StreamElement data, long watermark, CommitCallback statusCallback) {
        LateBulk lateBulk = this.lateWriters.computeIfAbsent((String)Iterables.getOnlyElement(this.namingConvention.prefixesOf(data.getStamp(), data.getStamp())), prefix -> this.newLateBulkFor(data));
        this.writeToBulk(data, statusCallback, watermark, lateBulk);
        log.debug("Written late element {} to {} on watermark {}", data, lateBulk.getWriter(), watermark);
    }

    private LateBulk newLateBulkFor(StreamElement data) {
        return (LateBulk)ExceptionUtils.uncheckedFactory((ExceptionUtils.ThrowingFactory & Serializable)() -> {
            log.debug("Created new late bulk for {}", (Object)data);
            return new LateBulk(this.fs.newPath(Long.MAX_VALUE));
        });
    }

    private void writeToBulk(StreamElement data, CommitCallback statusCallback, long watermark, Bulk bulk) {
        ExceptionUtils.unchecked((ExceptionUtils.ThrowingRunnable & Serializable)() -> bulk.write(data, statusCallback, watermark, this.seqNo++));
        log.debug("Written element {} to {} on watermark {}", data, bulk.getWriter(), watermark);
    }

    public void updateWatermark(long watermark) {
        this.flushOnWatermark(watermark);
    }

    public void rollback() {
        this.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Map<Long, Bulk> map = this.writers;
        synchronized (map) {
            Map<String, LateBulk> map2 = this.lateWriters;
            synchronized (map2) {
                Streams.concat((Stream[])new Stream[]{this.writers.values().stream(), this.lateWriters.values().stream()}).forEach(writer -> {
                    Path path = writer.getWriter().getPath();
                    try {
                        writer.getWriter().close();
                        path.delete();
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                });
                this.lateWriters.clear();
            }
            this.writers.clear();
        }
    }

    protected abstract void flush(Bulk var1);

    private void flushOnWatermark(long watermark) {
        Collection<Bulk> flushable = this.collectFlushable(watermark);
        if (!flushable.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Collected {} flushable bulks at watermark {} with allowedLatenessMs {}.", flushable, watermark, this.allowedLatenessMs);
            }
            CommitCallback commit = flushable.stream().max(Comparator.comparing(Bulk::getLastWriteSeqNo)).get().getCommit();
            try {
                AtomicInteger toFlush = new AtomicInteger(flushable.size());
                flushable.forEach(bulk -> {
                    ExceptionUtils.unchecked((ExceptionUtils.ThrowingRunnable & Serializable)() -> bulk.getWriter().close());
                    this.executor().execute(() -> {
                        try {
                            this.flush((Bulk)bulk);
                            log.info("Flushed path {}", (Object)bulk.getPath());
                            if (toFlush.decrementAndGet() == 0) {
                                commit.commit(true, null);
                            }
                            this.inFlightFlushes.decrementAndGet();
                        }
                        catch (Exception ex) {
                            this.inFlightFlushes.decrementAndGet();
                            log.error("Failed to flush path {}", (Object)bulk.getPath(), (Object)ex);
                            toFlush.set(-1);
                            ExceptionUtils.unchecked(bulk.getPath()::delete);
                            commit.commit(false, (Throwable)ex);
                        }
                    });
                    this.writers.remove(bulk.getStartTs());
                });
            }
            catch (Exception ex) {
                log.error("Failed to flush paths {}", (Object)flushable, (Object)ex);
                commit.commit(false, (Throwable)ex);
            }
        }
    }

    private Executor executor() {
        if (this.executor == null) {
            this.executor = (Executor)this.executorFactory.apply();
        }
        return this.executor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Collection<Bulk> collectFlushable(long watermark) {
        if (this.inFlightFlushes.get() > 0) {
            return Collections.emptyList();
        }
        Map<Long, Bulk> map = this.writers;
        synchronized (map) {
            Set<Bulk> ret = this.writers.values().stream().filter(bulk -> bulk.getMaxTs() + this.allowedLatenessMs < watermark).collect(Collectors.toSet());
            long maxWriteSeqNo = this.getMaxWriteSeqNo(ret);
            if (ret.isEmpty()) {
                return ret;
            }
            Map<String, LateBulk> map2 = this.lateWriters;
            synchronized (map2) {
                int initialSize;
                do {
                    initialSize = ret.size();
                    List lateBulks = this.getAdditionalBulks(this.lateWriters.entrySet(), maxWriteSeqNo);
                    lateBulks.forEach(e -> ret.add(this.lateWriters.remove(e.getKey())));
                    maxWriteSeqNo = this.getMaxWriteSeqNo(ret);
                    List normalBulks = this.getAdditionalBulks(this.writers.entrySet(), maxWriteSeqNo);
                    normalBulks.forEach(e -> ret.add((Bulk)e.getValue()));
                    maxWriteSeqNo = this.getMaxWriteSeqNo(ret);
                } while (initialSize < ret.size());
            }
            this.inFlightFlushes.addAndGet(ret.size());
            return ret;
        }
    }

    private long getMaxWriteSeqNo(Set<Bulk> ret) {
        return ret.stream().mapToLong(Bulk::getLastWriteSeqNo).max().orElse(Long.MIN_VALUE);
    }

    private <K, B extends Bulk> List<Map.Entry<K, B>> getAdditionalBulks(Collection<Map.Entry<K, B>> bulks, long maxWriteSeqNo) {
        return bulks.stream().filter(e -> ((Bulk)e.getValue()).getFirstWriteSeqNo() < maxWriteSeqNo).collect(Collectors.toList());
    }

    @Generated
    public FileSystem getFs() {
        return this.fs;
    }

    @Generated
    public NamingConvention getNamingConvention() {
        return this.namingConvention;
    }

    @Generated
    public FileFormat getFormat() {
        return this.format;
    }

    @Generated
    public long getRollPeriodMs() {
        return this.rollPeriodMs;
    }

    @Generated
    public long getAllowedLatenessMs() {
        return this.allowedLatenessMs;
    }

    @Generated
    public Factory<Executor> getExecutorFactory() {
        return this.executorFactory;
    }

    @Generated
    public Context getContext() {
        return this.context;
    }

    private class LateBulk
    extends Bulk {
        private long startTs;
        private long maxSeenTs;

        LateBulk(Path path) throws IOException {
            super(path, Long.MIN_VALUE, Long.MAX_VALUE);
            this.startTs = Long.MAX_VALUE;
            this.maxSeenTs = Long.MIN_VALUE;
        }

        @Override
        synchronized void write(StreamElement data, CommitCallback commit, long watermark, long seqNo) throws IOException {
            super.write(data, commit, watermark, seqNo);
            this.maxSeenTs = Math.max(data.getStamp(), this.maxSeenTs);
            this.startTs = Math.min(data.getStamp(), this.startTs);
        }

        @Override
        public long getStartTs() {
            return this.startTs;
        }

        @Override
        public long getMaxTs() {
            return this.maxSeenTs;
        }

        @Override
        @Generated
        public String toString() {
            return "AbstractBulkFileSystemAttributeWriter.LateBulk(startTs=" + this.getStartTs() + ", maxSeenTs=" + this.maxSeenTs + ")";
        }
    }

    protected class Bulk {
        private final Path path;
        private final Writer writer;
        private final long startTs;
        private final long maxTs;
        @Nullable
        private CommitCallback commit = null;
        private long lastWriteWatermark = Long.MIN_VALUE;
        private long firstWriteSeqNo = -1L;
        private long lastWriteSeqNo = 0L;

        Bulk(Path path, long startTs, long maxTs) throws IOException {
            this.path = path;
            this.writer = AbstractBulkFileSystemAttributeWriter.this.format.openWriter(path, AbstractBulkFileSystemAttributeWriter.this.getEntityDescriptor());
            this.startTs = startTs;
            this.maxTs = maxTs;
        }

        synchronized void write(StreamElement data, CommitCallback commit, long watermark, long seqNo) throws IOException {
            this.commit = commit;
            this.lastWriteWatermark = watermark;
            this.lastWriteSeqNo = seqNo;
            if (this.firstWriteSeqNo < 0L) {
                this.firstWriteSeqNo = seqNo;
            }
            this.writer.write(data);
        }

        public CommitCallback getCommit() {
            return Objects.requireNonNull(this.commit);
        }

        @Generated
        public String toString() {
            return "AbstractBulkFileSystemAttributeWriter.Bulk(path=" + this.getPath() + ", writer=" + this.getWriter() + ", startTs=" + this.getStartTs() + ", maxTs=" + this.getMaxTs() + ", commit=" + this.getCommit() + ", lastWriteWatermark=" + this.getLastWriteWatermark() + ", firstWriteSeqNo=" + this.getFirstWriteSeqNo() + ", lastWriteSeqNo=" + this.getLastWriteSeqNo() + ")";
        }

        @Generated
        public Path getPath() {
            return this.path;
        }

        @Generated
        public Writer getWriter() {
            return this.writer;
        }

        @Generated
        public long getStartTs() {
            return this.startTs;
        }

        @Generated
        public long getMaxTs() {
            return this.maxTs;
        }

        @Generated
        public long getLastWriteWatermark() {
            return this.lastWriteWatermark;
        }

        @Generated
        public long getFirstWriteSeqNo() {
            return this.firstWriteSeqNo;
        }

        @Generated
        public long getLastWriteSeqNo() {
            return this.lastWriteSeqNo;
        }
    }
}

