package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.EndOfSegmentException;
import io.pravega.client.segment.impl.EventSegmentReader;
import io.pravega.client.segment.impl.NoSuchEventException;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInfo;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentTruncatedException;
import io.pravega.client.stream.EventPointer;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderNotInReaderGroupException;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.Sequence;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TimeWindow;
import io.pravega.client.stream.TruncatedDataException;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.common.Exceptions;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.CopyOnWriteHashMap;
import io.pravega.shared.security.auth.AccessOperation;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/EventStreamReaderImpl.class */
public class EventStreamReaderImpl<Type> implements EventStreamReader<Type> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(EventStreamReaderImpl.class);
    private static final long BASE_READER_WAITING_TIME_MS = ReaderGroupStateManager.TIME_UNIT.toMillis();
    private static final int MAX_BUFFERED_SEGMENT_OFFSET_UPDATES = 1000;
    private final Serializer<Type> deserializer;
    private final SegmentInputStreamFactory inputStreamFactory;
    private final SegmentMetadataClientFactory metadataClientFactory;
    private final Orderer orderer;
    private final ReaderConfig config;
    private final ImmutableMap<Stream, WatermarkReaderImpl> waterMarkReaders;

    @GuardedBy("readers")
    private Sequence lastRead;

    @GuardedBy("readers")
    private String atCheckpoint;
    private final ReaderGroupStateManager groupState;
    private final Supplier<Long> clock;
    private final Controller controller;

    @GuardedBy("readers")
    private final List<EventSegmentReader> readers = new ArrayList();

    @GuardedBy("readers")
    private final Map<Segment, Long> sealedSegments = new HashMap();
    private CopyOnWriteHashMap<Segment, SegmentWithRange.Range> ranges = new CopyOnWriteHashMap<>();
    private Map<Segment, Long> ownedSegments = new HashMap();
    private List<Map.Entry<Segment, Long>> segmentOffsetUpdates = newImmutableSegmentOffsetUpdatesList();

    @GuardedBy("readers")
    private int segmentOffsetUpdatesIndex = 0;

    @GuardedBy("readers")
    private boolean closed = false;
    private final Semaphore segmentsWithData = new Semaphore(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventStreamReaderImpl(SegmentInputStreamFactory segmentInputStreamFactory, SegmentMetadataClientFactory segmentMetadataClientFactory, Serializer<Type> serializer, ReaderGroupStateManager readerGroupStateManager, Orderer orderer, Supplier<Long> supplier, ReaderConfig readerConfig, ImmutableMap<Stream, WatermarkReaderImpl> immutableMap, Controller controller) {
        this.deserializer = serializer;
        this.inputStreamFactory = segmentInputStreamFactory;
        this.metadataClientFactory = segmentMetadataClientFactory;
        this.groupState = readerGroupStateManager;
        this.orderer = orderer;
        this.clock = supplier;
        this.config = readerConfig;
        this.waterMarkReaders = immutableMap;
        this.controller = controller;
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public EventRead<Type> readNextEvent(long j) throws ReinitializationRequiredException, TruncatedDataException {
        EventRead<Type> readNextEventInternal;
        synchronized (this.readers) {
            Preconditions.checkState(!this.closed, "Reader is closed");
            try {
                readNextEventInternal = readNextEventInternal(j);
            } catch (ReaderNotInReaderGroupException e) {
                close();
                throw new ReinitializationRequiredException(e);
            }
        }
        return readNextEventInternal;
    }

    /* JADX WARN: Finally extract failed */
    private EventRead<Type> readNextEventInternal(long j) throws ReaderNotInReaderGroupException, TruncatedDataException {
        long min = Math.min(j, BASE_READER_WAITING_TIME_MS);
        Timer timer = new Timer();
        Segment segment = null;
        long j2 = -1;
        ByteBuffer byteBuffer = null;
        do {
            String updateGroupStateIfNeeded = updateGroupStateIfNeeded();
            if (updateGroupStateIfNeeded == null) {
                EventSegmentReader nextSegment = this.orderer.nextSegment(this.readers);
                if (nextSegment == null) {
                    blockFor(min);
                    this.segmentsWithData.drainPermits();
                    byteBuffer = null;
                } else {
                    segment = nextSegment.getSegmentId();
                    j2 = nextSegment.getOffset();
                    try {
                        try {
                            byteBuffer = nextSegment.read(min);
                            if (byteBuffer == null) {
                                refreshAndGetPosition();
                            }
                        } catch (EndOfSegmentException e) {
                            handleEndOfSegment(nextSegment, e.getErrorType().equals(EndOfSegmentException.ErrorType.END_OF_SEGMENT_REACHED));
                            byteBuffer = null;
                            if (0 == 0) {
                                refreshAndGetPosition();
                            }
                        } catch (SegmentTruncatedException e2) {
                            handleSegmentTruncated(nextSegment);
                            byteBuffer = null;
                            if (0 == 0) {
                                refreshAndGetPosition();
                            }
                        }
                    } catch (Throwable th) {
                        if (byteBuffer == null) {
                            refreshAndGetPosition();
                        }
                        throw th;
                    }
                }
                if (byteBuffer != null) {
                    break;
                }
            } else {
                return createEmptyEvent(updateGroupStateIfNeeded);
            }
        } while (timer.getElapsedMillis() < j);
        if (byteBuffer == null) {
            log.debug("Empty event returned for reader {} ", this.groupState.getReaderId());
            return createEmptyEvent(null);
        }
        this.lastRead = Sequence.create(segment.getSegmentId(), j2);
        int remaining = byteBuffer.remaining() + 8;
        addSegmentOffsetUpdateIfNeeded(segment, j2 + remaining);
        return new EventReadImpl(this.deserializer.deserialize(byteBuffer), getCurrentPosition(), new EventPointerImpl(segment, j2, remaining), null);
    }

    private void addSegmentOffsetUpdateIfNeeded(Segment segment, long j) {
        if (this.segmentOffsetUpdatesIndex >= MAX_BUFFERED_SEGMENT_OFFSET_UPDATES) {
            refreshAndGetPosition();
        } else {
            this.segmentOffsetUpdates.set(this.segmentOffsetUpdatesIndex, new AbstractMap.SimpleEntry(segment, Long.valueOf(j)));
            this.segmentOffsetUpdatesIndex++;
        }
    }

    private void blockFor(long j) {
        Exceptions.handleInterrupted(() -> {
            this.segmentsWithData.tryAcquire(j, TimeUnit.MILLISECONDS);
        });
    }

    private EventRead<Type> createEmptyEvent(String str) {
        return new EventReadImpl(null, refreshAndGetPosition(), null, str);
    }

    private PositionInternal refreshAndGetPosition() {
        this.segmentOffsetUpdates = newImmutableSegmentOffsetUpdatesList();
        this.segmentOffsetUpdatesIndex = 0;
        this.ownedSegments = new HashMap(this.sealedSegments);
        for (EventSegmentReader eventSegmentReader : this.readers) {
            this.ownedSegments.put(eventSegmentReader.getSegmentId(), Long.valueOf(eventSegmentReader.getOffset()));
        }
        return getCurrentPosition();
    }

    private PositionInternal getCurrentPosition() {
        return PositionImpl.builder().ownedSegments(this.ownedSegments).segmentRanges(this.ranges.getInnerMap()).updatesToSegmentOffsets(this.segmentOffsetUpdates.subList(0, this.segmentOffsetUpdatesIndex)).m59build();
    }

    private List<Map.Entry<Segment, Long>> newImmutableSegmentOffsetUpdatesList() {
        return Arrays.asList(new Map.Entry[MAX_BUFFERED_SEGMENT_OFFSET_UPDATES]);
    }

    @GuardedBy("readers")
    private String updateGroupStateIfNeeded() throws ReaderNotInReaderGroupException {
        this.groupState.updateConfigIfNeeded();
        PositionInternal positionInternal = null;
        if (this.atCheckpoint != null) {
            positionInternal = refreshAndGetPosition();
            this.groupState.checkpoint(this.atCheckpoint, positionInternal);
            log.info("Reader {} completed checkpoint {}", this.groupState.getReaderId(), this.atCheckpoint);
            releaseSegmentsIfNeeded(positionInternal);
            this.groupState.updateTruncationStreamCutIfNeeded();
        }
        String checkpoint = this.groupState.getCheckpoint();
        while (true) {
            String str = checkpoint;
            if (str == null) {
                this.atCheckpoint = null;
                if (positionInternal == null && this.lastRead != null && !this.groupState.canAcquireSegmentIfNeeded() && !this.groupState.canUpdateLagIfNeeded()) {
                    return null;
                }
                PositionInternal refreshAndGetPosition = positionInternal == null ? refreshAndGetPosition() : positionInternal;
                if (!acquireSegmentsIfNeeded(refreshAndGetPosition) && !this.groupState.updateLagIfNeeded(getLag(), refreshAndGetPosition)) {
                    return null;
                }
                this.waterMarkReaders.forEach((stream, watermarkReaderImpl) -> {
                    watermarkReaderImpl.advanceTo(this.groupState.getLastReadpositions(stream));
                });
                refreshAndGetPosition();
                return null;
            }
            log.info("{} at checkpoint {}", this, str);
            if (!this.groupState.isCheckpointSilent(str)) {
                this.atCheckpoint = str;
                return this.atCheckpoint;
            }
            positionInternal = refreshAndGetPosition();
            this.groupState.checkpoint(str, positionInternal);
            if (this.atCheckpoint != null) {
                releaseSegmentsIfNeeded(positionInternal);
                this.atCheckpoint = null;
            }
            checkpoint = this.groupState.getCheckpoint();
        }
    }

    @GuardedBy("readers")
    private void releaseSegmentsIfNeeded(PositionInternal positionInternal) throws ReaderNotInReaderGroupException {
        releaseSealedSegments();
        Segment findSegmentToReleaseIfRequired = this.groupState.findSegmentToReleaseIfRequired();
        if (findSegmentToReleaseIfRequired != null) {
            log.info("{} releasing segment {}", this, findSegmentToReleaseIfRequired);
            EventSegmentReader orElse = this.readers.stream().filter(eventSegmentReader -> {
                return eventSegmentReader.getSegmentId().equals(findSegmentToReleaseIfRequired);
            }).findAny().orElse(null);
            if (orElse == null || !this.groupState.releaseSegment(findSegmentToReleaseIfRequired, orElse.getOffset(), getLag(), positionInternal)) {
                return;
            }
            this.readers.remove(orElse);
            this.ranges.remove(orElse.getSegmentId());
            orElse.close();
        }
    }

    private void releaseSealedSegments() throws ReaderNotInReaderGroupException {
        Iterator<Map.Entry<Segment, Long>> it = this.sealedSegments.entrySet().iterator();
        while (it.hasNext()) {
            Segment key = it.next().getKey();
            log.info("{} releasing sealed segment {}", this, key);
            if (!this.groupState.handleEndOfSegment(new SegmentWithRange(key, (SegmentWithRange.Range) this.ranges.get(key)))) {
                return;
            }
            this.ranges.remove(key);
            it.remove();
        }
    }

    @GuardedBy("readers")
    private boolean acquireSegmentsIfNeeded(PositionInternal positionInternal) throws ReaderNotInReaderGroupException {
        Map<SegmentWithRange, Long> acquireNewSegmentsIfNeeded = this.groupState.acquireNewSegmentsIfNeeded(getLag(), positionInternal);
        if (acquireNewSegmentsIfNeeded.isEmpty()) {
            return false;
        }
        log.info("{} acquiring segments {}", this, acquireNewSegmentsIfNeeded);
        for (Map.Entry<SegmentWithRange, Long> entry : acquireNewSegmentsIfNeeded.entrySet()) {
            long endOffsetForSegment = this.groupState.getEndOffsetForSegment(entry.getKey().getSegment());
            if (entry.getValue().longValue() < 0 || (entry.getValue().longValue() == endOffsetForSegment && endOffsetForSegment != Long.MAX_VALUE)) {
                this.sealedSegments.put(entry.getKey().getSegment(), entry.getValue());
                this.ranges.put(entry.getKey().getSegment(), entry.getKey().getRange());
            } else {
                Segment segment = entry.getKey().getSegment();
                EventSegmentReader createEventReaderForSegment = this.inputStreamFactory.createEventReaderForSegment(segment, this.config.getBufferSize(), this.segmentsWithData, endOffsetForSegment);
                createEventReaderForSegment.setOffset(entry.getValue().longValue());
                this.readers.add(createEventReaderForSegment);
                this.ranges.put(segment, entry.getKey().getRange());
            }
        }
        this.segmentsWithData.release();
        return true;
    }

    private long getLag() {
        if (this.lastRead == null) {
            return 0L;
        }
        return this.clock.get().longValue() - this.lastRead.getHighOrder();
    }

    @GuardedBy("readers")
    private void handleEndOfSegment(EventSegmentReader eventSegmentReader, boolean z) {
        Segment segmentId = eventSegmentReader.getSegmentId();
        log.info("{} encountered end of segment {} ", this, eventSegmentReader.getSegmentId());
        this.readers.remove(eventSegmentReader);
        eventSegmentReader.close();
        this.sealedSegments.put(segmentId, Long.valueOf(z ? -1L : eventSegmentReader.getOffset()));
    }

    private void handleSegmentTruncated(EventSegmentReader eventSegmentReader) throws TruncatedDataException {
        Segment segmentId = eventSegmentReader.getSegmentId();
        log.info("{} encountered truncation for segment {} ", this, segmentId);
        SegmentMetadataClient createSegmentMetadataClient = this.metadataClientFactory.createSegmentMetadataClient(segmentId, DelegationTokenProviderFactory.create(this.controller, segmentId, AccessOperation.READ));
        try {
            try {
                long startingOffset = ((SegmentInfo) Futures.getThrowingException(createSegmentMetadataClient.getSegmentInfo())).getStartingOffset();
                if (eventSegmentReader.getOffset() == startingOffset) {
                    log.warn("Attempt to fetch the next available read offset on the segment {} returned a truncated offset {}", segmentId, Long.valueOf(startingOffset));
                }
                eventSegmentReader.setOffset(startingOffset);
            } catch (NoSuchSegmentException e) {
                handleEndOfSegment(eventSegmentReader, true);
            }
            throw new TruncatedDataException();
        } catch (Throwable th) {
            if (Collections.singletonList(createSegmentMetadataClient).get(0) != null) {
                createSegmentMetadataClient.close();
            }
            throw th;
        }
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public ReaderConfig getConfig() {
        return this.config;
    }

    @Override // io.pravega.client.stream.EventStreamReader, java.lang.AutoCloseable
    public void close() {
        closeAt(refreshAndGetPosition());
        UnmodifiableIterator it = this.waterMarkReaders.values().iterator();
        while (it.hasNext()) {
            ((WatermarkReaderImpl) it.next()).close();
        }
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public void closeAt(Position position) {
        synchronized (this.readers) {
            if (!this.closed) {
                log.info("Closing reader {} at position {}.", this, position);
                this.closed = true;
                this.groupState.readerShutdown(position);
                Iterator<EventSegmentReader> it = this.readers.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                this.readers.clear();
                this.ranges = new CopyOnWriteHashMap<>();
                this.ownedSegments = new HashMap();
                this.segmentOffsetUpdates = newImmutableSegmentOffsetUpdatesList();
                this.segmentOffsetUpdatesIndex = 0;
                this.groupState.close();
            }
        }
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public Type fetchEvent(EventPointer eventPointer) throws NoSuchEventException {
        Preconditions.checkNotNull(eventPointer);
        EventSegmentReader createEventReaderForSegment = this.inputStreamFactory.createEventReaderForSegment(eventPointer.asImpl().getSegment(), eventPointer.asImpl().getEventLength());
        try {
            createEventReaderForSegment.setOffset(eventPointer.asImpl().getEventStartOffset());
            try {
                try {
                    Type deserialize = this.deserializer.deserialize(createEventReaderForSegment.read());
                    if (Collections.singletonList(createEventReaderForSegment).get(0) != null) {
                        createEventReaderForSegment.close();
                    }
                    return deserialize;
                } catch (NoSuchSegmentException | SegmentTruncatedException e) {
                    throw new NoSuchEventException("Event no longer exists.");
                }
            } catch (EndOfSegmentException e2) {
                throw new NoSuchEventException(e2.getMessage());
            }
        } catch (Throwable th) {
            if (Collections.singletonList(createEventReaderForSegment).get(0) != null) {
                createEventReaderForSegment.close();
            }
            throw th;
        }
    }

    @VisibleForTesting
    List<EventSegmentReader> getReaders() {
        ImmutableList copyOf;
        synchronized (this.readers) {
            copyOf = ImmutableList.copyOf(this.readers);
        }
        return copyOf;
    }

    @VisibleForTesting
    Map<Segment, SegmentWithRange.Range> getRanges() {
        ImmutableMap copyOf;
        synchronized (this.readers) {
            copyOf = ImmutableMap.copyOf(this.ranges.getInnerMap());
        }
        return copyOf;
    }

    public String toString() {
        return "EventStreamReaderImpl( id=" + this.groupState.getReaderId() + ")";
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public TimeWindow getCurrentTimeWindow(Stream stream) {
        if (getConfig().isDisableTimeWindows()) {
            return new TimeWindow(null, null);
        }
        WatermarkReaderImpl watermarkReaderImpl = (WatermarkReaderImpl) this.waterMarkReaders.get(stream);
        if (watermarkReaderImpl == null) {
            throw new IllegalArgumentException("Reader is not subscribed to stream: " + stream);
        }
        return watermarkReaderImpl.getTimeWindow();
    }
}
