/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.direct.io.BlockingQueueLogObserver;
import cz.o2.proxima.beam.direct.io.DirectUnboundedSource;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BeamCommitLogReader {
    private static final Logger log = LoggerFactory.getLogger(BeamCommitLogReader.class);
    private static final Instant LOWEST_INSTANT = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private static final Instant HIGHEST_INSTANT = BoundedWindow.TIMESTAMP_MAX_VALUE;
    private static final byte[] EMPTY_BYTES = new byte[0];
    private static final long AUTO_WATERMARK_LAG_MS = 500L;
    private final Partition partition;
    private ObserveHandle handle;
    @Nullable
    private final String name;
    private final CommitLogReader reader;
    private final Position position;
    private final boolean eventTime;
    private final boolean stopAtCurrent;
    private boolean finished;
    private long limit;
    @Nullable
    private final Offset offset;
    private final long offsetWatermark;
    @Nullable
    private BlockingQueueLogObserver observer;
    private StreamElement current;
    private Instant currentProcessingTime = Instant.now();

    static BoundedSource.BoundedReader<StreamElement> bounded(final BoundedSource<StreamElement> source, String name, CommitLogReader reader, Position position, long limit, Partition partition) {
        final BeamCommitLogReader r = new BeamCommitLogReader(name, reader, position, true, partition, null, limit, true);
        return new BoundedSource.BoundedReader<StreamElement>(){

            public BoundedSource<StreamElement> getCurrentSource() {
                return source;
            }

            public boolean start() throws IOException {
                return r.start();
            }

            public boolean advance() throws IOException {
                return r.advance();
            }

            public StreamElement getCurrent() throws NoSuchElementException {
                return r.getCurrent();
            }

            public void close() {
                r.close();
            }

            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return r.getCurrentTimestamp();
            }
        };
    }

    static UnboundedCommitLogReader unbounded(DirectUnboundedSource source, String name, CommitLogReader reader, Position position, boolean eventTime, long limit, @Nullable Partition partition, @Nullable Offset offset) {
        return new UnboundedCommitLogReader(name, source, reader, position, eventTime, limit, partition, offset);
    }

    private BeamCommitLogReader(@Nullable String name, CommitLogReader reader, Position position, boolean eventTime, @Nullable Partition partition, @Nullable Offset offset, long limit, boolean stopAtCurrent) {
        this.name = name;
        this.reader = Objects.requireNonNull(reader);
        this.position = Objects.requireNonNull(position);
        this.eventTime = eventTime;
        this.partition = partition;
        this.offset = offset;
        this.offsetWatermark = offset == null ? LOWEST_INSTANT.getMillis() : offset.getWatermark();
        this.limit = limit;
        this.stopAtCurrent = stopAtCurrent;
        this.finished = limit <= 0L;
        Preconditions.checkArgument((partition != null || offset != null ? 1 : 0) != 0, (Object)"Either partition or offset has to be non-null");
        Preconditions.checkArgument((offset == null || !stopAtCurrent ? 1 : 0) != 0, (Object)"Offset can be used only for streaming reader");
    }

    private URI getUri() {
        return this.reader.getUri();
    }

    public boolean start() throws IOException {
        this.observer = BlockingQueueLogObserver.create(this.name == null ? "Source(" + this.reader.getUri() + "@" + this.partition.getId() + ")" : this.name, this.limit, this.offsetWatermark);
        if (!this.finished) {
            this.handle = this.offset != null ? this.reader.observeBulkOffsets(Collections.singletonList(this.offset), (LogObserver)this.observer) : this.reader.observeBulkPartitions(this.name, Collections.singletonList(this.partition), this.position, this.stopAtCurrent, (LogObserver)this.observer);
        }
        return this.advance();
    }

    public boolean advance() throws IOException {
        Throwable error;
        if (!this.finished) {
            this.autoCommitIfBounded();
            try {
                if (this.limit > 0L) {
                    this.current = this.takeNext();
                    if (this.current == null) {
                        return false;
                    }
                } else {
                    this.current = null;
                }
                --this.limit;
                if (this.current != null) {
                    if (!this.eventTime) {
                        this.currentProcessingTime = Instant.now();
                    }
                    return true;
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        if ((error = this.observer.getError()) != null) {
            throw new IOException(error);
        }
        this.finished = true;
        log.debug("Finished reading observer name {}, partition {}", (Object)this.name, (Object)this.partition);
        return false;
    }

    private StreamElement takeNext() throws InterruptedException {
        if (this.stopAtCurrent) {
            return this.observer.takeBlocking();
        }
        return this.observer.take();
    }

    public Instant getCurrentTimestamp() {
        if (!this.finished) {
            if (this.eventTime) {
                return new Instant(this.getCurrent().getStamp());
            }
            return this.currentProcessingTime;
        }
        return HIGHEST_INSTANT;
    }

    public StreamElement getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    public void close() {
        if (this.observer != null) {
            this.observer.stop();
            if (this.observer.getLastWrittenContext() != null) {
                this.observer.getLastWrittenContext().nack();
            }
            this.observer = null;
        }
        if (this.handle != null) {
            this.handle.close();
            this.handle = null;
        }
        log.debug("Closed reader {}", (Object)this);
    }

    @Nullable
    Offset getCurrentOffset() {
        return this.observer == null || this.observer.getLastReadContext() == null ? null : this.observer.getLastReadContext().getOffset();
    }

    boolean hasExternalizableOffsets() {
        return this.reader.hasExternalizableOffsets();
    }

    @Nullable
    LogObserver.OffsetCommitter getLastReadCommitter() {
        return this.observer == null ? null : this.observer.getLastReadContext();
    }

    @Nullable
    LogObserver.OffsetCommitter getLastWrittenCommitter() {
        return this.observer == null ? null : this.observer.getLastWrittenContext();
    }

    void clearIncomingQueue() {
        if (this.observer != null) {
            this.observer.clearIncomingQueue();
        }
    }

    private Instant getWatermark() {
        Instant watermark = this.finished ? HIGHEST_INSTANT : (this.eventTime ? new Instant(this.observer.getWatermark()) : new Instant(System.currentTimeMillis() - 500L));
        if (watermark.isBefore((ReadableInstant)LOWEST_INSTANT)) {
            return LOWEST_INSTANT;
        }
        if (watermark.isAfter((ReadableInstant)HIGHEST_INSTANT)) {
            return HIGHEST_INSTANT;
        }
        return watermark;
    }

    private void autoCommitIfBounded() {
        if (this.stopAtCurrent) {
            Optional.ofNullable(this.getLastReadCommitter()).ifPresent(LogObserver.OffsetCommitter::confirm);
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("name", (Object)this.name).add("partition", (Object)this.partition).add("offset", (Object)this.offset).add("eventTime", this.eventTime).add("stopAtCurrent", this.stopAtCurrent).add("reader", (Object)this.reader).toString();
    }

    public Partition getPartition() {
        return this.partition;
    }

    public ObserveHandle getHandle() {
        return this.handle;
    }

    public long getLimit() {
        return this.limit;
    }

    static class UnboundedCommitLogReader
    extends UnboundedSource.UnboundedReader<StreamElement> {
        private final DirectUnboundedSource source;
        private final BeamCommitLogReader reader;
        private boolean finished = false;

        UnboundedCommitLogReader(String name, DirectUnboundedSource source, CommitLogReader reader, Position position, boolean eventTime, long limit, @Nullable Partition partition, @Nullable Offset offset) {
            this.source = source;
            this.reader = new BeamCommitLogReader(name, reader, position, eventTime, partition, offset, limit, false);
        }

        public DirectUnboundedSource getCurrentSource() {
            return this.source;
        }

        public boolean start() throws IOException {
            log.debug("{} started to consume reader {}", (Object)this, (Object)this.reader.getUri());
            return this.reader.start();
        }

        public boolean advance() throws IOException {
            return this.reader.advance();
        }

        public StreamElement getCurrent() throws NoSuchElementException {
            return this.reader.getCurrent();
        }

        public void close() {
            this.reader.close();
        }

        public Instant getWatermark() {
            if (this.finished) {
                return HIGHEST_INSTANT;
            }
            return this.reader.getWatermark();
        }

        public DirectUnboundedSource.Checkpoint getCheckpointMark() {
            return new DirectUnboundedSource.Checkpoint(this.reader);
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            Instant boundedPos = this.reader.getCurrentTimestamp();
            if (boundedPos == HIGHEST_INSTANT) {
                this.finished = true;
                return boundedPos;
            }
            StreamElement current = this.reader.getCurrent();
            if (current != null) {
                return new Instant(current.getStamp());
            }
            throw new NoSuchElementException();
        }

        public byte[] getCurrentRecordId() throws NoSuchElementException {
            StreamElement el = this.getCurrent();
            if (el == null) {
                throw new NoSuchElementException();
            }
            if (this.getCurrentSource().requiresDeduping()) {
                return el.getUuid().getBytes(StandardCharsets.US_ASCII);
            }
            return EMPTY_BYTES;
        }

        public BeamCommitLogReader getReader() {
            return this.reader;
        }
    }
}

