/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StreamElement;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectBatchUnboundedSource
extends UnboundedSource<StreamElement, Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(DirectBatchUnboundedSource.class);
    private static final long serialVersionUID = 1L;
    private final RepositoryFactory factory;
    private final BatchLogObservable reader;
    private final List<AttributeDescriptor<?>> attributes;
    private final List<Partition> partitions;
    private final long startStamp;
    private final long endStamp;

    /**
     * Creates a new, not yet split source reading the given attributes from the given reader.
     *
     * <p>The returned source has no partitions assigned; they are resolved from
     * {@code reader} for the {@code [startStamp, endStamp]} range when the source is split.
     *
     * @param factory factory handed to the output coder
     * @param reader batch log observable the data is read from
     * @param attrs attributes to read
     * @param startStamp lower timestamp bound passed to the reader when listing partitions
     * @param endStamp upper timestamp bound passed to the reader when listing partitions
     * @return the newly created source
     */
    public static DirectBatchUnboundedSource of(RepositoryFactory factory, BatchLogObservable reader, List<AttributeDescriptor<?>> attrs, long startStamp, long endStamp) {
        DirectBatchUnboundedSource source = new DirectBatchUnboundedSource(factory, reader, attrs, startStamp, endStamp);
        return source;
    }

    /**
     * Constructs the root (unsplit) source.
     *
     * <p>The partition list starts out empty; the attribute list is wrapped in an
     * unmodifiable view of the list passed in.
     *
     * @param factory factory handed to the output coder
     * @param reader batch log observable the data is read from
     * @param attributes attributes to read
     * @param startStamp lower timestamp bound used when listing partitions
     * @param endStamp upper timestamp bound used when listing partitions
     */
    private DirectBatchUnboundedSource(RepositoryFactory factory, BatchLogObservable reader, List<AttributeDescriptor<?>> attributes, long startStamp, long endStamp) {
        // Time range of the whole read.
        this.startStamp = startStamp;
        this.endStamp = endStamp;
        // Collaborators.
        this.reader = reader;
        this.factory = factory;
        // No partitions yet - an empty list marks a source that still has to be split.
        this.partitions = Collections.emptyList();
        this.attributes = Collections.unmodifiableList(attributes);
    }

    /**
     * Constructs a split of {@code parent} responsible for the given partitions.
     *
     * <p>Factory, reader and attributes are inherited from the parent. The partitions are
     * copied, sorted with {@link #partitionsComparator()} and stored as an unmodifiable list,
     * so the caller's list is never modified.
     *
     * @param parent source this split was derived from
     * @param partitions partitions assigned to this split
     * @param startStamp lower timestamp bound of the read
     * @param endStamp upper timestamp bound of the read
     */
    private DirectBatchUnboundedSource(DirectBatchUnboundedSource parent, List<Partition> partitions, long startStamp, long endStamp) {
        this.factory = parent.factory;
        this.reader = parent.reader;
        this.attributes = parent.attributes;
        this.startStamp = startStamp;
        this.endStamp = endStamp;
        // Typed copy (a raw list would not type-check against the Partition method reference below).
        List<Partition> parts = new ArrayList<>(partitions);
        parts.sort(DirectBatchUnboundedSource.partitionsComparator());
        this.partitions = Collections.unmodifiableList(parts);
        if (log.isDebugEnabled()) {
            // Guarded so the timestamp list is only materialized when it is actually logged.
            log.debug("Created source with partition min timestamps {}", parts.stream().map(Partition::getMinTimestamp).collect(Collectors.toList()));
        }
    }

    /**
     * Splits this source into sub-sources, distributing partitions round-robin.
     *
     * <p>A source that already has partitions assigned is not split any further and is
     * returned as the only element. Otherwise the partitions for the configured time range
     * are fetched from the reader and dealt out one by one into at most
     * {@code desiredNumSplits} groups; groups are created lazily, so no empty split is ever
     * produced (and no split at all when the reader returns no partitions).
     *
     * @param desiredNumSplits requested number of splits; values below 1 are treated as 1
     * @param options pipeline options (unused)
     * @return the resulting sources
     * @throws Exception declared by the overridden method
     */
    public List<? extends UnboundedSource<StreamElement, Checkpoint>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        if (!this.partitions.isEmpty()) {
            return Arrays.asList(this);
        }
        // Guard the modulo below: 0 would throw ArithmeticException, negatives would
        // produce negative indices.
        int numSplits = Math.max(1, desiredNumSplits);
        List<Partition> parts = this.reader.getPartitions(this.startStamp, this.endStamp);
        List<List<Partition>> splits = new ArrayList<>();
        int current = 0;
        for (Partition p : parts) {
            if (splits.size() <= current) {
                splits.add(new ArrayList<>());
            }
            splits.get(current).add(p);
            current = (current + 1) % numSplits;
        }
        return splits.stream().map(s -> new DirectBatchUnboundedSource(this, s, this.startStamp, this.endStamp)).collect(Collectors.toList());
    }

    /**
     * Creates a reader over this source's partitions, or over the partitions stored in the
     * checkpoint when one is supplied.
     *
     * @param options pipeline options (unused)
     * @param checkpointMark checkpoint to resume from, or {@code null} to start from scratch
     * @return reader over the selected partitions
     * @throws IOException declared by the overridden method
     */
    public UnboundedSource.UnboundedReader<StreamElement> createReader(PipelineOptions options, Checkpoint checkpointMark) throws IOException {
        List<Partition> remaining;
        if (checkpointMark != null) {
            // Resume: continue with whatever the checkpoint still had to process.
            remaining = checkpointMark.partitions;
        } else {
            remaining = this.partitions;
        }
        List<Partition> toProcess = Collections.synchronizedList(new ArrayList<>(remaining));
        return new StreamElementUnboundedReader(this, this.reader, this.attributes, checkpointMark, toProcess);
    }

    /**
     * Returns the coder used to serialize {@link Checkpoint} marks of this source.
     *
     * @return a new {@link CheckpointCoder}
     */
    public Coder<Checkpoint> getCheckpointMarkCoder() {
        Coder<Checkpoint> coder = new CheckpointCoder();
        return coder;
    }

    /**
     * Returns the coder for elements emitted by this source.
     *
     * @return a {@link StreamElementCoder} created from this source's repository factory
     */
    public Coder<StreamElement> getOutputCoder() {
        Coder<StreamElement> coder = StreamElementCoder.of(this.factory);
        return coder;
    }

    /**
     * Ordering applied to the partitions of a split: ascending by minimal timestamp, ties
     * broken by ascending maximal timestamp.
     *
     * @return comparator implementing the ordering described above
     */
    @VisibleForTesting
    static Comparator<Partition> partitionsComparator() {
        Comparator<Partition> byMinStamp = Comparator.comparingLong(Partition::getMinTimestamp);
        return byMinStamp.thenComparingLong(Partition::getMaxTimestamp);
    }

    /**
     * Adapts a bounded queue into a {@link BatchLogObserver}.
     *
     * <p>Every observed element is offered to {@code queue}, retrying in 100 ms steps until
     * it is accepted or {@code stopped} becomes {@code true}. Completion clears the
     * {@code running} flag.
     *
     * <p>NOTE(review): {@code onError} rethrows the error wrapped in a
     * {@link RuntimeException} and does not touch {@code running}; whether the consumer of
     * the queue ever learns about the failure depends on how the observable handles the
     * thrown exception - confirm.
     *
     * @param queue queue the observed elements are pushed into
     * @param running flag set to {@code false} once the observation completes
     * @param stopped flag that, once {@code true}, makes the observer reject further elements
     * @return the observer
     */
    private static BatchLogObserver asObserver(final BlockingQueue<StreamElement> queue, final AtomicBoolean running, final AtomicBoolean stopped) {
        return new BatchLogObserver(){

            public boolean onNext(StreamElement element) {
                try {
                    boolean accepted = false;
                    // Bounded waits so a stop request is noticed even when the queue stays full.
                    while (!accepted && !stopped.get()) {
                        accepted = queue.offer(element, 100L, TimeUnit.MILLISECONDS);
                    }
                    // false here means we were stopped before the element fit in.
                    return accepted;
                }
                catch (InterruptedException ex) {
                    log.warn("Interrupted while reading data.", ex);
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            public void onCompleted() {
                running.set(false);
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
    }

    /**
     * Reader that processes the assigned partitions one at a time.
     *
     * <p>For the partition at the head of {@code toProcess} an observation is started on the
     * {@link BatchLogObservable}; the observer pushes elements into a bounded queue
     * (capacity 100) that {@link #advance()} polls without blocking. When the observer has
     * signalled completion and the queue is drained, the partition is removed and the next
     * one is started. After the last partition the watermark is moved to
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     *
     * <p>Checkpoints consist of the partitions still to be processed (including the one in
     * progress) and the number of elements already consumed from the first of them; on
     * restore that many elements are read and dropped again.
     */
    private static class StreamElementUnboundedReader
    extends UnboundedSource.UnboundedReader<StreamElement> {
        private final UnboundedSource<StreamElement, ?> source;
        private final BatchLogObservable reader;
        private final List<AttributeDescriptor<?>> attributes;
        /** Partitions not yet fully read; index 0 is the one currently being read (if any). */
        private final List<Partition> toProcess;
        /** Hand-over buffer between the observer and this reader. */
        private final BlockingQueue<StreamElement> queue = new ArrayBlockingQueue<StreamElement>(100);
        /** {@code true} from the start of an observation until the observer reports completion. */
        private final AtomicBoolean running = new AtomicBoolean();
        /** Set by {@link #close()}; makes the observer reject further elements. */
        private final AtomicBoolean stopped = new AtomicBoolean();
        /** Elements taken from the queue for the current partition, skipped ones included. */
        long consumedFromCurrent;
        @Nullable
        StreamElement current = null;
        /** Elements still to be dropped after restoring from a checkpoint. */
        long skip;
        /** Never null: starts at the minimal timestamp until the first partition is started. */
        Instant watermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        @Nullable
        Partition runningPartition = null;

        /**
         * @param source source this reader belongs to
         * @param reader observable the partitions are read from
         * @param attributes attributes to observe (copied)
         * @param checkpointMark checkpoint to resume from, or {@code null}
         * @param toProcess partitions to read, in processing order (copied)
         */
        public StreamElementUnboundedReader(UnboundedSource<StreamElement, ?> source, BatchLogObservable reader, List<AttributeDescriptor<?>> attributes, @Nullable Checkpoint checkpointMark, List<Partition> toProcess) {
            this.source = Objects.requireNonNull(source);
            this.reader = Objects.requireNonNull(reader);
            this.attributes = new ArrayList<>(Objects.requireNonNull(attributes));
            this.toProcess = new ArrayList<>(Objects.requireNonNull(toProcess));
            this.consumedFromCurrent = 0L;
            this.skip = checkpointMark == null ? 0L : checkpointMark.skipFromFirst;
        }

        public boolean start() {
            return this.advance();
        }

        /**
         * Moves to the next element if one is immediately available.
         *
         * @return {@code true} when {@link #getCurrent()} holds a new element, {@code false}
         *     when nothing is available right now or all partitions have been read
         */
        public boolean advance() {
            while (true) {
                // Read `running` BEFORE checking the queue: the observer enqueues all
                // elements before it clears the flag, so once the flag is seen as false
                // an empty queue really means the partition is exhausted. The opposite
                // order could miss an element enqueued between the two reads.
                if (!this.running.get() && this.queue.isEmpty()) {
                    if (this.runningPartition != null) {
                        this.toProcess.remove(0);
                        this.runningPartition = null;
                    }
                    if (this.toProcess.isEmpty()) {
                        this.watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
                        return false;
                    }
                    this.startNextPartition();
                }
                this.current = this.queue.poll();
                if (this.current == null) {
                    return false;
                }
                ++this.consumedFromCurrent;
                if (this.skip <= 0L) {
                    return true;
                }
                // Element was already delivered before the checkpoint - drop it and go on.
                --this.skip;
            }
        }

        /** Starts observing the partition at the head of {@code toProcess}. */
        private void startNextPartition() {
            this.runningPartition = this.toProcess.get(0);
            this.consumedFromCurrent = 0L;
            this.watermark = new Instant(this.runningPartition.getMinTimestamp());
            // Raise the flag before handing out the observer: if the observation completes
            // (and clears the flag) before observe() returns, setting it afterwards would
            // leave it stuck at true and the reader would never move past this partition.
            this.running.set(true);
            try {
                this.reader.observe(Arrays.asList(this.runningPartition), this.attributes, DirectBatchUnboundedSource.asObserver(this.queue, this.running, this.stopped));
            }
            catch (RuntimeException | Error ex) {
                // Nothing is running if the observation could not even be started.
                this.running.set(false);
                throw ex;
            }
        }

        public Instant getWatermark() {
            return this.watermark;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return new Checkpoint(this.toProcess, this.consumedFromCurrent);
        }

        public UnboundedSource<StreamElement, ?> getCurrentSource() {
            return this.source;
        }

        public StreamElement getCurrent() throws NoSuchElementException {
            return this.current;
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            if (this.current == null) {
                return BoundedWindow.TIMESTAMP_MIN_VALUE;
            }
            return new Instant(this.current.getStamp());
        }

        /** Signals the observer to stop accepting elements. */
        public void close() {
            this.stopped.set(true);
        }
    }

    /**
     * Coder for {@link Checkpoint}.
     *
     * <p>Wire format: a 4-byte big-endian length followed by that many bytes holding the
     * GZIP-compressed Java serialization of the checkpoint.
     */
    public static class CheckpointCoder
    extends Coder<Checkpoint> {
        private static final long serialVersionUID = 1L;

        /**
         * Writes the length-prefixed, compressed serialized form of {@code value}.
         *
         * @param value checkpoint to encode
         * @param outStream target stream; it is written to but not closed here
         * @throws IOException when serialization or writing fails
         */
        public void encode(Checkpoint value, OutputStream outStream) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // Closing the object stream flushes it and finishes + closes the GZIP stream,
            // which releases its Deflater instead of leaving that to garbage collection.
            try (GZIPOutputStream gzout = new GZIPOutputStream(baos);
                    ObjectOutputStream oos = new ObjectOutputStream(gzout)) {
                oos.writeObject(value);
            }
            byte[] bytes = baos.toByteArray();
            // Deliberately not closed: the wrapped stream belongs to the caller.
            DataOutputStream dos = new DataOutputStream(outStream);
            dos.writeInt(bytes.length);
            dos.write(bytes);
        }

        /**
         * Reads a checkpoint written by {@link #encode(Checkpoint, OutputStream)}.
         *
         * <p>NOTE(review): this uses Java native deserialization; it must only ever be fed
         * data produced by this coder.
         *
         * @param inStream source stream; exactly one encoded checkpoint is consumed and the
         *     stream is not closed here
         * @return the decoded checkpoint
         * @throws CoderException when the length prefix is negative or the serialized class
         *     cannot be resolved
         * @throws IOException when reading or decompression fails
         */
        public Checkpoint decode(InputStream inStream) throws CoderException, IOException {
            // Deliberately not closed: the wrapped stream belongs to the caller.
            DataInputStream dis = new DataInputStream(inStream);
            int length = dis.readInt();
            if (length < 0) {
                // Corrupted prefix; fail as a coder error rather than NegativeArraySizeException.
                throw new CoderException("Invalid checkpoint payload length " + length);
            }
            byte[] bytes = new byte[length];
            dis.readFully(bytes);
            // try-with-resources releases the Inflater held by the GZIP stream.
            try (GZIPInputStream gzin = new GZIPInputStream(new ByteArrayInputStream(bytes));
                    ObjectInputStream ois = new ObjectInputStream(gzin)) {
                return (Checkpoint)ois.readObject();
            }
            catch (ClassNotFoundException ex) {
                throw new CoderException((Throwable)ex);
            }
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.emptyList();
        }

        /** Performs no check and never throws. */
        public void verifyDeterministic() {
        }
    }

    /**
     * Checkpoint of a reader: the partitions that remain to be processed plus the number of
     * elements to skip from the first of them when reading is resumed.
     */
    public static class Checkpoint
    implements UnboundedSource.CheckpointMark,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final List<Partition> partitions;
        private final long skipFromFirst;

        /**
         * @param partitions remaining partitions; the list is copied
         * @param skipFromFirst number of elements to skip from the first partition
         */
        Checkpoint(List<Partition> partitions, long skipFromFirst) {
            this.skipFromFirst = skipFromFirst;
            // Private copy, so later changes to the caller's list do not affect the checkpoint.
            this.partitions = new ArrayList<>(partitions);
        }

        /** No-op. */
        public void finalizeCheckpoint() {
        }

        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("DirectBatchUnboundedSource.Checkpoint(partitions=").append(this.getPartitions());
            text.append(", skipFromFirst=").append(this.getSkipFromFirst());
            text.append(")");
            return text.toString();
        }

        /** @return the remaining partitions */
        public List<Partition> getPartitions() {
            return this.partitions;
        }

        /** @return number of elements to skip from the first remaining partition */
        public long getSkipFromFirst() {
            return this.skipFromFirst;
        }
    }
}

