package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.StreamElement;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/direct/io/BeamBatchLogReader.class */
class BeamBatchLogReader extends BoundedSource.BoundedReader<StreamElement> {
    private static final Logger log = LoggerFactory.getLogger(BeamBatchLogReader.class);
    private static final Instant LOWEST_INSTANT = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private static final Instant HIGHEST_INSTANT = BoundedWindow.TIMESTAMP_MAX_VALUE;
    private final DirectBatchSource source;
    private final BatchLogObservable reader;
    private final List<AttributeDescriptor<?>> attrs;
    private final Partition split;
    private final long startStamp;
    private final long endStamp;
    private StreamElement current;
    private BlockingQueueLogObserver observer;
    private boolean finished = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BeamBatchLogReader of(DirectBatchSource directBatchSource, BatchLogObservable batchLogObservable, List<AttributeDescriptor<?>> list, Partition partition, long j, long j2) {
        return new BeamBatchLogReader(directBatchSource, batchLogObservable, list, partition, j, j2);
    }

    private BeamBatchLogReader(DirectBatchSource directBatchSource, BatchLogObservable batchLogObservable, List<AttributeDescriptor<?>> list, Partition partition, long j, long j2) {
        this.source = (DirectBatchSource) Objects.requireNonNull(directBatchSource);
        this.reader = (BatchLogObservable) Objects.requireNonNull(batchLogObservable);
        this.attrs = (List) Objects.requireNonNull(list);
        this.split = (Partition) Objects.requireNonNull(partition);
        this.startStamp = j;
        this.endStamp = j2;
    }

    /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
    public BoundedSource<StreamElement> m46getCurrentSource() {
        return this.source;
    }

    public boolean start() throws IOException {
        this.observer = BlockingQueueLogObserver.create("Source(" + this.split + ")", LOWEST_INSTANT.getMillis());
        this.reader.observe(Arrays.asList(this.split), this.attrs, this.observer);
        return advance();
    }

    public boolean advance() throws IOException {
        while (true) {
            try {
                this.current = this.observer.takeBlocking();
                if (this.current == null || (this.current.getStamp() >= this.startStamp && this.current.getStamp() < this.endStamp)) {
                    break;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                close();
                return false;
            }
        }
        if (this.observer.getError() != null) {
            throw new IOException(this.observer.getError());
        }
        if (this.current != null) {
            return true;
        }
        this.finished = true;
        return false;
    }

    /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
    public StreamElement m47getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    public void close() throws IOException {
        log.debug("Closing partition {}", this.split);
        this.observer.stop();
    }

    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return !this.finished ? LOWEST_INSTANT : HIGHEST_INSTANT;
    }
}
