package io.pravega.client.batch.impl;

import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.BatchClientFactory;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.impl.StreamCutHelper;
import io.pravega.client.batch.SegmentIterator;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.batch.StreamSegmentsIterator;
import io.pravega.client.batch.impl.SegmentRangeImpl;
import io.pravega.client.connection.impl.ConnectionFactory;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.ConnectionPoolImpl;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInfo;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.StreamSegmentSuccessors;
import io.pravega.common.concurrent.Futures;
import io.pravega.shared.security.auth.AccessOperation;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BatchClientFactory} implementation that lists the segments of a stream between two
 * stream cuts and creates iterators over individual segment ranges.
 *
 * <p>All collaborators (segment input stream factory, segment metadata client factory and
 * stream cut helper) share one {@link ConnectionPool} that this instance creates and therefore
 * closes in {@link #close()}, together with the supplied {@link Controller}.
 */
@Beta
public class BatchClientFactoryImpl implements BatchClientFactory {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchClientFactoryImpl.class);

    private final Controller controller;
    private final ConnectionPool connectionPool;
    private final SegmentInputStreamFactory inputStreamFactory;
    private final SegmentMetadataClientFactory segmentMetadataClientFactory;
    private final StreamCutHelper streamCutHelper;

    /**
     * Creates a factory backed by the given controller and a newly created connection pool.
     *
     * @param controller        controller used for segment and stream cut lookups; it is closed by {@link #close()}.
     * @param clientConfig      configuration handed to the connection pool.
     * @param connectionFactory connection factory handed to the connection pool.
     */
    public BatchClientFactoryImpl(Controller controller, ClientConfig clientConfig, ConnectionFactory connectionFactory) {
        this.controller = controller;
        this.connectionPool = new ConnectionPoolImpl(clientConfig, connectionFactory);
        this.inputStreamFactory = new SegmentInputStreamFactoryImpl(controller, this.connectionPool);
        this.segmentMetadataClientFactory = new SegmentMetadataClientFactoryImpl(controller, this.connectionPool);
        this.streamCutHelper = new StreamCutHelper(controller, this.connectionPool);
    }

    /**
     * Lists the segment ranges of {@code stream} between two stream cuts.
     *
     * @param stream     the stream to inspect; must not be {@code null}.
     * @param streamCut  start stream cut; {@code null} or {@link StreamCut#UNBOUNDED} means the cut
     *                   returned by {@code StreamCutHelper.fetchHeadStreamCut} is used.
     * @param streamCut2 end stream cut; {@code null} or {@link StreamCut#UNBOUNDED} means the cut
     *                   returned by {@code StreamCutHelper.fetchTailStreamCut} is used.
     * @return an iterator holder over the segment ranges between the two cuts.
     * @throws NullPointerException     if {@code stream} is {@code null}.
     * @throws IllegalArgumentException if a supplied stream cut refers to a different stream.
     */
    @Override
    public StreamSegmentsIterator getSegments(Stream stream, StreamCut streamCut, StreamCut streamCut2) {
        Preconditions.checkNotNull(stream, "stream");
        return listSegments(stream, Optional.ofNullable(streamCut), Optional.ofNullable(streamCut2));
    }

    /**
     * Creates an iterator over the events of one segment range.
     *
     * @param segmentRange the range (segment, start offset, end offset) to read; must not be {@code null}.
     * @param serializer   serializer handed to the iterator; must not be {@code null}.
     * @param <T>          type of the events produced by the iterator.
     * @return a new iterator over the given range.
     * @throws NullPointerException if either argument is {@code null}.
     */
    @Override
    public <T> SegmentIterator<T> readSegment(SegmentRange segmentRange, Serializer<T> serializer) {
        Preconditions.checkNotNull(segmentRange, "segmentRange");
        Preconditions.checkNotNull(serializer, "serializer");
        final SegmentRangeImpl range = segmentRange.asImpl();
        return new SegmentIteratorImpl<>(this.inputStreamFactory, range.getSegment(), serializer,
                range.getStartOffset(), range.getEndOffset());
    }

    /**
     * Resolves the effective start and end stream cuts and delegates to
     * {@link #getStreamSegmentInfo(Stream, StreamCut, StreamCut)}.
     */
    private StreamSegmentsIterator listSegments(Stream stream, Optional<StreamCut> requestedStart, Optional<StreamCut> requestedEnd) {
        // An UNBOUNDED cut is handled exactly like a missing one.
        final Optional<StreamCut> startCut = requestedStart.filter(cut -> !cut.equals(StreamCut.UNBOUNDED));
        final Optional<StreamCut> endCut = requestedEnd.filter(cut -> !cut.equals(StreamCut.UNBOUNDED));

        // Explicitly supplied cuts must refer to the requested stream.
        startCut.ifPresent(cut -> checkCutBelongsToStream(stream, cut, "Start"));
        endCut.ifPresent(cut -> checkCutBelongsToStream(stream, cut, "End"));

        // Start both lookups before blocking on either one, so that the head and tail
        // stream cut requests are not serialised behind each other.
        final CompletableFuture<StreamCut> startFuture = startCut.isPresent()
                ? CompletableFuture.completedFuture(startCut.get())
                : this.streamCutHelper.fetchHeadStreamCut(stream);
        final CompletableFuture<StreamCut> endFuture = endCut.isPresent()
                ? CompletableFuture.completedFuture(endCut.get())
                : this.streamCutHelper.fetchTailStreamCut(stream);

        return getStreamSegmentInfo(stream, startFuture.join(), endFuture.join());
    }

    /** Fails with {@link IllegalArgumentException} when {@code cut} does not refer to {@code stream}. */
    private static void checkCutBelongsToStream(Stream stream, StreamCut cut, String role) {
        Preconditions.checkArgument(stream.equals(cut.asImpl().getStream()),
                "%s stream cut %s does not belong to stream %s", role, cut, stream);
    }

    /**
     * Fetches the segments between the two (already resolved) stream cuts from the controller and
     * computes a {@link SegmentRange} for each of them.
     */
    private StreamSegmentsIterator getStreamSegmentInfo(Stream stream, StreamCut startStreamCut, StreamCut endStreamCut) {
        log.debug("Start stream cut: {}, End stream cut: {}", startStreamCut, endStreamCut);
        StreamSegmentsInfoImpl.validateStreamCuts(startStreamCut, endStreamCut);

        final StreamSegmentSuccessors successors = Futures.getAndHandleExceptions(
                this.controller.getSegments(startStreamCut, endStreamCut), RuntimeException::new);
        // TreeSet keeps the segments in their natural order, which fixes the order of the returned ranges.
        final TreeSet<Segment> segments = new TreeSet<>(successors.getSegments());
        final DelegationTokenProvider tokenProvider = DelegationTokenProviderFactory.create(
                this.controller, stream.getScope(), stream.getStreamName(), AccessOperation.READ);
        log.debug("List of Segments between the start and end stream cuts : {}", segments);

        // Kick off every per-segment lookup first, then wait for all of them together.
        final List<CompletableFuture<SegmentRange>> pendingRanges = segments.stream()
                .map(segment -> getSegmentRange(segment, startStreamCut, endStreamCut, tokenProvider))
                .collect(Collectors.toList());
        final List<SegmentRange> ranges = Futures.getThrowingException(Futures.allOfWithResults(pendingRanges));

        return StreamSegmentsInfoImpl.builder()
                .segmentRangeIterator(ranges.iterator())
                .startStreamCut(startStreamCut)
                .endStreamCut(endStreamCut)
                .build();
    }

    /**
     * Computes the range of {@code segment} bounded by the two stream cuts.
     *
     * <p>When both cuts contain the segment, their offsets are used directly. Otherwise the
     * segment's metadata is fetched and the missing bound falls back to the segment's starting
     * offset (start) or write offset (end).
     */
    private CompletableFuture<SegmentRange> getSegmentRange(Segment segment, StreamCut startStreamCut,
                                                            StreamCut endStreamCut, DelegationTokenProvider tokenProvider) {
        final boolean inStartCut = startStreamCut.asImpl().getPositions().containsKey(segment);
        final boolean inEndCut = endStreamCut.asImpl().getPositions().containsKey(segment);
        if (inStartCut && inEndCut) {
            // Both bounds are known from the stream cuts; no remote call is needed.
            return CompletableFuture.completedFuture(buildRange(segment,
                    startStreamCut.asImpl().getPositions().get(segment),
                    endStreamCut.asImpl().getPositions().get(segment)));
        }
        return segmentToInfo(segment, tokenProvider).thenApply(info -> buildRange(segment,
                startStreamCut.asImpl().getPositions().getOrDefault(segment, info.getStartingOffset()),
                endStreamCut.asImpl().getPositions().getOrDefault(segment, info.getWriteOffset())));
    }

    /** Builds the {@link SegmentRange} for {@code segment} spanning the given offsets. */
    private static SegmentRange buildRange(Segment segment, long startOffset, long endOffset) {
        return SegmentRangeImpl.builder()
                .segment(segment)
                .startOffset(startOffset)
                .endOffset(endOffset)
                .build();
    }

    /**
     * Fetches the {@link SegmentInfo} of {@code segment} using a short-lived metadata client that
     * is closed once the lookup completes, whether it succeeds or fails.
     */
    private CompletableFuture<SegmentInfo> segmentToInfo(Segment segment, DelegationTokenProvider delegationTokenProvider) {
        final SegmentMetadataClient client =
                this.segmentMetadataClientFactory.createSegmentMetadataClient(segment, delegationTokenProvider);
        final CompletableFuture<SegmentInfo> info;
        try {
            info = client.getSegmentInfo();
        } catch (RuntimeException e) {
            // No future exists to hook the cleanup onto, so release the client here.
            client.close();
            throw e;
        }
        info.whenComplete((result, failure) -> client.close());
        return info;
    }

    /**
     * Closes the controller and the connection pool owned by this factory. The connection pool is
     * closed even if closing the controller throws.
     */
    @Override
    public void close() {
        try {
            this.controller.close();
        } finally {
            this.connectionPool.close();
        }
    }
}
