package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.direct.io.DirectUnboundedSource;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/o2/proxima/beam/direct/io/BeamCommitLogReader.class */
public class BeamCommitLogReader {
    private static final Logger log = LoggerFactory.getLogger(BeamCommitLogReader.class);
    private static final Instant LOWEST_INSTANT = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private static final Instant HIGHEST_INSTANT = BoundedWindow.TIMESTAMP_MAX_VALUE;
    private static final byte[] EMPTY_BYTES = new byte[0];
    private static final long AUTO_WATERMARK_LAG_MS = 500;
    private final Partition partition;
    private ObserveHandle handle;

    @Nullable
    private final String name;
    private final CommitLogReader reader;
    private final Position position;
    private final boolean eventTime;
    private final boolean stopAtCurrent;
    private boolean finished;
    private long limit;

    @Nullable
    private final Offset startingOffset;
    private final long offsetWatermark;

    @Nullable
    private BlockingQueueLogObserver observer;
    private StreamElement current;
    private Instant currentProcessingTime = Instant.now();
    private Instant lastReadWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    /* loaded from: input_file:cz/o2/proxima/beam/direct/io/BeamCommitLogReader$UnboundedCommitLogReader.class */
    static class UnboundedCommitLogReader extends UnboundedSource.UnboundedReader<StreamElement> {
        private final DirectUnboundedSource source;
        private final BeamCommitLogReader reader;
        private Instant previousWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private boolean finished = false;

        UnboundedCommitLogReader(String str, DirectUnboundedSource directUnboundedSource, CommitLogReader commitLogReader, Position position, boolean z, long j, @Nullable Partition partition, @Nullable Offset offset) {
            this.source = directUnboundedSource;
            this.reader = new BeamCommitLogReader(str, commitLogReader, position, z, partition, offset, j, false);
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public DirectUnboundedSource m20getCurrentSource() {
            return this.source;
        }

        public boolean start() throws IOException {
            BeamCommitLogReader.log.debug("{} started to consume reader {}", this, this.reader.getUri());
            return this.reader.start();
        }

        public boolean advance() throws IOException {
            return this.reader.advance();
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public StreamElement m21getCurrent() throws NoSuchElementException {
            return this.reader.getCurrent();
        }

        public void close() {
            this.reader.close();
        }

        public Instant getWatermark() {
            if (this.finished) {
                return BeamCommitLogReader.HIGHEST_INSTANT;
            }
            Instant watermark = this.reader.getWatermark();
            if (watermark.isBefore(this.previousWatermark)) {
                BeamCommitLogReader.log.warn("Watermark shifts back in time. Previous: [{}], Current: [{}].", this.previousWatermark, watermark);
            }
            this.previousWatermark = watermark;
            return watermark;
        }

        /* renamed from: getCheckpointMark, reason: merged with bridge method [inline-methods] */
        public DirectUnboundedSource.Checkpoint m19getCheckpointMark() {
            return new DirectUnboundedSource.Checkpoint(this.reader);
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            Instant currentTimestamp = this.reader.getCurrentTimestamp();
            if (currentTimestamp == BeamCommitLogReader.HIGHEST_INSTANT) {
                this.finished = true;
                return currentTimestamp;
            }
            StreamElement current = this.reader.getCurrent();
            if (current != null) {
                return new Instant(current.getStamp());
            }
            throw new NoSuchElementException();
        }

        public byte[] getCurrentRecordId() throws NoSuchElementException {
            StreamElement m21getCurrent = m21getCurrent();
            if (m21getCurrent == null) {
                throw new NoSuchElementException();
            }
            return m20getCurrentSource().requiresDeduping() ? m21getCurrent.getUuid().getBytes(StandardCharsets.US_ASCII) : BeamCommitLogReader.EMPTY_BYTES;
        }

        public BeamCommitLogReader getReader() {
            return this.reader;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BoundedSource.BoundedReader<StreamElement> bounded(final BoundedSource<StreamElement> boundedSource, String str, CommitLogReader commitLogReader, Position position, long j, Partition partition) {
        final BeamCommitLogReader beamCommitLogReader = new BeamCommitLogReader(str, commitLogReader, position, true, partition, null, j, true);
        return new BoundedSource.BoundedReader<StreamElement>() { // from class: cz.o2.proxima.beam.direct.io.BeamCommitLogReader.1
            /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
            public BoundedSource<StreamElement> m16getCurrentSource() {
                return boundedSource;
            }

            public boolean start() throws IOException {
                return beamCommitLogReader.start();
            }

            public boolean advance() throws IOException {
                return beamCommitLogReader.advance();
            }

            /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
            public StreamElement m17getCurrent() throws NoSuchElementException {
                return beamCommitLogReader.getCurrent();
            }

            public void close() {
                beamCommitLogReader.close();
            }

            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return beamCommitLogReader.getCurrentTimestamp();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static UnboundedCommitLogReader unbounded(DirectUnboundedSource directUnboundedSource, String str, CommitLogReader commitLogReader, Position position, boolean z, long j, @Nullable Partition partition, @Nullable Offset offset) {
        return new UnboundedCommitLogReader(str, directUnboundedSource, commitLogReader, position, z, j, partition, offset);
    }

    private BeamCommitLogReader(@Nullable String str, CommitLogReader commitLogReader, Position position, boolean z, @Nullable Partition partition, @Nullable Offset offset, long j, boolean z2) {
        this.name = str;
        this.reader = (CommitLogReader) Objects.requireNonNull(commitLogReader);
        this.position = (Position) Objects.requireNonNull(position);
        this.eventTime = z;
        this.partition = partition;
        this.startingOffset = offset;
        this.offsetWatermark = offset == null ? LOWEST_INSTANT.getMillis() : offset.getWatermark();
        this.limit = j;
        this.stopAtCurrent = z2;
        this.finished = j <= 0;
        Preconditions.checkArgument((partition == null && offset == null) ? false : true, "Either partition or offset has to be non-null");
        Preconditions.checkArgument(offset == null || !z2, "Offset can be used only for streaming reader");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public URI getUri() {
        return this.reader.getUri();
    }

    public boolean start() throws IOException {
        this.observer = BlockingQueueLogObserver.create(this.name == null ? "Source(" + this.reader.getUri() + ":" + this.partition.getId() + ")" : this.name, this.limit, this.offsetWatermark);
        log.debug("Starting {}@{} with offset {} and partition {}", new Object[]{this.name, this.reader.getUri(), this.startingOffset, this.partition});
        if (!this.finished) {
            if (this.startingOffset != null) {
                this.handle = this.reader.observeBulkOffsets(Collections.singletonList(this.startingOffset), this.observer);
            } else {
                this.handle = this.reader.observeBulkPartitions(this.name, Collections.singletonList(this.partition), this.position, this.stopAtCurrent, this.observer);
            }
        }
        return advance();
    }

    public boolean advance() throws IOException {
        if (!this.finished) {
            autoCommitIfBounded();
            try {
                if (this.limit > 0) {
                    this.current = takeNext();
                    if (this.current == null) {
                        this.lastReadWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
                        return false;
                    }
                } else {
                    this.current = null;
                }
                this.limit--;
                if (this.current != null) {
                    if (!this.eventTime) {
                        this.currentProcessingTime = Instant.now();
                    }
                    this.lastReadWatermark = Instant.ofEpochMilli(this.observer.getWatermark());
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        Throwable error = this.observer.getError();
        if (error != null) {
            throw new IOException(error);
        }
        this.lastReadWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.finished = true;
        log.debug("Finished reading observer name {}, partition {}", this.name, this.partition);
        return false;
    }

    private StreamElement takeNext() throws InterruptedException {
        return this.stopAtCurrent ? this.observer.takeBlocking() : this.observer.take();
    }

    public Instant getCurrentTimestamp() {
        return !this.eventTime ? this.currentProcessingTime : !this.finished ? new Instant(getCurrent().getStamp()) : this.lastReadWatermark;
    }

    public StreamElement getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    public void close() {
        if (this.observer != null) {
            this.observer.stop();
            if (this.observer.getLastWrittenContext() != null) {
                this.observer.getLastWrittenContext().nack();
            }
            this.observer = null;
        }
        if (this.handle != null) {
            this.handle.close();
            this.handle = null;
        }
        log.debug("Closed reader {}", this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public Offset getCurrentOffset() {
        Offset offset = (this.observer == null || this.observer.getLastReadContext() == null) ? null : this.observer.getLastReadContext().getOffset();
        return offset != null ? offset : this.startingOffset;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasExternalizableOffsets() {
        return this.reader.hasExternalizableOffsets();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public LogObserver.OffsetCommitter getLastReadCommitter() {
        if (this.observer == null) {
            return null;
        }
        return this.observer.getLastReadContext();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public LogObserver.OffsetCommitter getLastWrittenCommitter() {
        if (this.observer == null) {
            return null;
        }
        return this.observer.getLastWrittenContext();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearIncomingQueue() {
        if (this.observer != null) {
            this.observer.clearIncomingQueue();
        }
    }

    private Instant getWatermark() {
        Instant instant;
        if (this.finished) {
            instant = HIGHEST_INSTANT;
        } else if (!this.eventTime) {
            instant = new Instant(System.currentTimeMillis() - AUTO_WATERMARK_LAG_MS);
        } else {
            if (this.observer == null) {
                log.warn("Call to getWatermark() before start(). This breaks UnboundedSource.Reader contract.");
                return LOWEST_INSTANT;
            }
            instant = new Instant(this.observer.getWatermark());
        }
        return instant.isBefore(LOWEST_INSTANT) ? LOWEST_INSTANT : instant.isAfter(HIGHEST_INSTANT) ? HIGHEST_INSTANT : instant.isAfter(this.lastReadWatermark) ? this.lastReadWatermark : instant;
    }

    private void autoCommitIfBounded() {
        if (this.stopAtCurrent) {
            Optional.ofNullable(getLastReadCommitter()).ifPresent((v0) -> {
                v0.confirm();
            });
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("name", this.name).add("partition", this.partition).add("startingOffset", this.startingOffset).add("eventTime", this.eventTime).add("stopAtCurrent", this.stopAtCurrent).add("reader", this.reader).toString();
    }

    public Partition getPartition() {
        return this.partition;
    }

    public ObserveHandle getHandle() {
        return this.handle;
    }

    public long getLimit() {
        return this.limit;
    }
}
