/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.direct.io.BlockingQueueLogObserver;
import cz.o2.proxima.beam.direct.io.DirectBatchSource;
import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.StreamElement;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BeamBatchLogReader
extends BoundedSource.BoundedReader<StreamElement> {
    private static final Logger log = LoggerFactory.getLogger(BeamBatchLogReader.class);
    private static final Instant LOWEST_INSTANT = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private static final Instant HIGHEST_INSTANT = BoundedWindow.TIMESTAMP_MAX_VALUE;
    private final DirectBatchSource source;
    private final BatchLogObservable reader;
    private final List<AttributeDescriptor<?>> attrs;
    private final Partition split;
    private final long startStamp;
    private final long endStamp;
    private StreamElement current;
    private BlockingQueueLogObserver observer;
    private boolean finished = false;

    static BeamBatchLogReader of(DirectBatchSource source, BatchLogObservable reader, List<AttributeDescriptor<?>> attrs, Partition split, long startStamp, long endStamp) {
        return new BeamBatchLogReader(source, reader, attrs, split, startStamp, endStamp);
    }

    private BeamBatchLogReader(DirectBatchSource source, BatchLogObservable reader, List<AttributeDescriptor<?>> attrs, Partition split, long startStamp, long endStamp) {
        this.source = Objects.requireNonNull(source);
        this.reader = Objects.requireNonNull(reader);
        this.attrs = Objects.requireNonNull(attrs);
        this.split = Objects.requireNonNull(split);
        this.startStamp = startStamp;
        this.endStamp = endStamp;
    }

    public BoundedSource<StreamElement> getCurrentSource() {
        return this.source;
    }

    public boolean start() throws IOException {
        this.observer = BlockingQueueLogObserver.create("Source(" + this.split + ")", LOWEST_INSTANT.getMillis());
        this.reader.observe(Arrays.asList(this.split), this.attrs, (BatchLogObserver)this.observer);
        return this.advance();
    }

    public boolean advance() throws IOException {
        try {
            do {
                this.current = this.observer.takeBlocking();
            } while (this.current != null && (this.current.getStamp() < this.startStamp || this.current.getStamp() >= this.endStamp));
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            this.close();
            return false;
        }
        if (this.observer.getError() != null) {
            throw new IOException(this.observer.getError());
        }
        if (this.current != null) {
            return true;
        }
        this.finished = true;
        return false;
    }

    public StreamElement getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    public void close() throws IOException {
        log.debug("Closing partition {}", (Object)this.split);
        this.observer.stop();
    }

    public Instant getCurrentTimestamp() throws NoSuchElementException {
        if (!this.finished) {
            return LOWEST_INSTANT;
        }
        return HIGHEST_INSTANT;
    }
}

