package io.pravega.connectors.flink;

import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.shaded.com.google.common.base.Preconditions;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/ReaderCheckpointHook.class */
/**
 * A {@link MasterTriggerRestoreHook} that couples checkpoints of a Pravega {@link ReaderGroup}
 * to the checkpoint ids handed to this hook.
 *
 * <p>Triggering a checkpoint initiates a reader-group checkpoint whose name is derived from the
 * checkpoint id (see {@link #createCheckpointName(long)}); restoring resets the reader group to
 * a previously captured {@link Checkpoint}.
 */
class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(ReaderCheckpointHook.class);

    /** Prefix shared by every checkpoint name produced by {@link #createCheckpointName(long)}. */
    private static final String PRAVEGA_CHECKPOINT_NAME_PREFIX = "PVG-CHK-";

    /** Name returned as this hook's identifier. */
    private final String readerName;

    /** Serializer handed out by {@link #createCheckpointDataSerializer()}. */
    private final CheckpointSerializer checkpointSerializer = new CheckpointSerializer();

    /** Reader group on which checkpoints are initiated and restored. */
    private final ReaderGroup readerGroup;

    /** Time in milliseconds after which a still-pending checkpoint request is cancelled. */
    private final long triggerTimeout;

    /**
     * Creates the hook and looks up the reader group it operates on.
     *
     * @param readerName      identifier of this hook; must not be {@code null}
     * @param readerGroupName name of the reader group to look up
     * @param scope           scope passed to {@code ReaderGroupManager.withScope}
     * @param controllerUri   URI passed to {@code ReaderGroupManager.withScope}
     *                        (presumably the Pravega controller endpoint; confirm against callers)
     * @param triggerTimeout  timeout in milliseconds applied to each triggered checkpoint
     */
    public ReaderCheckpointHook(String readerName, String readerGroupName, String scope, URI controllerUri,
                                long triggerTimeout) {
        this.readerName = (String) Preconditions.checkNotNull(readerName);
        this.triggerTimeout = triggerTimeout;
        this.readerGroup = ReaderGroupManager.withScope(scope, controllerUri).getReaderGroup(readerGroupName);
    }

    /**
     * Returns the identifier of this hook, which is the reader name given at construction.
     *
     * @return the reader name
     */
    public String getIdentifier() {
        return this.readerName;
    }

    /**
     * Initiates a reader-group checkpoint named after {@code checkpointId}.
     *
     * <p>A dedicated single-thread scheduler is created per call. It is passed to the reader group
     * as its background executor and also runs the timeout task that cancels the pending
     * checkpoint after {@code triggerTimeout} milliseconds. The scheduler is shut down as soon as
     * the checkpoint future completes (normally, exceptionally, or through cancellation).
     *
     * @param checkpointId        id used to derive the Pravega checkpoint name
     * @param checkpointTimestamp not used by this implementation
     * @param executor            executor on which the returned future is completed
     * @return a future that completes with the reader-group checkpoint, or fails if the
     *         checkpoint fails or is cancelled by the timeout
     * @throws Exception if initiating the checkpoint fails synchronously
     */
    public Future<Checkpoint> triggerCheckpoint(long checkpointId, long checkpointTimestamp, Executor executor)
            throws Exception {
        final String checkpointName = createCheckpointName(checkpointId);
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Until the shutdown callback below is registered, nothing else will stop the scheduler.
        // Track the hand-off so that a synchronous failure does not leak its worker thread.
        boolean shutdownHandedOff = false;
        try {
            final CompletableFuture<Checkpoint> pendingCheckpoint =
                    this.readerGroup.initiateCheckpoint(checkpointName, scheduler);

            // Bound the wait: cancel the request if it has not completed within the timeout.
            scheduler.schedule(() -> pendingCheckpoint.cancel(false), this.triggerTimeout, TimeUnit.MILLISECONDS);

            // Completion of any kind (including the timeout cancellation) releases the scheduler.
            pendingCheckpoint.handle((checkpoint, failure) -> scheduler.shutdownNow());
            shutdownHandedOff = true;

            return flinkFutureFromJava8Future(pendingCheckpoint, executor);
        } finally {
            if (!shutdownHandedOff) {
                scheduler.shutdownNow();
            }
        }
    }

    /**
     * Resets the reader group to the given checkpoint.
     *
     * @param checkpointId not used by this implementation
     * @param checkpoint   checkpoint to restore; when {@code null} the call does nothing
     * @throws Exception if resetting the readers fails
     */
    public void restoreCheckpoint(long checkpointId, Checkpoint checkpoint) throws Exception {
        if (checkpoint == null) {
            return;
        }
        this.readerGroup.resetReadersToCheckpoint(checkpoint);
    }

    /**
     * Returns the serializer used for {@link Checkpoint} data; the same instance on every call.
     *
     * @return the checkpoint serializer
     */
    public SimpleVersionedSerializer<Checkpoint> createCheckpointDataSerializer() {
        return this.checkpointSerializer;
    }

    /**
     * Extracts the numeric checkpoint id from a name created by {@link #createCheckpointName(long)}.
     *
     * @param checkpointName name of the form {@code "PVG-CHK-<id>"}
     * @return the parsed id
     * @throws IllegalArgumentException if the name lacks the expected prefix or the remainder is
     *                                  not a valid {@code long}
     */
    public static long parseCheckpointId(String checkpointName) {
        Preconditions.checkArgument(checkpointName.startsWith(PRAVEGA_CHECKPOINT_NAME_PREFIX));
        try {
            return Long.parseLong(checkpointName.substring(PRAVEGA_CHECKPOINT_NAME_PREFIX.length()));
        } catch (IndexOutOfBoundsException | NumberFormatException e) {
            // Keep the cause and name the offending value so the failure is diagnosable.
            throw new IllegalArgumentException("Invalid checkpoint name: " + checkpointName, e);
        }
    }

    /**
     * Builds the Pravega checkpoint name for a checkpoint id.
     *
     * @param checkpointId the checkpoint id
     * @return {@code "PVG-CHK-"} followed by the decimal id
     */
    static String createCheckpointName(long checkpointId) {
        return PRAVEGA_CHECKPOINT_NAME_PREFIX + checkpointId;
    }

    /**
     * Bridges a Java 8 {@link CompletableFuture} to a Flink {@link Future}.
     *
     * @param javaFuture future whose outcome is forwarded
     * @param executor   executor on which the forwarding callback runs
     * @param <T>        result type
     * @return a Flink future completed with the same value or failure as {@code javaFuture}
     */
    private static <T> Future<T> flinkFutureFromJava8Future(CompletableFuture<T> javaFuture, Executor executor) {
        final FlinkCompletableFuture<T> flinkFuture = new FlinkCompletableFuture<>();
        javaFuture.handleAsync((value, failure) -> {
            if (failure != null) {
                flinkFuture.completeExceptionally(failure);
            } else {
                flinkFuture.complete(value);
            }
            return null;
        }, executor);
        return flinkFuture;
    }
}
