package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderNotInReaderGroupException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.HashHelper;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/ReaderGroupStateManager.class */
public class ReaderGroupStateManager {

    // Class logger; assigned in the static initializer at the bottom of the class.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Base scheduling unit for the timers below; set to 1000 ms in the static initializer.
    static final Duration TIME_UNIT;
    // Back-off applied to the acquire/release timers once a decision to acquire or release
    // has been taken; set to 30000 ms in the static initializer.
    static final Duration UPDATE_WINDOW;
    // A compaction attempt only proceeds when Math.random() falls below this value (see compactIfNeeded).
    private static final double COMPACTION_PROBABILITY = 0.05d;
    // Number of bytes that must have been written since the last compaction before another is considered.
    private static final int MIN_BYTES_BETWEEN_COMPACTIONS = 524288;
    // Monitor held while deciding whether to acquire or release a segment
    // (shouldAcquireSegment / findSegmentToReleaseIfRequired).
    private final Object decisionLock = new Object();
    // Hash helper seeded with the reader id; used to pick which owned segment to release.
    private final HashHelper hashHelper;
    // Identifier of the reader this manager acts for.
    private final String readerId;
    // Synchronizer through which the shared ReaderGroupState is read and updated.
    private final StateSynchronizer<ReaderGroupState> sync;
    // Controller client; used for segment successors and delegation tokens.
    private final Controller controller;
    // While this timer has time remaining, no segment release is proposed.
    private final TimeoutTimer releaseTimer;
    // While this timer has time remaining, no segment acquisition is attempted.
    private final TimeoutTimer acquireTimer;
    // While this timer has time remaining, fetchUpdatesIfNeeded skips fetching the group state.
    private final TimeoutTimer fetchStateTimer;
    // While this timer has time remaining, getCheckpoint does not create an automatic checkpoint.
    private final TimeoutTimer checkpointTimer;
    // While this timer has time remaining, updateLagIfNeeded does not publish the reader's lag.
    private final TimeoutTimer lagUpdateTimer;
    // Decompiler-exposed assertion switch (marked synthetic); true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReaderGroupStateManager(String str, StateSynchronizer<ReaderGroupState> stateSynchronizer, Controller controller, Supplier<Long> supplier) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(stateSynchronizer);
        Preconditions.checkNotNull(controller);
        this.readerId = str;
        this.hashHelper = HashHelper.seededWith(str);
        this.sync = stateSynchronizer;
        this.controller = controller;
        supplier = supplier == null ? System::nanoTime : supplier;
        this.releaseTimer = new TimeoutTimer(TIME_UNIT, supplier);
        this.acquireTimer = new TimeoutTimer(Duration.ZERO, supplier);
        this.fetchStateTimer = new TimeoutTimer(Duration.ZERO, supplier);
        this.checkpointTimer = new TimeoutTimer(TIME_UNIT, supplier);
        this.lagUpdateTimer = new TimeoutTimer(TIME_UNIT, supplier);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers this reader in the reader group state and delays its first segment acquisition.
     *
     * <p>The acquire timer is reset to {@code j} milliseconds plus a random extra delay of up to
     * {@code min(j, groupRefreshTimeMillis)} milliseconds.
     *
     * @param j initial allocation delay in milliseconds.
     * @throws IllegalStateException if the group state already has an entry for this reader.
     */
    public void initializeReader(long j) {
        boolean alreadyRegistered = this.sync.updateState((groupState, updates) -> {
            if (groupState.getSegments(this.readerId) == null) {
                log.debug("Adding reader {} to reader group. CurrentState is: {}", this.readerId, groupState);
                updates.add(new ReaderGroupState.AddReader(this.readerId));
                return false;
            }
            return true;
        });
        if (alreadyRegistered) {
            throw new IllegalStateException("The requested reader: " + this.readerId + " cannot be added to the group because it is already in the group. Perhaps close() was not called?");
        }
        double fraction = Math.random();
        long jitterBoundMillis = Math.min(j, this.sync.getState().getConfig().getGroupRefreshTimeMillis());
        long jitterMillis = (long) (fraction * jitterBoundMillis);
        this.acquireTimer.reset(Duration.ofMillis(j + jitterMillis));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes this reader from the reader group by delegating to the static
     * {@code readerShutdown} with this manager's reader id and synchronizer.
     *
     * @param position last position of the reader; may be null.
     */
    public void readerShutdown(Position position) {
        ReaderGroupStateManager.readerShutdown(readerId, position, sync);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the given reader from the reader group state, if it is present.
     *
     * <p>The latest updates are fetched first. If the state has no segment entry for the reader,
     * nothing is changed.
     *
     * @param str               id of the reader to remove.
     * @param position          last position of the reader; when null an empty offset map is recorded.
     * @param stateSynchronizer synchronizer for the reader group state.
     */
    public static void readerShutdown(String str, Position position, StateSynchronizer<ReaderGroupState> stateSynchronizer) {
        stateSynchronizer.fetchUpdates();
        stateSynchronizer.updateState((groupState, updates) -> {
            if (groupState.getSegments(str) != null) {
                log.debug("Removing reader {} from reader group. CurrentState is: {}. Position is: {}.", str, groupState, position);
                if (position == null) {
                    updates.add(new ReaderGroupState.RemoveReader(str, Collections.emptyMap()));
                } else {
                    updates.add(new ReaderGroupState.RemoveReader(str, position.asImpl().getOwnedSegmentsWithOffsets()));
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the underlying state synchronizer.
     */
    public void close() {
        sync.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records in the reader group state that this reader has finished the given segment.
     *
     * <p>Unless the segment is listed in the group's end segments, its successors are looked up
     * through the controller and passed along with the completion update. The update is only
     * written when the reader is online and has no checkpoint pending. The acquire timer is
     * zeroed before returning.
     *
     * @param segmentWithRange the segment that was read to its end.
     * @return true if the completion was recorded, false if a checkpoint for this reader is pending.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the group state.
     */
    public boolean handleEndOfSegment(SegmentWithRange segmentWithRange) throws ReaderNotInReaderGroupException {
        final Map<SegmentWithRange, List<Long>> successorsToPredecessors;
        if (this.sync.getState().getEndSegments().containsKey(segmentWithRange.getSegment())) {
            successorsToPredecessors = Collections.emptyMap();
        } else {
            StreamSegmentsWithPredecessors successors = (StreamSegmentsWithPredecessors) Futures.getAndHandleExceptions(
                    this.controller.getSuccessors(segmentWithRange.getSegment()), RuntimeException::new);
            successorsToPredecessors = successors.getSegmentToPredecessor();
        }

        final AtomicBoolean readerOffline = new AtomicBoolean(false);
        boolean recorded = this.sync.updateState((groupState, updates) -> {
            if (!groupState.isReaderOnline(this.readerId)) {
                readerOffline.set(true);
                return false;
            }
            log.debug("Marking segment {} as completed in reader group. CurrentState is: {}", segmentWithRange, groupState);
            readerOffline.set(false);
            // Skip the update while a checkpoint is outstanding for this reader.
            if (groupState.getCheckpointForReader(this.readerId) != null) {
                return false;
            }
            updates.add(new ReaderGroupState.SegmentCompleted(this.readerId, segmentWithRange, successorsToPredecessors));
            return true;
        });

        if (readerOffline.get()) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        this.acquireTimer.zero();
        return recorded;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Picks a segment this reader should give up, if releasing is currently appropriate.
     *
     * <p>After refreshing the state if due, a segment is proposed only when the release timer has
     * expired, no checkpoint is pending for this reader, and the reader owns too many segments.
     * When a segment is chosen, both the release and acquire timers are reset to
     * {@code UPDATE_WINDOW}.
     *
     * @return the segment to release, or null if none should be released now.
     */
    public Segment findSegmentToReleaseIfRequired() {
        fetchUpdatesIfNeeded();
        synchronized (this.decisionLock) {
            if (this.releaseTimer.hasRemaining()) {
                return null;
            }
            if (this.sync.getState().getCheckpointForReader(this.readerId) != null) {
                return null;
            }
            if (!doesReaderOwnTooManySegments(this.sync.getState())) {
                return null;
            }
            Segment candidate = findSegmentToRelease();
            if (candidate != null) {
                this.releaseTimer.reset(UPDATE_WINDOW);
                this.acquireTimer.reset(UPDATE_WINDOW);
            }
            return candidate;
        }
    }

    /**
     * Decides whether this reader holds noticeably more than the least-loaded reader.
     *
     * @param readerGroupState state to evaluate.
     * @return false when there are no relative sizes, the reader has no segment entry, or it owns
     *         at most one segment; otherwise true when the reader's relative size exceeds the
     *         smallest relative size plus {@code max(1, numberOfUnassignedSegments)}.
     */
    private boolean doesReaderOwnTooManySegments(ReaderGroupState readerGroupState) {
        Map<String, Double> sizeByReader = readerGroupState.getRelativeSizes();
        Set<Segment> owned = readerGroupState.getSegments(this.readerId);
        if (sizeByReader.isEmpty()) {
            return false;
        }
        if (owned == null || owned.size() <= 1) {
            return false;
        }
        double ownSize = sizeByReader.get(this.readerId);
        double smallestSize = Collections.min(sizeByReader.values());
        int slack = Math.max(1, readerGroupState.getNumberOfUnassignedSegments());
        return ownSize > smallestSize + slack;
    }

    /**
     * Selects, among the segments currently owned by this reader, the one whose scoped name
     * hashes to the largest value under this reader's hash helper.
     *
     * @return the selected segment, or null if the reader owns no segments.
     */
    private Segment findSegmentToRelease() {
        Segment best = null;
        double bestHash = 0.0d;
        for (Segment candidate : this.sync.getState().getSegments(this.readerId)) {
            double candidateHash = this.hashHelper.hashToRange(candidate.getScopedName());
            // On equal hashes the earlier segment is kept.
            if (best == null || Double.compare(bestHash, candidateHash) < 0) {
                best = candidate;
                bestHash = candidateHash;
            }
        }
        return best;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the end offset recorded for a segment in the group's end segments.
     *
     * @param segment the segment to look up.
     * @return the recorded end offset, or {@code Long.MAX_VALUE} when none is recorded.
     */
    public long getEndOffsetForSegment(Segment segment) {
        Long recordedEnd = this.sync.getState().getEndSegments().get(segment);
        if (recordedEnd == null) {
            return Long.MAX_VALUE;
        }
        return recordedEnd.longValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attempts to release a segment owned by this reader back to the group.
     *
     * <p>The release (together with a distance-to-tail update) is only written when the reader
     * still owns the segment, has no checkpoint pending, and still owns too many segments.
     * Afterwards the release, acquire and lag-update timers are reset regardless of the outcome.
     *
     * @param segment  segment to release.
     * @param j        offset passed to the release update.
     * @param j2       value passed to the distance-to-tail update.
     * @param position current position of the reader.
     * @return true if the reader does not own the segment after the update.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the resulting state.
     */
    public boolean releaseSegment(Segment segment, long j, long j2, Position position) throws ReaderNotInReaderGroupException {
        this.sync.updateState((groupState, updates) -> {
            Set<Segment> owned = groupState.getSegments(this.readerId);
            boolean ownsSegment = owned != null && owned.contains(segment);
            if (ownsSegment) {
                boolean noCheckpointPending = groupState.getCheckpointForReader(this.readerId) == null;
                if (noCheckpointPending && doesReaderOwnTooManySegments(groupState)) {
                    updates.add(new ReaderGroupState.ReleaseSegment(this.readerId, segment, j));
                    updates.add(new ReaderGroupState.UpdateDistanceToTail(this.readerId, j2, position.asImpl().getOwnedSegmentRangesWithOffsets()));
                }
            }
        });

        ReaderGroupState current = this.sync.getState();
        this.releaseTimer.reset(calculateReleaseTime(this.readerId, current));
        this.acquireTimer.reset(calculateAcquireTime(this.readerId, current));
        resetLagUpdateTimer();

        if (!current.isReaderOnline(this.readerId)) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        boolean stillOwned = current.getSegments(this.readerId).contains(segment);
        return !stillOwned;
    }

    /**
     * Computes how long a reader waits before it may release again:
     * {@code TIME_UNIT * (1 + ranking of the reader)}.
     *
     * @param str              id of the reader.
     * @param readerGroupState state supplying the reader's ranking.
     * @return the release back-off duration.
     */
    @VisibleForTesting
    static Duration calculateReleaseTime(String str, ReaderGroupState readerGroupState) {
        long units = 1 + readerGroupState.getRanking(str);
        return TIME_UNIT.multipliedBy(units);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Refreshes the group state if due and, when acquisition is appropriate, acquires segments.
     *
     * @param j        value passed to the distance-to-tail update when segments are acquired.
     * @param position current position of the reader.
     * @return the acquired segments with their offsets, or an empty map when nothing was acquired.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the group state.
     */
    public Map<SegmentWithRange, Long> acquireNewSegmentsIfNeeded(long j, Position position) throws ReaderNotInReaderGroupException {
        fetchUpdatesIfNeeded();
        if (shouldAcquireSegment()) {
            return acquireSegment(j, position);
        }
        return Collections.emptyMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Publishes this reader's distance to tail once the lag-update timer has expired.
     *
     * <p>When it fires, the lag timer is reset, the update is written unconditionally, the
     * fetch timer is reset, and the latest state is fetched.
     *
     * @param j        value passed to the distance-to-tail update.
     * @param position current position of the reader.
     * @return true if an update was written, false if the timer had not yet expired.
     */
    public boolean updateLagIfNeeded(long j, Position position) {
        if (!this.lagUpdateTimer.hasRemaining()) {
            log.debug("Update lag for reader {}", this.readerId);
            resetLagUpdateTimer();
            this.sync.updateStateUnconditionally(
                    new ReaderGroupState.UpdateDistanceToTail(this.readerId, j, position.asImpl().getOwnedSegmentRangesWithOffsets()));
            resetFetchUpdateTimer();
            this.sync.fetchUpdates();
            return true;
        }
        return false;
    }

    /**
     * Restarts the fetch timer with the group refresh interval from the current configuration.
     */
    private void resetFetchUpdateTimer() {
        long refreshMillis = this.sync.getState().getConfig().getGroupRefreshTimeMillis();
        this.fetchStateTimer.reset(Duration.ofMillis(refreshMillis));
    }

    /**
     * Restarts the lag-update timer with {@code TIME_UNIT * (number of online readers + 1)}.
     */
    private void resetLagUpdateTimer() {
        int units = this.sync.getState().getOnlineReaders().size() + 1;
        this.lagUpdateTimer.reset(TIME_UNIT.multipliedBy(units));
    }

    /**
     * Fetches the latest group state once the fetch timer has expired, then restarts that timer
     * and gives compaction a chance to run.
     */
    private void fetchUpdatesIfNeeded() {
        if (!this.fetchStateTimer.hasRemaining()) {
            log.debug("Update group state for reader {}", this.readerId);
            this.sync.fetchUpdates();
            resetFetchUpdateTimer();
            compactIfNeeded();
        }
    }

    /**
     * Compacts the reader group state when more than {@code MIN_BYTES_BETWEEN_COMPACTIONS} bytes
     * have been written since the last compaction and a random draw falls below
     * {@code COMPACTION_PROBABILITY}.
     */
    private void compactIfNeeded() {
        // The random draw is only taken once the byte threshold has been exceeded.
        if (this.sync.bytesWrittenSinceCompaction() > MIN_BYTES_BETWEEN_COMPACTIONS && Math.random() < COMPACTION_PROBABILITY) {
            log.debug("Compacting reader group state {}", this.sync.getState());
            this.sync.compact(groupState -> new ReaderGroupState.CompactReaderGroupState(groupState));
        }
    }

    /**
     * Decides, under the decision lock, whether this reader should try to acquire segments now.
     *
     * <p>Returns false while the acquire timer is running, while a checkpoint is pending for this
     * reader, or when there are no unassigned segments. In the last case the acquire timer is
     * restarted if the reader owns too many segments. When acquisition should proceed, both the
     * acquire and release timers are reset to {@code UPDATE_WINDOW}.
     *
     * @return true if the caller should attempt to acquire segments.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the group state.
     */
    private boolean shouldAcquireSegment() throws ReaderNotInReaderGroupException {
        synchronized (this.decisionLock) {
            ReaderGroupState current = this.sync.getState();
            if (!current.isReaderOnline(this.readerId)) {
                throw new ReaderNotInReaderGroupException(this.readerId);
            }
            if (this.acquireTimer.hasRemaining() || current.getCheckpointForReader(this.readerId) != null) {
                return false;
            }
            if (current.getNumberOfUnassignedSegments() != 0) {
                this.acquireTimer.reset(UPDATE_WINDOW);
                this.releaseTimer.reset(UPDATE_WINDOW);
                return true;
            }
            // Nothing is unassigned: only adjust the acquire back-off if this reader is overloaded.
            if (doesReaderOwnTooManySegments(current)) {
                this.acquireTimer.reset(calculateAcquireTime(this.readerId, current));
            }
            return false;
        }
    }

    /**
     * Acquires unassigned segments for this reader through a state update.
     *
     * <p>Nothing is acquired when a checkpoint is pending for this reader or when
     * {@code calculateNumSegmentsToAcquire} yields zero. Otherwise that many entries are taken
     * from the unassigned segments, each recorded with an acquire update, followed by one
     * distance-to-tail update. The release, acquire and lag-update timers are reset afterwards.
     *
     * @param j        value passed to the distance-to-tail update.
     * @param position current position of the reader.
     * @return the acquired segments with their offsets; empty when nothing was acquired.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the group state.
     */
    private Map<SegmentWithRange, Long> acquireSegment(long j, Position position) throws ReaderNotInReaderGroupException {
        final AtomicBoolean readerOffline = new AtomicBoolean();
        Map<SegmentWithRange, Long> acquired = this.sync.updateState((groupState, updates) -> {
            if (!groupState.isReaderOnline(this.readerId)) {
                readerOffline.set(true);
                return Collections.<SegmentWithRange, Long>emptyMap();
            }
            readerOffline.set(false);
            if (groupState.getCheckpointForReader(this.readerId) != null) {
                return Collections.<SegmentWithRange, Long>emptyMap();
            }
            int wanted = calculateNumSegmentsToAcquire(groupState);
            if (wanted == 0) {
                return Collections.<SegmentWithRange, Long>emptyMap();
            }
            Map<SegmentWithRange, Long> claimed = new HashMap<SegmentWithRange, Long>(wanted);
            Iterator<Map.Entry<SegmentWithRange, Long>> candidates = groupState.getUnassignedSegments().entrySet().iterator();
            int taken = 0;
            while (taken < wanted) {
                // Explicit form of an assertion; uses the class's own assertion switch field.
                if (!$assertionsDisabled && !candidates.hasNext()) {
                    throw new AssertionError();
                }
                Map.Entry<SegmentWithRange, Long> candidate = candidates.next();
                claimed.put(candidate.getKey(), candidate.getValue());
                updates.add(new ReaderGroupState.AcquireSegment(this.readerId, candidate.getKey().getSegment()));
                taken++;
            }
            updates.add(new ReaderGroupState.UpdateDistanceToTail(this.readerId, j, position.asImpl().getOwnedSegmentRangesWithOffsets()));
            return claimed;
        });

        if (readerOffline.get()) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
        this.releaseTimer.reset(calculateReleaseTime(this.readerId, this.sync.getState()));
        this.acquireTimer.reset(calculateAcquireTime(this.readerId, this.sync.getState()));
        resetLagUpdateTimer();
        return acquired;
    }

    /**
     * Computes how many unassigned segments this reader should acquire.
     *
     * <p>The result is the larger of two shares, and at least 1 whenever anything is unassigned:
     * <ul>
     *   <li>an equal split of the unassigned segments among all readers (integer division);</li>
     *   <li>the shortfall between this reader's rounded fair share of all segments and what it
     *       already owns, capped at the number of unassigned segments.</li>
     * </ul>
     *
     * @param readerGroupState state to evaluate; this reader is expected to have a segment entry in it.
     * @return 0 when there are no unassigned segments, otherwise a value between 1 and the number
     *         of unassigned segments.
     */
    private int calculateNumSegmentsToAcquire(ReaderGroupState readerGroupState) {
        int unassigned = readerGroupState.getNumberOfUnassignedSegments();
        if (unassigned == 0) {
            return 0;
        }
        int totalSegments = readerGroupState.getNumberOfSegments();
        int owned = readerGroupState.getSegments(this.readerId).size();
        int readers = readerGroupState.getNumberOfReaders();
        int equalShare = unassigned / readers;
        // Divide in floating point so Math.round really rounds to the nearest integer; with
        // int / int the quotient is already truncated and the rounding would be a no-op.
        int fairShare = Math.round(totalSegments / (float) readers);
        int shortfall = Math.min(unassigned, fairShare - owned);
        return Math.max(Math.max(equalShare, shortfall), 1);
    }

    /**
     * Computes how long a reader waits before it may acquire again:
     * {@code TIME_UNIT * (number of readers - ranking of the reader)}.
     *
     * @param str              id of the reader.
     * @param readerGroupState state supplying the reader count and the reader's ranking.
     * @return the acquire back-off duration.
     */
    @VisibleForTesting
    static Duration calculateAcquireTime(String str, ReaderGroupState readerGroupState) {
        long units = readerGroupState.getNumberOfReaders() - readerGroupState.getRanking(str);
        return TIME_UNIT.multipliedBy(units);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the checkpoint this reader has to process, creating an automatic one if due.
     *
     * <p>If a checkpoint is already pending for the reader, it is returned and the checkpoint
     * timer restarted. Otherwise, when automatic checkpoints are enabled (interval &gt; 0), the
     * checkpoint timer has expired and no checkpoint is ongoing, a new checkpoint is created
     * through a state update and older checkpoints are cleared.
     *
     * @return the id of the checkpoint pending for this reader, or null if there is none.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the group state.
     */
    public String getCheckpoint() throws ReaderNotInReaderGroupException {
        fetchUpdatesIfNeeded();
        ReaderGroupState current = this.sync.getState();
        long intervalMillis = current.getConfig().getAutomaticCheckpointIntervalMillis();
        if (!current.isReaderOnline(this.readerId)) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }

        String pending = current.getCheckpointForReader(this.readerId);
        if (pending != null) {
            this.checkpointTimer.reset(Duration.ofMillis(intervalMillis));
            return pending;
        }

        boolean automaticEnabled = intervalMillis > 0;
        if (!automaticEnabled || this.checkpointTimer.hasRemaining() || current.hasOngoingCheckpoint()) {
            return null;
        }

        this.sync.updateState((groupState, updates) -> {
            // Re-check inside the update: another reader may have started a checkpoint meanwhile.
            if (!groupState.hasOngoingCheckpoint()) {
                ReaderGroupState.CreateCheckpoint created = new ReaderGroupState.CreateCheckpoint();
                updates.add(created);
                updates.add(new ReaderGroupState.ClearCheckpointsBefore(created.getCheckpointId()));
                log.debug("Created new checkpoint: {} currentState is: {}", created, groupState);
            }
        });
        this.checkpointTimer.reset(Duration.ofMillis(intervalMillis));
        return current.getCheckpointForReader(this.readerId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records this reader's position for the given checkpoint in the group state.
     *
     * @param str              id of the checkpoint.
     * @param positionInternal position whose owned segments and offsets are recorded.
     * @throws ReaderNotInReaderGroupException if the reader is not online in the group state.
     */
    public void checkpoint(String str, PositionInternal positionInternal) throws ReaderNotInReaderGroupException {
        final AtomicBoolean readerOffline = new AtomicBoolean(false);
        this.sync.updateState((groupState, updates) -> {
            boolean online = groupState.isReaderOnline(this.readerId);
            readerOffline.set(!online);
            if (online) {
                updates.add(new ReaderGroupState.CheckpointReader(str, this.readerId, positionInternal.getOwnedSegmentsWithOffsets()));
            }
        });
        if (readerOffline.get()) {
            throw new ReaderNotInReaderGroupException(this.readerId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the checkpoint state of the current group state whether a checkpoint is silent.
     *
     * @param str id of the checkpoint.
     * @return the answer given by the checkpoint state.
     */
    public boolean isCheckpointSilent(String str) {
        boolean silent = sync.getState()
                             .getCheckpointState()
                             .isCheckpointSilent(str);
        return silent;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the streams that have a starting stream cut in the group configuration.
     *
     * @return an unmodifiable view of those streams.
     */
    public Set<Stream> getStreams() {
        Set<Stream> configuredStreams = sync.getState().getConfig().getStartingStreamCuts().keySet();
        return Collections.unmodifiableSet(configuredStreams);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the last read positions the current group state reports for a stream.
     *
     * @param stream the stream to query.
     * @return segments of the stream mapped to offsets, as reported by the group state.
     */
    public Map<SegmentWithRange, Long> getLastReadpositions(Stream stream) {
        Map<SegmentWithRange, Long> lastRead = sync.getState().getLastReadPositions(stream);
        return lastRead;
    }

    /**
     * Obtains a delegation token for the stream a segment belongs to, blocking on the controller
     * call. Failures are rethrown wrapped in a {@code RuntimeException}.
     *
     * @param segment segment whose scope and stream name identify the stream.
     * @return the delegation token returned by the controller.
     */
    public String getOrRefreshDelegationTokenFor(Segment segment) {
        String scope = segment.getScope();
        String streamName = segment.getStreamName();
        String token = (String) Futures.getAndHandleExceptions(
                controller.getOrRefreshDelegationTokenFor(scope, streamName), RuntimeException::new);
        return token;
    }

    /**
     * Returns the id of the reader this manager acts for.
     *
     * @return the reader id supplied at construction.
     */
    @SuppressFBWarnings(justification = "generated code")
    public String getReaderId() {
        return readerId;
    }

    // Initializes the class-level constants: the assertion switch, the logger, and the
    // scheduling durations (1 second base unit, 30 second update window).
    static {
        $assertionsDisabled = !ReaderGroupStateManager.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ReaderGroupStateManager.class);
        TIME_UNIT = Duration.ofSeconds(1L);
        UPDATE_WINDOW = Duration.ofSeconds(30L);
    }
}
