/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupMetrics;
import io.pravega.client.stream.ReaderSegmentDistribution;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.CheckpointFailedException;
import io.pravega.client.stream.impl.CheckpointImpl;
import io.pravega.client.stream.impl.CheckpointState;
import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.client.stream.impl.ReaderGroupStateManager;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.client.stream.impl.StreamSegmentSuccessors;
import io.pravega.client.stream.notifications.EndOfDataNotification;
import io.pravega.client.stream.notifications.NotificationSystem;
import io.pravega.client.stream.notifications.NotifierFactory;
import io.pravega.client.stream.notifications.Observable;
import io.pravega.client.stream.notifications.SegmentNotification;
import io.pravega.common.concurrent.Futures;
import io.pravega.shared.NameUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderGroupImpl
implements ReaderGroup,
ReaderGroupMetrics {
    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReaderGroupImpl.class);
    // Suffix appended to the ids of internally generated checkpoints (see generateSilientCheckpointId).
    static final String SILENT = "_SILENT_";
    // Scope and reader group name supplied at construction; both are required to be non-null.
    private final String scope;
    private final String groupName;
    // Controller client used to resolve segments and successors for stream cuts.
    private final Controller controller;
    // Factory for the per-segment metadata clients used when computing unread bytes.
    private final SegmentMetadataClientFactory metaFactory;
    // Synchronizer holding the ReaderGroupState that every operation in this class reads or updates.
    private final StateSynchronizer<ReaderGroupState> synchronizer;
    // Source of the segment and end-of-data notifiers handed out by this class.
    private final NotifierFactory notifierFactory;

    public ReaderGroupImpl(String scope, String groupName, SynchronizerConfig synchronizerConfig, Serializer<InitialUpdate<ReaderGroupState>> initSerializer, Serializer<Update<ReaderGroupState>> updateSerializer, SynchronizerClientFactory clientFactory, Controller controller, ConnectionPool connectionPool) {
        Preconditions.checkNotNull(synchronizerConfig);
        Preconditions.checkNotNull(initSerializer);
        Preconditions.checkNotNull(updateSerializer);
        Preconditions.checkNotNull(clientFactory);
        Preconditions.checkNotNull(connectionPool);
        this.scope = Preconditions.checkNotNull(scope);
        this.groupName = Preconditions.checkNotNull(groupName);
        this.controller = Preconditions.checkNotNull(controller);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, connectionPool);
        this.synchronizer = clientFactory.createStateSynchronizer(NameUtils.getStreamForReaderGroup(groupName), updateSerializer, initSerializer, synchronizerConfig);
        this.notifierFactory = new NotifierFactory(new NotificationSystem(), this.synchronizer);
    }

    /**
     * Reports a reader as offline by delegating to
     * {@code ReaderGroupStateManager.readerShutdown} with this group's synchronizer.
     *
     * @param readerId     id of the reader being taken offline
     * @param lastPosition last position of that reader, passed through unchanged
     */
    @Override
    public void readerOffline(String readerId, Position lastPosition) {
        ReaderGroupStateManager.readerShutdown(readerId, lastPosition, synchronizer);
    }

    /**
     * Refreshes the synchronizer and returns the online readers recorded in its state.
     *
     * @return the set of online reader ids from the refreshed state
     */
    @Override
    public Set<String> getOnlineReaders() {
        synchronizer.fetchUpdates();
        ReaderGroupState current = synchronizer.getState();
        return current.getOnlineReaders();
    }

    /**
     * Refreshes the synchronizer and returns the stream names recorded in its state.
     *
     * @return the stream names from the refreshed state
     */
    @Override
    public Set<String> getStreamNames() {
        synchronizer.fetchUpdates();
        ReaderGroupState current = synchronizer.getState();
        return current.getStreamNames();
    }

    /**
     * Requests a new checkpoint and returns a future that completes once it has been taken.
     *
     * <p>The request is rejected when the number of outstanding checkpoints in the current
     * state has already reached {@code config.getMaxOutstandingCheckpointRequest()}.
     *
     * @param checkpointName     name of the checkpoint to create
     * @param backgroundExecutor executor used to poll for checkpoint completion
     * @return a future holding the completed {@link Checkpoint}, or a future failed with
     *         {@link MaxNumberOfCheckpointsExceededException} when the limit is reached
     */
    @Override
    public CompletableFuture<Checkpoint> initiateCheckpoint(String checkpointName, ScheduledExecutorService backgroundExecutor) {
        String rejectMessage = "rejecting checkpoint request since pending checkpoint reaches max allowed limit";
        // The limit check and the CreateCheckpoint update happen inside one state update.
        boolean canPerformCheckpoint = this.synchronizer.updateState((state, updates) -> {
            ReaderGroupConfig config = state.getConfig();
            CheckpointState checkpointState = state.getCheckpointState();
            int maxOutstandingCheckpointRequest = config.getMaxOutstandingCheckpointRequest();
            List<String> outstandingCheckpoints = checkpointState.getOutstandingCheckpoints();
            int currentOutstandingCheckpointRequest = outstandingCheckpoints.size();
            if (currentOutstandingCheckpointRequest >= maxOutstandingCheckpointRequest) {
                log.warn("Current outstanding checkpoints are : {}, maxOutstandingCheckpointRequest: {}, currentOutstandingCheckpointRequest: {}, errorMessage: {} {}", outstandingCheckpoints, maxOutstandingCheckpointRequest, currentOutstandingCheckpointRequest, rejectMessage, maxOutstandingCheckpointRequest);
                return false;
            }
            updates.add(new ReaderGroupState.CreateCheckpoint(checkpointName));
            return true;
        });
        if (!canPerformCheckpoint) {
            return Futures.failedFuture(new MaxNumberOfCheckpointsExceededException(rejectMessage));
        }
        // NOTE(review): the trailing identity stage presumably exists so that a caller cancelling
        // the returned future does not cancel the stage that runs completeCheckpoint - confirm.
        return this.waitForCheckpointComplete(checkpointName, backgroundExecutor)
                .thenApply(v -> this.completeCheckpoint(checkpointName))
                .thenApply(checkpoint -> checkpoint);
    }

    /**
     * Polls the synchronizer until the named checkpoint is reported complete.
     *
     * <p>Each iteration is a task delayed by 500 ms on {@code backgroundExecutor}. It fetches
     * updates, re-evaluates {@code isCheckpointComplete} and logs at debug level while the
     * checkpoint is still pending.
     *
     * @param checkpointName     checkpoint to wait for
     * @param backgroundExecutor executor that runs the delayed polling tasks and the loop
     * @return a future that completes when the loop condition becomes false
     */
    private CompletableFuture<Void> waitForCheckpointComplete(String checkpointName, ScheduledExecutorService backgroundExecutor) {
        final AtomicBoolean stillPending = new AtomicBoolean(true);
        final Duration pollInterval = Duration.ofMillis(500L);
        return Futures.loop(stillPending::get, () -> Futures.delayedTask(() -> {
            synchronizer.fetchUpdates();
            boolean incomplete = !synchronizer.getState().isCheckpointComplete(checkpointName);
            stillPending.set(incomplete);
            if (incomplete) {
                log.debug("Waiting on checkpoint: {} currentState is: {}", checkpointName, synchronizer.getState());
            }
            return null;
        }, pollInterval, backgroundExecutor), backgroundExecutor);
    }

    /**
     * Reads the positions of a completed checkpoint and clears earlier checkpoints.
     *
     * <p>The positions are read from the current state before the
     * {@code ClearCheckpointsBefore} update is applied.
     *
     * @param checkpointName name of the completed checkpoint
     * @return the checkpoint built from the recorded positions
     * @throws CheckpointFailedException if the state holds no positions for the checkpoint
     */
    private Checkpoint completeCheckpoint(String checkpointName) {
        Map<Segment, Long> positions = synchronizer.getState().getPositionsForCompletedCheckpoint(checkpointName);
        synchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpointsBefore(checkpointName));
        if (positions != null) {
            return new CheckpointImpl(checkpointName, positions);
        }
        throw new CheckpointFailedException("Checkpoint was cleared before results could be read.");
    }

    /**
     * Re-initializes the reader group state from the supplied configuration.
     *
     * <p>Start segments are resolved through the controller and end segments are derived from
     * the configuration. Both are written as a single unconditional {@code ReaderGroupStateInit}.
     *
     * @param config configuration to reset the group to
     */
    @Override
    public void resetReaderGroup(ReaderGroupConfig config) {
        Map<SegmentWithRange, Long> startSegments = getSegmentsForStreams(controller, config);
        Map<Segment, Long> endSegments = getEndSegmentsForStreams(config);
        ReaderGroupState.ReaderGroupStateInit init = new ReaderGroupState.ReaderGroupStateInit(config, startSegments, endSegments);
        synchronizer.updateStateUnconditionally(init);
    }

    /**
     * Reports how many segments each online reader holds and how many are unassigned.
     *
     * <p>The synchronizer is refreshed first. A reader for which the state returns no
     * assignment map is reported with a count of zero.
     *
     * @return the per-reader segment counts together with the unassigned segment count
     */
    @Override
    public ReaderSegmentDistribution getReaderSegmentDistribution() {
        this.synchronizer.fetchUpdates();
        ReaderGroupState state = this.synchronizer.getState();
        // Typed builder: keeps build() checked instead of relying on a raw-type conversion.
        ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
        for (String reader : state.getOnlineReaders()) {
            Map<SegmentWithRange, Long> assigned = state.getAssignedSegments(reader);
            int size = assigned != null ? assigned.size() : 0;
            mapBuilder.put(reader, size);
        }
        int unassigned = state.getNumberOfUnassignedSegments();
        ImmutableMap<String, Integer> readerDistribution = mapBuilder.build();
        log.info("ReaderGroup {} has unassigned segments count = {} and segment distribution as {}", this.getGroupName(), unassigned, readerDistribution);
        return ReaderSegmentDistribution.builder().readerSegmentDistribution(readerDistribution).unassignedSegments(unassigned).build();
    }

    /**
     * Resolves the starting segments and offsets for every stream in the configuration.
     *
     * <p>For a stream whose starting cut is {@code StreamCut.UNBOUNDED} the controller is asked
     * for the segments at time 0. Otherwise the positions of the configured cut are used as-is.
     * The result blocks on all lookups; failures are rethrown as {@link InvalidStreamException}.
     *
     * @param controller controller client used for the unbounded lookups
     * @param config     reader group configuration supplying the starting stream cuts
     * @return starting offsets keyed by segment; each key is built with a {@code null} range
     */
    @VisibleForTesting
    public static Map<SegmentWithRange, Long> getSegmentsForStreams(Controller controller, ReaderGroupConfig config) {
        Map<Stream, StreamCut> streamToStreamCuts = config.getStartingStreamCuts();
        List<CompletableFuture<Map<Segment, Long>>> futures = new ArrayList<>(streamToStreamCuts.size());
        for (Map.Entry<Stream, StreamCut> e : streamToStreamCuts.entrySet()) {
            if (e.getValue().equals(StreamCut.UNBOUNDED)) {
                futures.add(controller.getSegmentsAtTime(e.getKey(), 0L));
            } else {
                futures.add(CompletableFuture.completedFuture(e.getValue().asImpl().getPositions()));
            }
        }
        CompletableFuture<Map<SegmentWithRange, Long>> merged = Futures.allOfWithResults(futures)
                .thenApply(listOfMaps -> listOfMaps.stream()
                        .flatMap(map -> map.entrySet().stream())
                        .collect(Collectors.toMap(e -> new SegmentWithRange(e.getKey(), null), Map.Entry::getValue)));
        return Futures.getAndHandleExceptions(merged, InvalidStreamException::new);
    }

    /**
     * Collects the end offsets of every bounded ending stream cut in the configuration.
     *
     * <p>Cuts equal to {@code StreamCut.UNBOUNDED} are skipped. An offset of {@code -1} is
     * replaced by {@code Long.MAX_VALUE}.
     *
     * @param config reader group configuration supplying the ending stream cuts
     * @return end offsets keyed by segment; empty when every ending cut is unbounded
     */
    public static Map<Segment, Long> getEndSegmentsForStreams(ReaderGroupConfig config) {
        List<Map<Segment, Long>> listOfMaps = config.getEndingStreamCuts().values().stream()
                .filter(cut -> !cut.equals(StreamCut.UNBOUNDED))
                .map(cut -> cut.asImpl().getPositions())
                .collect(Collectors.toList());
        return listOfMaps.stream()
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey,
                        // NOTE(review): -1 presumably means "no explicit end offset" - confirm.
                        entry -> entry.getValue() == -1L ? Long.valueOf(Long.MAX_VALUE) : entry.getValue()));
    }

    /**
     * Returns the metrics view of this reader group.
     *
     * @return this instance, which itself implements {@link ReaderGroupMetrics}
     */
    @Override
    public ReaderGroupMetrics getMetrics() {
        final ReaderGroupMetrics self = this;
        return self;
    }

    /**
     * Computes the number of bytes the reader group has not yet read.
     *
     * <p>The synchronizer is refreshed and a single state snapshot is taken, so the positions
     * and the end segments used below always come from the same state. When the state holds
     * positions for a last completed checkpoint those are used as the starting point.
     * Otherwise the last known reader positions are used.
     *
     * @return the unread byte count summed over all streams of the group
     */
    @Override
    public long unreadBytes() {
        this.synchronizer.fetchUpdates();
        ReaderGroupState state = this.synchronizer.getState();
        Map<Segment, Long> endSegments = state.getEndSegments();
        Optional<Map<Stream, Map<Segment, Long>>> checkPointedPositions = state.getPositionsForLastCompletedCheckpoint();
        if (checkPointedPositions.isPresent()) {
            log.debug("Computing unread bytes based on the last checkPoint position");
            return this.getUnreadBytes(checkPointedPositions.get(), endSegments, this.metaFactory);
        }
        log.info("No checkpoints found, using the last known offset to compute unread bytes");
        return this.getUnreadBytesIgnoringRange(state.getPositions(), endSegments, this.metaFactory);
    }

    /**
     * Sums the remaining bytes of every stream, starting from the given positions.
     *
     * <p>One asynchronous computation is started per stream, then the method blocks until all
     * of them finish. Failures are rethrown wrapped in a {@link RuntimeException}.
     *
     * @param positions   starting offsets per stream, keyed by segment
     * @param endSegments configured end offsets; streams without any entry are read as unbounded
     * @param metaFactory factory for the segment metadata clients
     * @return the total number of unread bytes across all streams
     */
    private long getUnreadBytes(Map<Stream, Map<Segment, Long>> positions, Map<Segment, Long> endSegments, SegmentMetadataClientFactory metaFactory) {
        log.debug("Compute unread bytes from position {}", positions);
        List<CompletableFuture<Long>> futures = new ArrayList<>(positions.size());
        for (Map.Entry<Stream, Map<Segment, Long>> streamPosition : positions.entrySet()) {
            StreamCut fromStreamCut = new StreamCutImpl(streamPosition.getKey(), streamPosition.getValue());
            StreamCut toStreamCut = this.computeEndStreamCut(streamPosition.getKey(), endSegments);
            futures.add(this.getRemainingBytes(metaFactory, fromStreamCut, toStreamCut));
        }
        CompletableFuture<Long> total = Futures.allOfWithResults(futures)
                .thenApply(listOfLong -> listOfLong.stream().mapToLong(Long::longValue).sum());
        return Futures.getAndHandleExceptions(total, RuntimeException::new);
    }

    /**
     * Sums the remaining bytes of every stream from positions keyed by {@code SegmentWithRange}.
     *
     * <p>The range part of each key is dropped. Streams are processed one at a time, blocking
     * on each result; failures are rethrown wrapped in a {@link RuntimeException}.
     *
     * @param positions   current offsets per stream, keyed by segment with range
     * @param endSegments configured end offsets; streams without any entry are read as unbounded
     * @param metaFactory factory for the segment metadata clients
     * @return the total number of unread bytes across all streams
     */
    private long getUnreadBytesIgnoringRange(Map<Stream, Map<SegmentWithRange, Long>> positions, Map<Segment, Long> endSegments, SegmentMetadataClientFactory metaFactory) {
        log.debug("Compute unread bytes from position {}", positions);
        long sum = 0L;
        for (Map.Entry<Stream, Map<SegmentWithRange, Long>> perStream : positions.entrySet()) {
            Stream stream = perStream.getKey();
            Map<Segment, Long> offsets = dropRange(perStream.getValue());
            StreamCutImpl from = new StreamCutImpl(stream, offsets);
            StreamCut to = computeEndStreamCut(stream, endSegments);
            Long remaining = Futures.getAndHandleExceptions(getRemainingBytes(metaFactory, from, to), RuntimeException::new);
            sum += remaining;
        }
        return sum;
    }

    /**
     * Re-keys a position map from {@code SegmentWithRange} to the plain {@code Segment}.
     *
     * @param in offsets keyed by segment with range
     * @return the same offsets keyed by {@code getSegment()} of each original key
     */
    private Map<Segment, Long> dropRange(Map<SegmentWithRange, Long> in) {
        return in.entrySet()
                .stream()
                .collect(Collectors.toMap(entry -> entry.getKey().getSegment(), Map.Entry::getValue));
    }

    /**
     * Builds the end stream cut for one stream from the group's end segments.
     *
     * @param stream      stream to build the cut for
     * @param endSegments end offsets of all streams, keyed by segment
     * @return a cut over the entries whose segment belongs to {@code stream}, or
     *         {@code StreamCut.UNBOUNDED} when there are none
     */
    private StreamCut computeEndStreamCut(Stream stream, Map<Segment, Long> endSegments) {
        Map<Segment, Long> endOffsets = endSegments.entrySet()
                .stream()
                .filter(entry -> entry.getKey().getStream().equals(stream))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (endOffsets.isEmpty()) {
            return StreamCut.UNBOUNDED;
        }
        return new StreamCutImpl(stream, endOffsets);
    }

    /**
     * Computes the number of bytes between two stream cuts.
     *
     * <p>With an unbounded end cut the controller is asked for the successors of
     * {@code fromStreamCut}. Otherwise it is asked for the segments between the two cuts.
     * Every returned segment contributes its configured end offset when one exists, or else its
     * current length as reported by a segment metadata client. The offsets already recorded in
     * {@code fromStreamCut} are then subtracted.
     *
     * @param metaFactory   factory for the segment metadata clients
     * @param fromStreamCut cut marking the data already read
     * @param toStreamCut   cut marking the end of the data, or {@code StreamCut.UNBOUNDED}
     * @return a future holding the remaining byte count
     */
    private CompletableFuture<Long> getRemainingBytes(SegmentMetadataClientFactory metaFactory, StreamCut fromStreamCut, StreamCut toStreamCut) {
        final Map<Segment, Long> endPositions;
        final CompletableFuture<StreamSegmentSuccessors> unread;
        if (toStreamCut.equals(StreamCut.UNBOUNDED)) {
            unread = this.controller.getSuccessors(fromStreamCut);
            endPositions = Collections.emptyMap();
        } else {
            unread = this.controller.getSegments(fromStreamCut, toStreamCut);
            endPositions = toStreamCut.asImpl().getPositions();
        }
        return unread.thenApply(unreadVal -> {
            long totalLength = 0L;
            // Created lazily: only needed when at least one segment length has to be fetched.
            DelegationTokenProvider tokenProvider = null;
            for (Segment segment : unreadVal.getSegments()) {
                if (endPositions.containsKey(segment)) {
                    totalLength += endPositions.get(segment);
                    continue;
                }
                if (tokenProvider == null) {
                    tokenProvider = DelegationTokenProviderFactory.create(unreadVal.getDelegationToken(), this.controller, segment);
                }
                SegmentMetadataClient metadataClient = metaFactory.createSegmentMetadataClient(segment, tokenProvider);
                try {
                    totalLength += metadataClient.fetchCurrentSegmentLength();
                } finally {
                    // Close without leaving the finally block abruptly, so an exception thrown
                    // by the fetch above is never discarded.
                    if (metadataClient != null) {
                        metadataClient.close();
                    }
                }
            }
            for (long bytesRead : fromStreamCut.asImpl().getPositions().values()) {
                totalLength -= bytesRead;
            }
            log.debug("Remaining bytes from position: {} to position: {} is {}", fromStreamCut, toStreamCut, totalLength);
            return totalLength;
        });
    }

    /**
     * Returns the segment notifier obtained from the notifier factory.
     *
     * @param executor executor passed to the notifier factory; must not be null
     * @return the observable for {@link SegmentNotification}s
     * @throws NullPointerException if {@code executor} is null
     */
    @Override
    public Observable<SegmentNotification> getSegmentNotifier(ScheduledExecutorService executor) {
        ScheduledExecutorService checked = Preconditions.checkNotNull(executor, "executor");
        return notifierFactory.getSegmentNotifier(checked);
    }

    /**
     * Returns the end-of-data notifier obtained from the notifier factory.
     *
     * @param executor executor passed to the notifier factory; must not be null
     * @return the observable for {@link EndOfDataNotification}s
     * @throws NullPointerException if {@code executor} is null
     */
    @Override
    public Observable<EndOfDataNotification> getEndOfDataNotifier(ScheduledExecutorService executor) {
        ScheduledExecutorService checked = Preconditions.checkNotNull(executor, "executor");
        return notifierFactory.getEndOfDataNotifier(checked);
    }

    /**
     * Builds one stream cut per stream from the positions in the refreshed state.
     *
     * <p>The range part of each position key is dropped before the cut is created.
     *
     * @return a new map from each stream to the cut built from its current positions
     */
    @Override
    @VisibleForTesting
    public Map<Stream, StreamCut> getStreamCuts() {
        synchronizer.fetchUpdates();
        Map<Stream, Map<SegmentWithRange, Long>> positionsByStream = synchronizer.getState().getPositions();
        HashMap<Stream, StreamCut> result = new HashMap<>();
        positionsByStream.forEach((stream, offsets) ->
                result.put(stream, new StreamCutImpl(stream, dropRange(offsets))));
        return result;
    }

    /**
     * Produces stream cuts by running an internally named checkpoint.
     *
     * <p>A checkpoint with a generated id is created unconditionally. Once it is reported
     * complete, its stream cuts are read and earlier checkpoints are cleared.
     *
     * @param backgroundExecutor executor used to poll for checkpoint completion
     * @return a future holding the stream cut of every stream at the internal checkpoint
     */
    @Override
    public CompletableFuture<Map<Stream, StreamCut>> generateStreamCuts(ScheduledExecutorService backgroundExecutor) {
        final String silentId = generateSilientCheckpointId();
        log.debug("Fetching the current StreamCut using id {}", silentId);
        ReaderGroupState.CreateCheckpoint createSilent = new ReaderGroupState.CreateCheckpoint(silentId);
        synchronizer.updateStateUnconditionally(createSilent);
        CompletableFuture<Void> completed = waitForCheckpointComplete(silentId, backgroundExecutor);
        return completed.thenApply(ignored -> completeCheckpointAndFetchStreamCut(silentId));
    }

    /**
     * Generates the id of an internal checkpoint.
     *
     * @return 32 random bytes, Base64-encoded, followed by the {@code SILENT} suffix
     */
    private String generateSilientCheckpointId() {
        final byte[] entropy = new byte[32];
        ThreadLocalRandom.current().nextBytes(entropy);
        String encoded = Base64.getEncoder().encodeToString(entropy);
        return encoded + SILENT;
    }

    /**
     * Reads the stream cuts of a completed checkpoint and clears earlier checkpoints.
     *
     * <p>The cuts are read from the current state before the {@code ClearCheckpointsBefore}
     * update is applied.
     *
     * @param checkPointId id of the completed checkpoint
     * @return the stream cuts recorded for the checkpoint
     * @throws CheckpointFailedException if the state holds no stream cuts for the checkpoint
     */
    private Map<Stream, StreamCut> completeCheckpointAndFetchStreamCut(String checkPointId) {
        Optional<Map<Stream, StreamCut>> recorded = synchronizer.getState().getStreamCutsForCompletedCheckpoint(checkPointId);
        synchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpointsBefore(checkPointId));
        if (recorded.isPresent()) {
            return recorded.get();
        }
        throw new CheckpointFailedException("Internal CheckPoint was cleared before results could be read.");
    }

    /**
     * Closes the underlying state synchronizer.
     */
    @Override
    public void close() {
        synchronizer.close();
    }

    /**
     * Returns the scope supplied at construction.
     *
     * @return the scope name
     */
    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getScope() {
        return scope;
    }

    /**
     * Returns the reader group name supplied at construction.
     *
     * @return the reader group name
     */
    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getGroupName() {
        return groupName;
    }

    /**
     * Returns the controller client supplied at construction.
     *
     * @return the controller client
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public Controller getController() {
        return controller;
    }

    /**
     * Returns the segment metadata client factory created in the constructor.
     *
     * @return the segment metadata client factory
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public SegmentMetadataClientFactory getMetaFactory() {
        return metaFactory;
    }

    /**
     * Returns the state synchronizer created in the constructor.
     *
     * @return the synchronizer holding the reader group state
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public StateSynchronizer<ReaderGroupState> getSynchronizer() {
        return synchronizer;
    }

    /**
     * Returns the notifier factory created in the constructor.
     *
     * @return the notifier factory
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public NotifierFactory getNotifierFactory() {
        return notifierFactory;
    }

    /**
     * Compares this reader group with another object.
     *
     * <p>Two instances are equal when the other object is a {@code ReaderGroupImpl} that accepts
     * this one through {@code canEqual} and when scope, group name, controller, metadata
     * factory, synchronizer and notifier factory are pairwise equal (or both null). The
     * properties are read through their getters and compared in that order, stopping at the
     * first mismatch.
     *
     * @param o object to compare with
     * @return {@code true} when both objects are equal as described above
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ReaderGroupImpl)) {
            return false;
        }
        final ReaderGroupImpl that = (ReaderGroupImpl) o;
        if (!that.canEqual(this)) {
            return false;
        }
        return bothNullOrEqual(this.getScope(), that.getScope())
                && bothNullOrEqual(this.getGroupName(), that.getGroupName())
                && bothNullOrEqual(this.getController(), that.getController())
                && bothNullOrEqual(this.getMetaFactory(), that.getMetaFactory())
                && bothNullOrEqual(this.getSynchronizer(), that.getSynchronizer())
                && bothNullOrEqual(this.getNotifierFactory(), that.getNotifierFactory());
    }

    /** Null-safe equality: true when both are null or {@code mine.equals(theirs)}. */
    private static boolean bothNullOrEqual(Object mine, Object theirs) {
        if (mine == null) {
            return theirs == null;
        }
        return mine.equals(theirs);
    }

    /**
     * Tells whether the given object may be compared with instances of this class.
     *
     * @param other candidate object
     * @return {@code true} when {@code other} is a {@code ReaderGroupImpl}
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    protected boolean canEqual(Object other) {
        boolean sameFamily = other instanceof ReaderGroupImpl;
        return sameFamily;
    }

    /**
     * Computes a hash over the same six properties that {@code equals} compares.
     *
     * <p>Starting from 1, each property is folded in as {@code result * 59 + hash}, where a null
     * property contributes 43.
     *
     * @return the combined hash code
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public int hashCode() {
        final int prime = 59;
        final int nullHash = 43;
        int hash = 1;

        Object scopeValue = this.getScope();
        hash = hash * prime + (scopeValue != null ? scopeValue.hashCode() : nullHash);

        Object groupNameValue = this.getGroupName();
        hash = hash * prime + (groupNameValue != null ? groupNameValue.hashCode() : nullHash);

        Object controllerValue = this.getController();
        hash = hash * prime + (controllerValue != null ? controllerValue.hashCode() : nullHash);

        Object metaFactoryValue = this.getMetaFactory();
        hash = hash * prime + (metaFactoryValue != null ? metaFactoryValue.hashCode() : nullHash);

        Object synchronizerValue = this.getSynchronizer();
        hash = hash * prime + (synchronizerValue != null ? synchronizerValue.hashCode() : nullHash);

        Object notifierFactoryValue = this.getNotifierFactory();
        hash = hash * prime + (notifierFactoryValue != null ? notifierFactoryValue.hashCode() : nullHash);

        return hash;
    }

    /**
     * Renders the six properties of this reader group in {@code name=value} form.
     *
     * @return a string of the form {@code ReaderGroupImpl(scope=..., groupName=..., ...)}
     */
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String toString() {
        StringBuilder text = new StringBuilder("ReaderGroupImpl(scope=");
        text.append(this.getScope());
        text.append(", groupName=").append(this.getGroupName());
        text.append(", controller=").append(this.getController());
        text.append(", metaFactory=").append(this.getMetaFactory());
        text.append(", synchronizer=").append(this.getSynchronizer());
        text.append(", notifierFactory=").append(this.getNotifierFactory());
        text.append(")");
        return text.toString();
    }
}

