/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Sets;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.io.MoreFiles;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.io.RecursiveDeleteOption;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointFile;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.rocksdb.Checkpoint;
import org.apache.pulsar.functions.runtime.shaded.org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates a local RocksDB checkpoint and publishes it to a {@link CheckpointStore}.
 *
 * <p>A single call to {@link #checkpoint(byte[])} performs, in order: creating the local
 * checkpoint directory, ensuring the remote checkpoint and SST directories exist, copying the
 * files that need copying, finalizing them, writing the checkpoint metadata and, depending on
 * configuration, pruning older remote checkpoints and removing the local checkpoint directory.
 */
public class RocksdbCheckpointTask {
    private static final Logger log = LoggerFactory.getLogger(RocksdbCheckpointTask.class);

    /** Name of the state store; used in log and error messages. */
    private final String dbName;
    /** RocksDB handle used to create the local checkpoint. */
    private final Checkpoint checkpoint;
    /** Parent directory under which each checkpoint gets its own sub-directory. */
    private final File checkpointDir;
    /** Remote store the checkpoint files and metadata are written to. */
    private final CheckpointStore checkpointStore;
    /** Prefix passed to {@link RocksUtils} when building remote paths; derived from the db name. */
    private final String dbPrefix;
    /** When {@code true}, the local checkpoint directory is deleted once {@link #checkpoint(byte[])} exits. */
    private final boolean removeLocalCheckpointAfterSuccessfulCheckpoint;
    /** When {@code true}, other remote checkpoints and unreferenced SSTs are deleted after a successful checkpoint. */
    private final boolean removeRemoteCheckpointsAfterSuccessfulCheckpoint;
    /** When {@code true}, plain SST names (without checksum) are also retained during remote cleanup. */
    private final boolean checkpointChecksumCompatible;
    /** When {@code true}, per-file info is recorded in the checkpoint metadata. */
    private final boolean checkpointChecksumEnable;
    /** Hook invoked with the checkpoint id before files are listed and copied; a no-op by default. */
    private InjectedError<String> injectedError = checkpointId -> {};

    /**
     * Creates a checkpoint task.
     *
     * @param dbName name of the state store being checkpointed
     * @param checkpoint RocksDB checkpoint handle
     * @param checkpointDir local directory in which checkpoint sub-directories are created
     * @param checkpointStore remote store that receives the checkpoint
     * @param removeLocalCheckpoint whether to delete the local checkpoint directory afterwards
     * @param removeRemoteCheckpoints whether to prune other remote checkpoints after success
     * @param checkpointChecksumEnable whether checksum-related file info is used
     * @param checkpointChecksumCompatible whether plain SST names are also kept during cleanup
     */
    public RocksdbCheckpointTask(String dbName, Checkpoint checkpoint, File checkpointDir, CheckpointStore checkpointStore, boolean removeLocalCheckpoint, boolean removeRemoteCheckpoints, boolean checkpointChecksumEnable, boolean checkpointChecksumCompatible) {
        this.dbName = dbName;
        this.checkpoint = checkpoint;
        this.checkpointDir = checkpointDir;
        this.checkpointStore = checkpointStore;
        this.dbPrefix = String.format("%s", dbName);
        this.removeLocalCheckpointAfterSuccessfulCheckpoint = removeLocalCheckpoint;
        this.removeRemoteCheckpointsAfterSuccessfulCheckpoint = removeRemoteCheckpoints;
        this.checkpointChecksumEnable = checkpointChecksumEnable;
        this.checkpointChecksumCompatible = checkpointChecksumCompatible;
    }

    /**
     * Replaces the hook that {@link #checkpoint(byte[])} invokes before copying files.
     *
     * @param injectedError hook receiving the checkpoint id; it may throw {@link IOException}
     */
    public void setInjectedError(InjectedError<String> injectedError) {
        this.injectedError = injectedError;
    }

    /**
     * Creates a checkpoint and uploads it to the checkpoint store.
     *
     * <p>If local-checkpoint removal is enabled, the local checkpoint directory is removed on
     * every exit path, including when creating the RocksDB checkpoint itself fails.
     *
     * @param txid transaction id to record in the checkpoint metadata; may be {@code null}
     * @return the randomly generated id of the new checkpoint
     * @throws StateStoreException if the RocksDB checkpoint cannot be created or an
     *     {@link IOException} occurs while publishing it
     */
    public String checkpoint(byte[] txid) throws StateStoreException {
        String checkpointId = UUID.randomUUID().toString();
        File tempDir = new File(this.checkpointDir, checkpointId);
        log.info("Create a local checkpoint of state store {} at {}", this.dbName, tempDir);
        try {
            try {
                this.checkpoint.createCheckpoint(tempDir.getAbsolutePath());
            } catch (RocksDBException e) {
                throw new StateStoreException("Failed to create a checkpoint at " + tempDir, e);
            }

            String remoteCheckpointPath = RocksUtils.getDestCheckpointPath(this.dbPrefix, checkpointId);
            if (!this.checkpointStore.fileExists(remoteCheckpointPath)) {
                this.checkpointStore.createDirectories(remoteCheckpointPath);
            }
            String sstsPath = RocksUtils.getDestSstsPath(this.dbPrefix);
            if (!this.checkpointStore.fileExists(sstsPath)) {
                this.checkpointStore.createDirectories(sstsPath);
            }

            this.injectedError.accept(checkpointId);

            // Only a subset of the checkpoint files has to be uploaded; the metadata lists all of them.
            List<CheckpointFile> checkpointFiles = CheckpointFile.list(tempDir);
            List<CheckpointFile> filesToCopy = checkpointFiles.stream()
                    .filter(f -> f.needCopy(this.checkpointStore, this.dbPrefix, this.checkpointChecksumEnable))
                    .collect(Collectors.toList());

            this.copyFilesToDest(checkpointId, filesToCopy);
            this.finalizeCopyFiles(checkpointId, filesToCopy);
            this.finalizeCheckpoint(checkpointFiles, checkpointId, txid);

            if (this.removeRemoteCheckpointsAfterSuccessfulCheckpoint) {
                this.cleanupRemoteCheckpoints(tempDir, checkpointId, checkpointFiles);
            }
            return checkpointId;
        } catch (IOException ioe) {
            log.error("Failed to checkpoint db {} to dir {}", this.dbName, tempDir, ioe);
            throw new StateStoreException("Failed to checkpoint db " + this.dbName + " to dir " + tempDir, ioe);
        } finally {
            // Runs on success and on every failure so the local directory is never leaked.
            this.removeLocalCheckpointIfConfigured(tempDir);
        }
    }

    /**
     * Deletes the local checkpoint directory when local removal is enabled and the directory
     * exists. A failure to delete is logged and otherwise ignored (best effort).
     */
    private void removeLocalCheckpointIfConfigured(File tempDir) {
        if (!this.removeLocalCheckpointAfterSuccessfulCheckpoint || !tempDir.exists()) {
            return;
        }
        try {
            MoreFiles.deleteRecursively(Paths.get(tempDir.getAbsolutePath()), RecursiveDeleteOption.ALLOW_INSECURE);
        } catch (IOException ioe) {
            log.warn("Failed to remove temporary checkpoint dir {}", tempDir, ioe);
        }
    }

    /** Copies each given file to the checkpoint store. */
    private void copyFilesToDest(String checkpointId, List<CheckpointFile> files) throws IOException {
        for (CheckpointFile file : files) {
            file.copyToRemote(this.checkpointStore, this.dbPrefix, checkpointId);
        }
    }

    /** Finalizes each copied file in the checkpoint store. */
    private void finalizeCopyFiles(String checkpointId, List<CheckpointFile> files) throws IOException {
        for (CheckpointFile file : files) {
            file.finalize(this.checkpointStore, this.dbPrefix, checkpointId, this.checkpointChecksumEnable, this.checkpointChecksumCompatible);
        }
    }

    /**
     * Writes the checkpoint metadata (file names, optional file infos, optional txid and the
     * creation timestamp) to the metadata path of the given checkpoint.
     */
    private void finalizeCheckpoint(List<CheckpointFile> files, String checkpointId, byte[] txid) throws IOException {
        CheckpointMetadata.Builder metadataBuilder = CheckpointMetadata.newBuilder();
        for (CheckpointFile file : files) {
            if (this.checkpointChecksumEnable) {
                metadataBuilder.addFileInfos(file.getFileInfo());
            }
            metadataBuilder.addFiles(file.getName());
        }
        if (txid != null) {
            // NOTE(review): protobuf documents unsafeWrap as non-copying, so callers should not
            // mutate txid after this call - confirm against callers.
            metadataBuilder.setTxid(UnsafeByteOperations.unsafeWrap(txid));
        }
        metadataBuilder.setCreatedAt(System.currentTimeMillis());

        String destCheckpointPath = RocksUtils.getDestCheckpointMetadataPath(this.dbPrefix, checkpointId);
        try (OutputStream os = this.checkpointStore.openOutputStream(destCheckpointPath)) {
            os.write(metadataBuilder.build().toByteArray());
        }
    }

    /**
     * Deletes every remote checkpoint except {@code checkpointToExclude}, then deletes every
     * remote SST that is not referenced by {@code filesToKeep}.
     *
     * @param checkpointedDir local checkpoint directory (currently unused)
     * @param checkpointToExclude id of the checkpoint that must be kept
     * @param filesToKeep files of the checkpoint that must be kept
     */
    private void cleanupRemoteCheckpoints(File checkpointedDir, String checkpointToExclude, List<CheckpointFile> filesToKeep) throws IOException {
        String checkpointsPath = RocksUtils.getDestCheckpointsPath(this.dbPrefix);
        for (String remoteCheckpoint : this.checkpointStore.listFiles(checkpointsPath)) {
            if (remoteCheckpoint.equals(checkpointToExclude)) {
                continue;
            }
            String remoteCheckpointPath = RocksUtils.getDestCheckpointPath(this.dbPrefix, remoteCheckpoint);
            this.checkpointStore.deleteRecursively(remoteCheckpointPath);
            log.info("Delete remote checkpoint {} from checkpoint store at {}", remoteCheckpoint, remoteCheckpointPath);
        }

        // Explicitly mutable, typed set: the retained names are accumulated in a loop.
        Set<String> sstsToKeep = Sets.newHashSet();
        for (CheckpointFile file : filesToKeep) {
            if (!file.isSstFile()) {
                continue;
            }
            sstsToKeep.add(file.getNameWithChecksum());
            if (this.checkpointChecksumCompatible) {
                // In compatible mode the name without checksum is retained as well.
                sstsToKeep.add(file.getName());
            }
        }

        Set<String> allSsts = Sets.newHashSet(this.checkpointStore.listFiles(RocksUtils.getDestSstsPath(this.dbPrefix)));
        for (String sst : Sets.difference(allSsts, sstsToKeep)) {
            this.checkpointStore.delete(RocksUtils.getDestSstPath(this.dbPrefix, sst));
        }
    }

    /**
     * Callback that may throw an {@link IOException}; invoked by {@link #checkpoint(byte[])}
     * with the checkpoint id before files are copied.
     *
     * @param <T> type of the value passed to the callback
     */
    @FunctionalInterface
    public interface InjectedError<T> {
        /**
         * Handles the given value.
         *
         * @param value value supplied by the caller of the hook
         * @throws IOException to simulate or signal an I/O failure
         */
        void accept(T value) throws IOException;
    }
}

