package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientFactory;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupMetrics;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.connectors.flink.util.FlinkPravegaParams;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shared.NameUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/ReaderGroupImpl.class */
/**
 * {@link ReaderGroup} implementation that opens a fresh {@link StateSynchronizer} for each
 * operation and closes it when the operation finishes. The instance also serves as its own
 * {@link ReaderGroupMetrics} (see {@link #getMetrics()}).
 */
public class ReaderGroupImpl implements ReaderGroup, ReaderGroupMetrics {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ReaderGroupImpl.class);
    private final String scope;
    private final String groupName;
    private final SynchronizerConfig synchronizerConfig;
    private final Serializer<ReaderGroupState.ReaderGroupStateInit> initSerializer;
    private final Serializer<ReaderGroupState.ReaderGroupStateUpdate> updateSerializer;
    private final ClientFactory clientFactory;
    private final Controller controller;
    private final ConnectionFactory connectionFactory;

    /**
     * Stores the supplied collaborators; no validation or I/O is performed here.
     *
     * @param str               scope used when resolving stream names
     * @param str2              name of the reader group
     * @param synchronizerConfig configuration passed to every synchronizer that is created
     * @param serializer        serializer for the initial reader group state
     * @param serializer2       serializer for reader group state updates
     * @param clientFactory     factory used to create state synchronizers
     * @param controller        controller used to look up segments and successors
     * @param connectionFactory connection factory handed to the segment metadata client factory
     */
    @SuppressFBWarnings(justification = "generated code")
    @ConstructorProperties({"scope", "groupName", "synchronizerConfig", "initSerializer", "updateSerializer", "clientFactory", "controller", "connectionFactory"})
    public ReaderGroupImpl(String str, String str2, SynchronizerConfig synchronizerConfig, Serializer<ReaderGroupState.ReaderGroupStateInit> serializer, Serializer<ReaderGroupState.ReaderGroupStateUpdate> serializer2, ClientFactory clientFactory, Controller controller, ConnectionFactory connectionFactory) {
        this.scope = str;
        this.groupName = str2;
        this.synchronizerConfig = synchronizerConfig;
        this.initSerializer = serializer;
        this.updateSerializer = serializer2;
        this.clientFactory = clientFactory;
        this.controller = controller;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Initializes the reader group state from the given configuration and the segments of the
     * named streams as reported by the controller for time {@code 0}.
     *
     * @param readerGroupConfig configuration to initialize the group with
     * @param set               names of the streams (within this group's scope) to read from
     */
    @VisibleForTesting
    public void initializeGroup(ReaderGroupConfig readerGroupConfig, Set<String> set) {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            ReaderGroupStateManager.initializeReaderGroup(synchronizer, readerGroupConfig, getSegmentsForStreams(set));
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * Asks the controller for the segments of every named stream at time {@code 0} and merges
     * the per-stream results into one map.
     *
     * @param set stream names, resolved against this group's scope
     * @return segment to offset map covering all requested streams
     */
    private Map<Segment, Long> getSegmentsForStreams(Set<String> set) {
        List<CompletableFuture<Map<Segment, Long>>> pending = new ArrayList<>(set.size());
        for (String streamName : set) {
            pending.add(this.controller.getSegmentsAtTime(new StreamImpl(this.scope, streamName), 0L));
        }
        CompletableFuture<Map<Segment, Long>> merged = FutureHelpers.allOfWithResults(pending)
                .thenApply(perStream -> perStream.stream()
                        .flatMap(segments -> segments.entrySet().stream())
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        // Failures are converted to InvalidStreamException by the supplied constructor reference.
        return FutureHelpers.getAndHandleExceptions(merged, InvalidStreamException::new);
    }

    /**
     * Records that the given reader has shut down at the given position.
     *
     * @param str      identifier of the reader
     * @param position last position of the reader
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public void readerOffline(String str, Position position) {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            ReaderGroupStateManager.readerShutdown(str, position, synchronizer);
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * Creates a synchronizer on the stream that backs this reader group's state.
     * Callers are responsible for closing the returned instance.
     */
    private StateSynchronizer<ReaderGroupState> createSynchronizer() {
        return this.clientFactory.createStateSynchronizer(NameUtils.getStreamForReaderGroup(this.groupName), this.updateSerializer, this.initSerializer, this.synchronizerConfig);
    }

    /** Closes the synchronizer exactly once, tolerating a {@code null} reference. */
    private static void closeIfNotNull(StateSynchronizer<?> synchronizer) {
        if (synchronizer != null) {
            synchronizer.close();
        }
    }

    /**
     * @return the online readers recorded in the group state after fetching the latest updates
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public Set<String> getOnlineReaders() {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            synchronizer.fetchUpdates();
            return synchronizer.getState().getOnlineReaders();
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * @return the stream names recorded in the group state after fetching the latest updates
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public Set<String> getStreamNames() {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            synchronizer.fetchUpdates();
            return synchronizer.getState().getStreamNames();
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * Requests a checkpoint and polls the group state every 500 ms on the supplied executor
     * until the state reports the checkpoint as complete.
     *
     * @param str                      name of the checkpoint
     * @param scheduledExecutorService executor that runs the polling tasks
     * @return future that yields the completed checkpoint; the synchronizer used for polling is
     *         closed when this future completes, normally or exceptionally
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public CompletableFuture<Checkpoint> initiateCheckpoint(String str, ScheduledExecutorService scheduledExecutorService) {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            synchronizer.updateStateUnconditionally(new ReaderGroupState.CreateCheckpoint(str));
            AtomicBoolean checkpointPending = new AtomicBoolean(true);
            return FutureHelpers.loop(checkpointPending::get, () -> FutureHelpers.delayedTask(() -> {
                synchronizer.fetchUpdates();
                checkpointPending.set(!synchronizer.getState().isCheckpointComplete(str));
                if (checkpointPending.get()) {
                    log.debug("Waiting on checkpoint: {} currentState is: {}", str, synchronizer.getState());
                }
                return null;
            }, Duration.ofMillis(500L), scheduledExecutorService), scheduledExecutorService)
                    .thenApply(ignored -> completeCheckpoint(str, synchronizer))
                    .whenComplete((checkpoint, failure) -> synchronizer.close());
        } catch (RuntimeException | Error failure) {
            // Reaching here means the completion callback that closes the synchronizer was never
            // registered, so release it now instead of leaking it.
            try {
                closeIfNotNull(synchronizer);
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Reads the positions of a completed checkpoint, then clears the checkpoint from the state.
     *
     * <p>Intended to be called from a {@link CompletableFuture} stage: a missing result is
     * reported as a {@link CompletionException} whose cause is a
     * {@code CheckpointFailedException}, which is the same shape a future stage produces for any
     * exception thrown by its function.
     * NOTE(review): this assumes CheckpointFailedException is a checked exception that cannot be
     * thrown directly from here without a {@code throws} clause; confirm against its declaration.
     *
     * @param str               name of the checkpoint
     * @param stateSynchronizer synchronizer holding the current group state
     * @return the checkpoint built from the recorded positions
     */
    private Checkpoint completeCheckpoint(String str, StateSynchronizer<ReaderGroupState> stateSynchronizer) {
        Map<Segment, Long> positions = stateSynchronizer.getState().getPositionsForCompletedCheckpoint(str);
        // The checkpoint is cleared before the null check, so it is removed in either outcome.
        stateSynchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpoints(str));
        if (positions == null) {
            throw new CompletionException(new CheckpointFailedException("Checkpoint was cleared before results could be read."));
        }
        return new CheckpointImpl(str, positions);
    }

    /**
     * Re-initializes the group state with its current configuration and the positions stored in
     * the given checkpoint.
     *
     * @param checkpoint checkpoint whose positions become the new starting point
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public void resetReadersToCheckpoint(Checkpoint checkpoint) {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            synchronizer.updateState(state -> {
                ReaderGroupConfig config = state.getConfig();
                Map<Segment, Long> positions = new HashMap<>();
                for (StreamCut cut : checkpoint.asImpl().getPositions().values()) {
                    positions.putAll(cut.getPositions());
                }
                return Collections.singletonList(new ReaderGroupState.ReaderGroupStateInit(config, positions));
            });
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * Unconditionally replaces the group state with one built from the given configuration and
     * the segments of the named streams.
     *
     * @param readerGroupConfig new configuration
     * @param set               names of the streams (within this group's scope) to read from
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public void updateConfig(ReaderGroupConfig readerGroupConfig, Set<String> set) {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            synchronizer.updateStateUnconditionally(new ReaderGroupState.ReaderGroupStateInit(readerGroupConfig, getSegmentsForStreams(set)));
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * @return this instance, which implements {@link ReaderGroupMetrics} itself
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public ReaderGroupMetrics getMetrics() {
        return this;
    }

    /**
     * Sums, over every stream in the group state, the remaining bytes computed by
     * {@link #getRemainingBytes(SegmentMetadataClientFactory, StreamCut)}.
     *
     * @return total across all streams
     */
    @Override // io.pravega.client.stream.ReaderGroupMetrics
    public long unreadBytes() {
        StateSynchronizer<ReaderGroupState> synchronizer = createSynchronizer();
        try {
            Map<Stream, Map<Segment, Long>> positions = synchronizer.getState().getPositions();
            SegmentMetadataClientFactory metadataClientFactory = new SegmentMetadataClientFactoryImpl(this.controller, this.connectionFactory);
            long unread = 0;
            for (Map.Entry<Stream, Map<Segment, Long>> streamPositions : positions.entrySet()) {
                unread += getRemainingBytes(metadataClientFactory, new StreamCut(streamPositions.getKey(), streamPositions.getValue()));
            }
            return unread;
        } finally {
            closeIfNotNull(synchronizer);
        }
    }

    /**
     * Adds up the length fetched for each segment the controller returns as a successor of the
     * cut, then subtracts every offset recorded in the cut.
     *
     * @param segmentMetadataClientFactory factory for per-segment metadata clients
     * @param streamCut                    positions to measure from
     * @return fetched lengths minus recorded offsets
     */
    private long getRemainingBytes(SegmentMetadataClientFactory segmentMetadataClientFactory, StreamCut streamCut) {
        long remaining = 0;
        Set<Segment> successors = FutureHelpers.getAndHandleExceptions(this.controller.getSuccessors(streamCut), RuntimeException::new);
        for (Segment segment : successors) {
            SegmentMetadataClient metadataClient = segmentMetadataClientFactory.createSegmentMetadataClient(segment);
            try {
                remaining += metadataClient.fetchCurrentStreamLength();
            } finally {
                if (metadataClient != null) {
                    metadataClient.close();
                }
            }
        }
        for (long offset : streamCut.getPositions().values()) {
            remaining -= offset;
        }
        return remaining;
    }

    /** @return the scope supplied at construction */
    @Override // io.pravega.client.stream.ReaderGroup
    @SuppressFBWarnings(justification = "generated code")
    public String getScope() {
        return this.scope;
    }

    /** @return the reader group name supplied at construction */
    @Override // io.pravega.client.stream.ReaderGroup
    @SuppressFBWarnings(justification = "generated code")
    public String getGroupName() {
        return this.groupName;
    }

    /** @return the synchronizer configuration supplied at construction */
    @SuppressFBWarnings(justification = "generated code")
    public SynchronizerConfig getSynchronizerConfig() {
        return this.synchronizerConfig;
    }

    /** @return the serializer for the initial reader group state */
    @SuppressFBWarnings(justification = "generated code")
    public Serializer<ReaderGroupState.ReaderGroupStateInit> getInitSerializer() {
        return this.initSerializer;
    }

    /** @return the serializer for reader group state updates */
    @SuppressFBWarnings(justification = "generated code")
    public Serializer<ReaderGroupState.ReaderGroupStateUpdate> getUpdateSerializer() {
        return this.updateSerializer;
    }

    /** @return the client factory supplied at construction */
    @SuppressFBWarnings(justification = "generated code")
    public ClientFactory getClientFactory() {
        return this.clientFactory;
    }

    /** @return the controller supplied at construction */
    @SuppressFBWarnings(justification = "generated code")
    public Controller getController() {
        return this.controller;
    }

    /** @return the connection factory supplied at construction */
    @SuppressFBWarnings(justification = "generated code")
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Two instances are equal when the other side accepts the comparison via
     * {@link #canEqual(Object)} and all eight properties are equal.
     */
    @Override
    @SuppressFBWarnings(justification = "generated code")
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof ReaderGroupImpl)) {
            return false;
        }
        ReaderGroupImpl other = (ReaderGroupImpl) obj;
        return other.canEqual(this)
                && Objects.equals(getScope(), other.getScope())
                && Objects.equals(getGroupName(), other.getGroupName())
                && Objects.equals(getSynchronizerConfig(), other.getSynchronizerConfig())
                && Objects.equals(getInitSerializer(), other.getInitSerializer())
                && Objects.equals(getUpdateSerializer(), other.getUpdateSerializer())
                && Objects.equals(getClientFactory(), other.getClientFactory())
                && Objects.equals(getController(), other.getController())
                && Objects.equals(getConnectionFactory(), other.getConnectionFactory());
    }

    /** Hook that lets subclasses veto equality with instances of other types. */
    @SuppressFBWarnings(justification = "generated code")
    protected boolean canEqual(Object obj) {
        return obj instanceof ReaderGroupImpl;
    }

    /**
     * Combines the eight properties in declaration order with multiplier 59, using 43 for
     * {@code null} values.
     */
    @Override
    @SuppressFBWarnings(justification = "generated code")
    public int hashCode() {
        Object[] components = {
            getScope(), getGroupName(), getSynchronizerConfig(), getInitSerializer(),
            getUpdateSerializer(), getClientFactory(), getController(), getConnectionFactory()
        };
        int result = 1;
        for (Object component : components) {
            result = (result * 59) + (component == null ? 43 : component.hashCode());
        }
        return result;
    }

    /** @return a string listing all eight properties */
    @Override
    @SuppressFBWarnings(justification = "generated code")
    public String toString() {
        return "ReaderGroupImpl(scope=" + getScope() + ", groupName=" + getGroupName() + ", synchronizerConfig=" + getSynchronizerConfig() + ", initSerializer=" + getInitSerializer() + ", updateSerializer=" + getUpdateSerializer() + ", clientFactory=" + getClientFactory() + ", controller=" + getController() + ", connectionFactory=" + getConnectionFactory() + ")";
    }
}
