package io.pravega.client.segment.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.ExceptionHelpers;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.common.util.CircularBuffer;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.protocol.netty.InvalidMessageException;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/segment/impl/SegmentInputStreamImpl.class */
/**
 * Reads length-prefixed events from a single segment.
 *
 * <p>Bytes are fetched through an {@link AsyncSegmentInputStream} in chunks of at most
 * {@code readLength} bytes and staged in a {@link CircularBuffer}. At most one fetch is tracked at a
 * time in {@code outstandingRequest}. All mutable state is guarded by the private {@code $lock}
 * monitor; private helpers expect the caller to already hold it.
 */
class SegmentInputStreamImpl implements SegmentInputStream {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentInputStreamImpl.class);

    /** Upper bound, in bytes, on a single read requested from the async input. */
    private static final int DEFAULT_READ_LENGTH = 64 * 1024;

    /** Buffer size used by the two-argument constructor. */
    static final int DEFAULT_BUFFER_SIZE = 2 * DEFAULT_READ_LENGTH;

    /** Size of an event header: one int for the type code followed by one int for the length. */
    private static final int EVENT_HEADER_SIZE = 2 * Integer.BYTES;

    /** Largest event payload length accepted by {@link #readEventData(long)}. */
    private static final int MAX_EVENT_LENGTH = 0x7FFFFF;

    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock = new Object[0];

    private final AsyncSegmentInputStream asyncInput;
    private final int readLength;

    @GuardedBy("$lock")
    private final CircularBuffer buffer;

    @GuardedBy("$lock")
    private final ByteBuffer headerReadingBuffer = ByteBuffer.allocate(EVENT_HEADER_SIZE);

    /** Segment offset of the next byte that will be handed to a caller. */
    @GuardedBy("$lock")
    private long offset;

    @GuardedBy("$lock")
    private boolean receivedEndOfSegment = false;

    /** The fetch currently in flight (or completed but not yet fully consumed); may be null. */
    @GuardedBy("$lock")
    private CompletableFuture<WireCommands.SegmentRead> outstandingRequest = null;

    /**
     * Creates a stream with a buffer of {@link #DEFAULT_BUFFER_SIZE} bytes.
     *
     * @param asyncSegmentInputStream source of segment data; must not be null
     * @param j offset in the segment at which reading starts; must be non-negative
     */
    SegmentInputStreamImpl(AsyncSegmentInputStream asyncSegmentInputStream, long j) {
        this(asyncSegmentInputStream, j, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a stream and immediately issues the first read if the buffer has room for it.
     *
     * @param asyncSegmentInputStream source of segment data; must not be null
     * @param j offset in the segment at which reading starts; must be non-negative
     * @param i requested buffer size in bytes; the read length is capped at this value and the
     *          buffer is made at least one byte larger than the read length
     */
    public SegmentInputStreamImpl(AsyncSegmentInputStream asyncSegmentInputStream, long j, int i) {
        Preconditions.checkArgument(j >= 0);
        Preconditions.checkNotNull(asyncSegmentInputStream);
        this.asyncInput = asyncSegmentInputStream;
        this.offset = j;
        this.readLength = Math.min(DEFAULT_READ_LENGTH, i);
        this.buffer = new CircularBuffer(Math.max(i, this.readLength + 1));
        issueRequestIfNeeded();
    }

    /**
     * Repositions the stream. If the offset actually changes, buffered data, the end-of-segment
     * flag and the reference to any outstanding request are discarded.
     *
     * @param j new offset; must be non-negative
     */
    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public void setOffset(long j) {
        synchronized (this.$lock) {
            log.trace("SetOffset {}", j);
            Preconditions.checkArgument(j >= 0);
            Exceptions.checkNotClosed(this.asyncInput.isClosed(), this);
            if (j == this.offset) {
                return;
            }
            this.offset = j;
            this.buffer.clear();
            this.receivedEndOfSegment = false;
            this.outstandingRequest = null;
        }
    }

    /** Returns the segment offset of the next byte that will be handed to a caller. */
    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public long getOffset() {
        synchronized (this.$lock) {
            return this.offset;
        }
    }

    /**
     * Reads the next event.
     *
     * <p>If reading fails with any exception, the offset is restored to its value on entry, the
     * buffer is cleared and the outstanding request is dropped, so a later call starts over from
     * the same position.
     *
     * @param j timeout passed to the wait for the outstanding request while the event header is
     *          incomplete (presumably milliseconds; confirm against {@code FutureHelpers})
     * @return the event payload, or {@code null} if waiting for the header yielded no result
     * @throws EndOfSegmentException if the end of the segment was received and no data remains
     */
    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public ByteBuffer read(long j) throws EndOfSegmentException {
        synchronized (this.$lock) {
            log.trace("Read called at offset {}", this.offset);
            Exceptions.checkNotClosed(this.asyncInput.isClosed(), this);
            final long startingOffset = this.offset;
            boolean completed = false;
            try {
                ByteBuffer event = readEventData(j);
                completed = true;
                return event;
            } finally {
                if (!completed) {
                    // Roll back so that a partially consumed event is not lost.
                    this.outstandingRequest = null;
                    this.offset = startingOffset;
                    this.buffer.clear();
                }
            }
        }
    }

    /**
     * Reads one event (header, then payload) out of the buffer, fetching more data as required.
     * Must be called with {@code $lock} held.
     */
    private ByteBuffer readEventData(long j) throws EndOfSegmentException {
        fillBuffer();
        while (this.buffer.dataAvailable() < EVENT_HEADER_SIZE) {
            if (this.buffer.dataAvailable() == 0 && this.receivedEndOfSegment) {
                throw new EndOfSegmentException();
            }
            // On failure the handler gives issueRequestIfNeeded() a chance to replace the request.
            WireCommands.SegmentRead response = FutureHelpers.getAndHandleExceptions(this.outstandingRequest, th -> {
                issueRequestIfNeeded();
            }, j);
            if (response == null) {
                return null;
            }
            handleRequest();
        }

        this.headerReadingBuffer.clear();
        this.offset += this.buffer.read(this.headerReadingBuffer);
        this.headerReadingBuffer.flip();
        int type = this.headerReadingBuffer.getInt();
        int length = this.headerReadingBuffer.getInt();
        if (type != WireCommandType.EVENT.getCode()) {
            throw new InvalidMessageException("Event was of wrong type: " + type);
        }
        if (length < 0 || length > MAX_EVENT_LENGTH) {
            throw new InvalidMessageException("Event of invalid length: " + length);
        }

        ByteBuffer payload = ByteBuffer.allocate(length);
        this.offset += this.buffer.read(payload);
        while (payload.hasRemaining()) {
            // An earlier handleRequest() may have consumed the last response at a moment when the
            // buffer had too little free capacity for issueRequestIfNeeded() to start another
            // read, leaving outstandingRequest null (or it may hold a failed request). Bytes have
            // since been moved out of the buffer, so ask again before joining.
            // NOTE(review): if the end of the segment was received part-way through an event,
            // no request is issued and handleRequest() will still fail on a null request.
            issueRequestIfNeeded();
            handleRequest();
            this.offset += this.buffer.read(payload);
        }
        payload.flip();
        return payload;
    }

    /** True when a successfully completed request exists and the buffer has room to take data. */
    private boolean dataWaitingToGoInBuffer() {
        if (this.outstandingRequest == null) {
            return false;
        }
        return FutureHelpers.isSuccessful(this.outstandingRequest) && this.buffer.capacityAvailable() > 0;
    }

    /**
     * Waits for the outstanding request and moves as much of its data as fits into the buffer.
     * Once the response is fully consumed the request is cleared and a new one may be issued.
     * Must be called with {@code $lock} held and a non-null {@code outstandingRequest}.
     */
    private void handleRequest() {
        WireCommands.SegmentRead response = this.outstandingRequest.join();
        verifyIsAtCorrectOffset(response);
        ByteBuffer data = response.getData();
        if (data.hasRemaining()) {
            this.buffer.fill(data);
        }
        if (response.isEndOfSegment()) {
            this.receivedEndOfSegment = true;
        }
        if (!data.hasRemaining()) {
            this.outstandingRequest = null;
            issueRequestIfNeeded();
        }
    }

    /** Checks that the unread part of the response starts exactly where the buffered data ends. */
    private void verifyIsAtCorrectOffset(WireCommands.SegmentRead segmentRead) {
        long responseOffset = segmentRead.getOffset() + segmentRead.getData().position();
        long expectedOffset = this.offset + this.buffer.dataAvailable();
        Preconditions.checkState(responseOffset == expectedOffset, "ReadSegment returned data for the wrong offset %s vs %s", responseOffset, expectedOffset);
    }

    /**
     * Starts a read when none is tracked, or replaces one that failed with a retryable exception.
     * Does nothing once the end of the segment has been received or while the buffer has no more
     * than {@code readLength} bytes of free capacity.
     * Must be called with {@code $lock} held (or from the constructor).
     */
    private void issueRequestIfNeeded() {
        if (this.receivedEndOfSegment || this.buffer.capacityAvailable() <= this.readLength) {
            return;
        }
        if (this.outstandingRequest == null) {
            this.outstandingRequest = requestNextRead();
        } else if (this.outstandingRequest.isCompletedExceptionally()) {
            Throwable failure = FutureHelpers.getException(this.outstandingRequest);
            Throwable cause = ExceptionHelpers.getRealException(failure);
            boolean retryable = !(cause instanceof Error
                    || cause instanceof InterruptedException
                    || cause instanceof CancellationException);
            if (retryable) {
                log.warn("Encountered an exception while reading for " + this.asyncInput.getSegmentId(), failure);
                this.outstandingRequest = requestNextRead();
            }
        }
    }

    /** Asks the async input for up to {@code readLength} bytes following the buffered data. */
    private CompletableFuture<WireCommands.SegmentRead> requestNextRead() {
        return this.asyncInput.read(this.offset + this.buffer.dataAvailable(), this.readLength);
    }

    /** Closes the underlying async input. */
    @Override // io.pravega.client.segment.impl.SegmentInputStream, java.lang.AutoCloseable
    public void close() {
        synchronized (this.$lock) {
            log.trace("Closing {}", this);
            this.asyncInput.close();
        }
    }

    /**
     * Issues a read if one is needed and moves any already-received data into the buffer.
     * This method does not itself wait on a request that has not completed.
     */
    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public void fillBuffer() {
        synchronized (this.$lock) {
            log.trace("Filling buffer {}", this);
            Exceptions.checkNotClosed(this.asyncInput.isClosed(), this);
            issueRequestIfNeeded();
            while (dataWaitingToGoInBuffer()) {
                handleRequest();
            }
        }
    }

    /**
     * Reports whether data is already buffered or a successfully completed request still holds
     * unconsumed data.
     */
    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public boolean canReadWithoutBlocking() {
        synchronized (this.$lock) {
            boolean ready = this.buffer.dataAvailable() > 0;
            if (!ready && this.outstandingRequest != null && FutureHelpers.isSuccessful(this.outstandingRequest)) {
                ready = this.outstandingRequest.join().getData().hasRemaining();
            }
            log.trace("canReadWithoutBlocking {}", ready);
            return ready;
        }
    }

    /** Returns the segment identifier reported by the underlying async input. */
    @Override // io.pravega.client.segment.impl.SegmentInputStream
    public Segment getSegmentId() {
        return this.asyncInput.getSegmentId();
    }

    /**
     * Renders the stream's fields for diagnostics. Only the offset is read under the lock; the
     * other fields are read as-is.
     */
    @SuppressFBWarnings(justification = "generated code")
    public String toString() {
        return "SegmentInputStreamImpl(asyncInput=" + this.asyncInput
                + ", readLength=" + this.readLength
                + ", buffer=" + this.buffer
                + ", headerReadingBuffer=" + this.headerReadingBuffer
                + ", offset=" + getOffset()
                + ", receivedEndOfSegment=" + this.receivedEndOfSegment
                + ", outstandingRequest=" + this.outstandingRequest + ")";
    }
}
