package com.google.cloud.pubsublite.spark.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.spark.PslSourceOffset;
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.class */
public class MultiPartitionCommitterImpl implements MultiPartitionCommitter {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final MultiPartitionCommitter.CommitterFactory committerFactory;

    @GuardedBy("this")
    private final Map<Partition, Committer> committerMap;

    @GuardedBy("this")
    private final Set<Partition> partitionsCleanUp;

    public MultiPartitionCommitterImpl(long j, MultiPartitionCommitter.CommitterFactory committerFactory) {
        this(j, committerFactory, MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)));
    }

    @VisibleForTesting
    MultiPartitionCommitterImpl(long j, MultiPartitionCommitter.CommitterFactory committerFactory, ScheduledExecutorService scheduledExecutorService) {
        this.committerMap = new HashMap();
        this.partitionsCleanUp = new HashSet();
        this.committerFactory = committerFactory;
        for (int i = 0; i < j; i++) {
            Partition of = Partition.of(i);
            this.committerMap.put(of, createCommitter(of));
        }
        scheduledExecutorService.scheduleWithFixedDelay(this::cleanUpCommitterMap, 10L, 10L, TimeUnit.MINUTES);
    }

    @Override // com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        this.committerMap.values().forEach(committer -> {
            committer.stopAsync().awaitTerminated();
        });
    }

    private synchronized void updateCommitterMap(PslSourceOffset pslSourceOffset) {
        int size = this.committerMap.size();
        int size2 = pslSourceOffset.partitionOffsetMap().size();
        if (size == size2) {
            return;
        }
        if (size >= size2) {
            this.partitionsCleanUp.clear();
            for (int i = size2; i < size; i++) {
                this.partitionsCleanUp.add(Partition.of(i));
            }
            return;
        }
        for (int i2 = size; i2 < size2; i2++) {
            Partition of = Partition.of(i2);
            if (!this.committerMap.containsKey(of)) {
                this.committerMap.put(of, createCommitter(of));
            }
            this.partitionsCleanUp.remove(of);
        }
    }

    private synchronized Committer createCommitter(Partition partition) {
        Committer newCommitter = this.committerFactory.newCommitter(partition);
        newCommitter.startAsync().awaitRunning();
        return newCommitter;
    }

    private synchronized void cleanUpCommitterMap() {
        for (Partition partition : this.partitionsCleanUp) {
            this.committerMap.get(partition).stopAsync();
            this.committerMap.remove(partition);
        }
        this.partitionsCleanUp.clear();
    }

    @Override // com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter
    public synchronized void commit(PslSourceOffset pslSourceOffset) {
        updateCommitterMap(pslSourceOffset);
        pslSourceOffset.partitionOffsetMap().forEach((partition, offset) -> {
            final ApiFuture commitOffset = this.committerMap.get(partition).commitOffset(offset);
            ApiFutures.addCallback(commitOffset, new ApiFutureCallback<Void>() { // from class: com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitterImpl.1
                public void onFailure(Throwable th) {
                    if (commitOffset.isCancelled()) {
                        return;
                    }
                    MultiPartitionCommitterImpl.log.atWarning().log("Failed to commit %s,%s.", Long.valueOf(partition.value()), Long.valueOf(offset.value()), th);
                }

                public void onSuccess(Void r8) {
                    MultiPartitionCommitterImpl.log.atInfo().log("Committed %s,%s.", partition.value(), offset.value());
                }
            }, MoreExecutors.directExecutor());
        });
    }
}
