package com.google.cloud.pubsublite.spark;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.class */
/**
 * A {@link PerTopicHeadOffsetReader} that reads the head offset of every partition of a single
 * topic through a {@link TopicStatsClient}, caching each partition's result.
 *
 * <p>Cached entries expire one minute after they are written, so calls to {@link #getHeadOffset()}
 * made within that window reuse the cached offsets instead of issuing new requests.
 */
public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
    private final TopicStatsClient topicStatsClient;
    private final TopicPath topic;
    private final long topicPartitionCount;
    private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;

    /**
     * Creates a reader for {@code topicPath}.
     *
     * @param topicStatsClient client used to compute head cursors; closed by {@link #close()}
     * @param topicPath the topic whose head offsets are read
     * @param topicPartitionCount number of partitions to query, numbered from 0
     * @param ticker time source handed to the cache to drive entry expiration
     */
    @VisibleForTesting
    public LimitingHeadOffsetReader(
            TopicStatsClient topicStatsClient,
            TopicPath topicPath,
            long topicPartitionCount,
            Ticker ticker) {
        this.topicStatsClient = topicStatsClient;
        this.topic = topicPath;
        this.topicPartitionCount = topicPartitionCount;
        this.cachedHeadOffsets =
                Caffeine.newBuilder()
                        .ticker(ticker)
                        .expireAfterWrite(1L, TimeUnit.MINUTES)
                        .buildAsync(this::loadHeadOffset);
    }

    /**
     * Asynchronously loads the head offset for one partition; used as the cache's loader.
     *
     * @param partition the partition to query
     * @param executor supplied by the cache loader contract; unused because the stats client
     *     call is already asynchronous
     * @return a future completed with the head offset, or completed exceptionally with the
     *     failure reported by the stats client
     */
    private CompletableFuture<Offset> loadHeadOffset(Partition partition, Executor executor) {
        final CompletableFuture<Offset> result = new CompletableFuture<>();
        // Bridge the ApiFuture returned by the client into the CompletableFuture the cache expects.
        ApiFutures.addCallback(
                this.topicStatsClient.computeHeadCursor(this.topic, partition),
                new ApiFutureCallback<Cursor>() {
                    @Override
                    public void onFailure(Throwable th) {
                        result.completeExceptionally(th);
                    }

                    @Override
                    public void onSuccess(Cursor cursor) {
                        result.complete(Offset.of(cursor.getOffset()));
                    }
                },
                MoreExecutors.directExecutor());
        return result;
    }

    /**
     * Returns the head offsets of all partitions of the topic, blocking until every partition's
     * offset is available.
     *
     * @return a source offset holding the head offset of each partition
     * @throws IllegalStateException if any partition's head offset cannot be obtained, or if the
     *     calling thread is interrupted while waiting (the interrupt flag is restored)
     */
    @Override // com.google.cloud.pubsublite.spark.PerTopicHeadOffsetReader
    public PslSourceOffset getHeadOffset() {
        HashSet<Partition> partitions = new HashSet<>();
        // The index is a long so it cannot overflow before reaching the long partition count.
        for (long i = 0; i < this.topicPartitionCount; i++) {
            partitions.add(Partition.of(i));
        }
        try {
            Map<Partition, Offset> headOffsets = this.cachedHeadOffsets.getAll(partitions).get();
            return PslSourceOffset.builder().partitionOffsetMap(headOffsets).build();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Unable to compute head offset for topic: " + this.topic, e);
        } catch (Throwable th) {
            throw new IllegalStateException("Unable to compute head offset for topic: " + this.topic, th);
        }
    }

    /** Closes the underlying {@link TopicStatsClient}. */
    @Override // com.google.cloud.pubsublite.spark.PerTopicHeadOffsetReader, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.topicStatsClient.close();
    }
}
