package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.common.hash.HashHelper;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.org.apache.commons.lang.math.RandomUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/* loaded from: input_file:io/pravega/client/stream/impl/ReaderGroupStateManager.class */
/**
 * Manages one reader's participation in a reader group whose shared state is held in a
 * {@link StateSynchronizer} of {@link ReaderGroupState}.
 *
 * <p>The manager adds and removes the reader from the group, records completed segments and
 * checkpoints, and decides when the reader should release a segment it owns or acquire
 * unassigned ones. Release and acquire decisions are rate limited by timers and are taken
 * while holding an internal lock; the state updates themselves go through
 * {@link StateSynchronizer#updateState}.
 *
 * <p>Several methods throw {@link ReinitializationRequiredException} when the shared state
 * no longer reports this reader as online.
 */
public class ReaderGroupStateManager {
    /** Base interval from which the release and acquire delays are computed. */
    static final Duration TIME_UNIT = Duration.ofMillis(1000L);
    /** Delay armed on the release/acquire timer as soon as a release/acquire has been decided. */
    static final Duration UPDATE_WINDOW = Duration.ofMillis(30000L);

    /** Guards the "should I release / should I acquire" decisions and their timer resets. */
    private final Object decisionLock = new Object();
    private final HashHelper hashHelper;
    private final String readerId;
    private final StateSynchronizer<ReaderGroupState> sync;
    private final Controller controller;
    private final TimeoutTimer releaseTimer;
    private final TimeoutTimer acquireTimer;
    private final TimeoutTimer fetchStateTimer;

    /**
     * Creates a manager for the given reader.
     *
     * @param str               id of the reader this manager acts for; must not be null
     * @param stateSynchronizer synchronizer holding the reader group state; must not be null
     * @param controller        controller used to look up segment successors; must not be null
     * @param supplier          time source handed to the internal timers; when null,
     *                          {@link System#nanoTime()} is used
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ReaderGroupStateManager(String str, StateSynchronizer<ReaderGroupState> stateSynchronizer, Controller controller, Supplier<Long> supplier) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(stateSynchronizer);
        Preconditions.checkNotNull(controller);
        this.readerId = str;
        this.hashHelper = HashHelper.seededWith(str);
        this.sync = stateSynchronizer;
        this.controller = controller;
        Supplier<Long> clock = supplier == null ? System::nanoTime : supplier;
        this.releaseTimer = new TimeoutTimer(TIME_UNIT, clock);
        this.acquireTimer = new TimeoutTimer(TIME_UNIT, clock);
        this.fetchStateTimer = new TimeoutTimer(TIME_UNIT, clock);
    }

    /**
     * Initializes the shared reader group state from a configuration and a segment map.
     *
     * @param stateSynchronizer synchronizer to initialize
     * @param readerGroupConfig configuration stored in the initial state
     * @param map               segment-to-long map passed to the initial state
     *                          (presumably starting offsets; confirm against ReaderGroupStateInit)
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static void initializeReaderGroup(StateSynchronizer<ReaderGroupState> stateSynchronizer, ReaderGroupConfig readerGroupConfig, Map<Segment, Long> map) {
        stateSynchronizer.initialize(new ReaderGroupState.ReaderGroupStateInit(readerGroupConfig, map));
    }

    /**
     * Adds this reader to the group and arms the acquire timer.
     *
     * <p>The acquire timer is set to {@code j} plus a random extra delay in
     * {@code [0, min(j, groupRefreshTimeMillis))} milliseconds.
     *
     * @param j base delay, in milliseconds, before the reader may first acquire segments
     * @throws IllegalStateException if the state already lists segments for this reader id
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void initializeReader(long j) {
        AtomicBoolean alreadyInGroup = new AtomicBoolean(false);
        this.sync.updateState(state -> {
            if (state.getSegments(this.readerId) == null) {
                return Collections.singletonList(new ReaderGroupState.AddReader(this.readerId));
            }
            alreadyInGroup.set(true);
            return null;
        });
        if (alreadyInGroup.get()) {
            throw new IllegalStateException("The requested reader: " + this.readerId + " cannot be added to the group because it is already in the group. Perhaps close() was not called?");
        }
        long jitterBound = Math.min(j, this.sync.getState().getConfig().getGroupRefreshTimeMillis());
        // Duration.ofMillis takes a long, so the float product has to be narrowed explicitly.
        long randomDelay = (long) (RandomUtils.nextFloat() * jitterBound);
        this.acquireTimer.reset(Duration.ofMillis(j + randomDelay));
    }

    /**
     * Removes this reader from the group.
     *
     * @param position last position of the reader, or null if unknown
     * @see #readerShutdown(String, Position, StateSynchronizer)
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void readerShutdown(Position position) {
        readerShutdown(this.readerId, position, this.sync);
    }

    /**
     * Removes the named reader from the group, if the state still lists it.
     *
     * <p>When {@code position} is non-null, its owned segments must contain every segment the
     * state has assigned to the reader.
     *
     * @param str               id of the reader to remove
     * @param position          last position of the reader, or null if unknown
     * @param stateSynchronizer synchronizer holding the reader group state
     * @throws IllegalArgumentException if {@code position} does not cover the reader's
     *                                  assigned segments (raised from inside the state update)
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static void readerShutdown(String str, Position position, StateSynchronizer<ReaderGroupState> stateSynchronizer) {
        stateSynchronizer.updateState(state -> {
            Set<Segment> assigned = state.getSegments(str);
            if (assigned == null) {
                // The reader is not in the group; nothing to remove.
                return null;
            }
            if (position != null && !position.asImpl().getOwnedSegments().containsAll(assigned)) {
                throw new IllegalArgumentException("When shutting down a reader: Given position does not match the segments it was assigned: \n" + assigned + " \n vs \n " + position.asImpl().getOwnedSegments());
            }
            return Collections.singletonList(new ReaderGroupState.RemoveReader(str, position == null ? null : position.asImpl()));
        });
    }

    /** Closes the underlying state synchronizer. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.sync.close();
    }

    /**
     * Records that this reader has finished {@code segment}, passing the segment's successors
     * (fetched from the controller) along with the update, and then zeroes the acquire timer.
     *
     * @param segment the segment that reached its end
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleEndOfSegment(Segment segment) throws ReinitializationRequiredException {
        StreamSegmentsWithPredecessors successors = (StreamSegmentsWithPredecessors) FutureHelpers.getAndHandleExceptions(this.controller.getSuccessors(segment), RuntimeException::new);
        AtomicBoolean reinitRequired = new AtomicBoolean(false);
        this.sync.updateState(state -> {
            if (state.isReaderOnline(this.readerId)) {
                return Collections.singletonList(new ReaderGroupState.SegmentCompleted(this.readerId, segment, successors.getSegmentToPredecessor()));
            }
            reinitRequired.set(true);
            return null;
        });
        if (reinitRequired.get()) {
            throw new ReinitializationRequiredException();
        }
        this.acquireTimer.zero();
    }

    /**
     * Picks a segment for this reader to give up, if a release is currently warranted.
     *
     * <p>A segment is returned only when the release timer has expired, no checkpoint is
     * pending for this reader, and the reader owns too many segments (see
     * {@link #doesReaderOwnTooManySegments}). When a segment is chosen the release timer is
     * re-armed with {@link #UPDATE_WINDOW}.
     *
     * @return the segment to release, or null if none should be released now
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Segment findSegmentToReleaseIfRequired() {
        fetchUpdatesIfNeeded();
        synchronized (this.decisionLock) {
            if (this.releaseTimer.hasRemaining()) {
                return null;
            }
            // Work from a single snapshot so every check below sees the same state.
            ReaderGroupState state = this.sync.getState();
            if (state.getCheckpointForReader(this.readerId) != null || !doesReaderOwnTooManySegments(state)) {
                return null;
            }
            Segment candidate = findSegmentToRelease(state);
            if (candidate != null) {
                this.releaseTimer.reset(UPDATE_WINDOW);
            }
            return candidate;
        }
    }

    /**
     * Reports whether this reader's relative size exceeds the smallest relative size in the
     * group by more than {@code max(1, numberOfUnassignedSegments)}.
     *
     * <p>Always false when there are no relative sizes, when the reader is not listed in the
     * state, or when it owns at most one segment.
     */
    private boolean doesReaderOwnTooManySegments(ReaderGroupState readerGroupState) {
        Map<String, Double> relativeSizes = readerGroupState.getRelativeSizes();
        Set<Segment> owned = readerGroupState.getSegments(this.readerId);
        if (relativeSizes.isEmpty() || owned == null || owned.size() <= 1) {
            return false;
        }
        Double ownSize = relativeSizes.get(this.readerId);
        if (ownSize == null) {
            // No size recorded for this reader: nothing to compare, and unboxing would throw.
            return false;
        }
        double smallest = Collections.min(relativeSizes.values());
        return ownSize > smallest + Math.max(1, readerGroupState.getNumberOfUnassignedSegments());
    }

    /**
     * Returns the segment owned by this reader whose scoped name hashes highest under this
     * reader's seeded hash, or null if the reader owns none.
     *
     * <p>The caller must pass a state in which this reader is listed.
     */
    private Segment findSegmentToRelease(ReaderGroupState state) {
        return state.getSegments(this.readerId).stream().max((left, right) -> {
            return Double.compare(this.hashHelper.hashToRange(left.getScopedName()), this.hashHelper.hashToRange(right.getScopedName()));
        }).orElse(null);
    }

    /**
     * Attempts to release {@code segment} from this reader.
     *
     * <p>The release is only written when the reader still owns the segment, has no pending
     * checkpoint, and still owns too many segments. Afterwards the release timer is re-armed
     * based on the reader's ranking.
     *
     * @param segment segment to release
     * @param j       value recorded with the release (presumably the last read offset; confirm
     *                against ReaderGroupState.ReleaseSegment)
     * @param j2      value recorded through UpdateDistanceToTail for this reader
     * @return true if, after the update, the reader no longer owns {@code segment}
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean releaseSegment(Segment segment, long j, long j2) throws ReinitializationRequiredException {
        this.sync.updateState(state -> {
            Set<Segment> owned = state.getSegments(this.readerId);
            boolean ownsSegment = owned != null && owned.contains(segment);
            if (!ownsSegment || state.getCheckpointForReader(this.readerId) != null || !doesReaderOwnTooManySegments(state)) {
                return null;
            }
            // NOTE(review): left raw because the common update supertype is not visible here.
            ArrayList updates = new ArrayList(2);
            updates.add(new ReaderGroupState.ReleaseSegment(this.readerId, segment, j));
            updates.add(new ReaderGroupState.UpdateDistanceToTail(this.readerId, j2));
            return updates;
        });
        ReaderGroupState state = this.sync.getState();
        this.releaseTimer.reset(calculateReleaseTime(state));
        if (!state.isReaderOnline(this.readerId)) {
            throw new ReinitializationRequiredException();
        }
        return !state.getSegments(this.readerId).contains(segment);
    }

    /** Returns {@link #TIME_UNIT} multiplied by one plus this reader's ranking. */
    private Duration calculateReleaseTime(ReaderGroupState readerGroupState) {
        return TIME_UNIT.multipliedBy(1 + readerGroupState.getRanking(this.readerId));
    }

    /**
     * Acquires unassigned segments for this reader when the acquire conditions are met.
     *
     * @param j value recorded through UpdateDistanceToTail for this reader when segments are acquired
     * @return the acquired segments with their associated long values; empty if nothing was acquired
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<Segment, Long> acquireNewSegmentsIfNeeded(long j) throws ReinitializationRequiredException {
        fetchUpdatesIfNeeded();
        return shouldAcquireSegment() ? acquireSegment(j) : Collections.emptyMap();
    }

    /**
     * Fetches state updates unless the fetch timer is still running, then re-arms the timer
     * with the group's configured refresh time.
     */
    private void fetchUpdatesIfNeeded() {
        if (this.fetchStateTimer.hasRemaining()) {
            return;
        }
        this.sync.fetchUpdates();
        this.fetchStateTimer.reset(Duration.ofMillis(this.sync.getState().getConfig().getGroupRefreshTimeMillis()));
    }

    /**
     * Decides whether an acquire attempt should be made now; when it should, the acquire
     * timer is re-armed with {@link #UPDATE_WINDOW}.
     *
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    private boolean shouldAcquireSegment() throws ReinitializationRequiredException {
        synchronized (this.decisionLock) {
            // Work from a single snapshot so every check below sees the same state.
            ReaderGroupState state = this.sync.getState();
            if (!state.isReaderOnline(this.readerId)) {
                throw new ReinitializationRequiredException();
            }
            if (this.acquireTimer.hasRemaining()
                    || state.getNumberOfUnassignedSegments() == 0
                    || state.getCheckpointForReader(this.readerId) != null) {
                return false;
            }
            this.acquireTimer.reset(UPDATE_WINDOW);
            return true;
        }
    }

    /**
     * Assigns unassigned segments to this reader and records {@code j} through
     * UpdateDistanceToTail, then re-arms the acquire timer based on the reader's ranking.
     *
     * @return the segments acquired by the update; never null, empty when nothing was acquired
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    private Map<Segment, Long> acquireSegment(long j) throws ReinitializationRequiredException {
        AtomicReference<Map<Segment, Long>> acquired = new AtomicReference<>(Collections.<Segment, Long>emptyMap());
        AtomicBoolean reinitRequired = new AtomicBoolean(false);
        this.sync.updateState(state -> {
            // The synchronizer may run this generator more than once; start every run from an
            // empty result so an earlier, uncommitted attempt can never leak to the caller.
            acquired.set(Collections.<Segment, Long>emptyMap());
            if (!state.isReaderOnline(this.readerId)) {
                reinitRequired.set(true);
                return null;
            }
            if (state.getCheckpointForReader(this.readerId) != null) {
                return null;
            }
            int toAcquire = calculateNumSegmentsToAcquire(state);
            if (toAcquire == 0) {
                return null;
            }
            Map<Segment, Long> picked = new HashMap<>(toAcquire);
            // NOTE(review): left raw because the common update supertype is not visible here.
            ArrayList updates = new ArrayList(toAcquire + 1);
            Iterator<Map.Entry<Segment, Long>> unassigned = state.getUnassignedSegments().entrySet().iterator();
            for (int i = 0; i < toAcquire; i++) {
                assert unassigned.hasNext();
                Map.Entry<Segment, Long> entry = unassigned.next();
                picked.put(entry.getKey(), entry.getValue());
                updates.add(new ReaderGroupState.AcquireSegment(this.readerId, entry.getKey()));
            }
            updates.add(new ReaderGroupState.UpdateDistanceToTail(this.readerId, j));
            acquired.set(picked);
            return updates;
        });
        if (reinitRequired.get()) {
            throw new ReinitializationRequiredException();
        }
        this.acquireTimer.reset(calculateAcquireTime(this.sync.getState()));
        return acquired.get();
    }

    /**
     * Computes how many unassigned segments this reader should take.
     *
     * <p>Returns 0 when nothing is unassigned. Otherwise returns the larger of an equal split
     * of the unassigned segments across readers and the reader's shortfall from its rounded
     * fair share of all segments (capped at the number unassigned), but at least 1.
     */
    private int calculateNumSegmentsToAcquire(ReaderGroupState readerGroupState) {
        int unassigned = readerGroupState.getNumberOfUnassignedSegments();
        if (unassigned == 0) {
            return 0;
        }
        int totalSegments = readerGroupState.getNumberOfSegments();
        int owned = readerGroupState.getSegments(this.readerId).size();
        int readers = readerGroupState.getNumberOfReaders();
        int equalSplit = unassigned / readers;
        // Divide in floating point: rounding an integer quotient would always truncate.
        int fairShare = Math.round(totalSegments / (float) readers);
        int shortfall = Math.min(unassigned, fairShare - owned);
        return Math.max(Math.max(equalSplit, shortfall), 1);
    }

    /** Returns {@link #TIME_UNIT} multiplied by (number of readers minus this reader's ranking). */
    private Duration calculateAcquireTime(ReaderGroupState readerGroupState) {
        return TIME_UNIT.multipliedBy(readerGroupState.getNumberOfReaders() - readerGroupState.getRanking(this.readerId));
    }

    /**
     * Returns the checkpoint currently recorded for this reader, refreshing the state first
     * if the fetch timer has expired.
     *
     * @return the checkpoint for this reader, or null if the state holds none
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public String getCheckpoint() throws ReinitializationRequiredException {
        fetchUpdatesIfNeeded();
        ReaderGroupState state = this.sync.getState();
        if (!state.isReaderOnline(this.readerId)) {
            throw new ReinitializationRequiredException();
        }
        return state.getCheckpointForReader(this.readerId);
    }

    /**
     * Records that this reader has reached the named checkpoint at the given position.
     *
     * @param str              name of the checkpoint
     * @param positionInternal position whose owned segments and offsets are stored with the checkpoint
     * @throws ReinitializationRequiredException if the state no longer reports this reader as online
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkpoint(String str, PositionInternal positionInternal) throws ReinitializationRequiredException {
        AtomicBoolean reinitRequired = new AtomicBoolean(false);
        this.sync.updateState(state -> {
            if (state.isReaderOnline(this.readerId)) {
                return Collections.singletonList(new ReaderGroupState.CheckpointReader(str, this.readerId, positionInternal.getOwnedSegmentsWithOffsets()));
            }
            reinitRequired.set(true);
            return null;
        });
        if (reinitRequired.get()) {
            throw new ReinitializationRequiredException();
        }
    }

    /** Returns the id of the reader this manager acts for. */
    @SuppressFBWarnings(justification = "generated code")
    public String getReaderId() {
        return this.readerId;
    }
}
