package com.uber.rss.execution;

import com.uber.rss.common.AppShuffleId;
import com.uber.rss.common.PartitionFilePathAndLength;
import com.uber.rss.exceptions.RssFileCorruptedException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.messages.AppDeletionStateItem;
import com.uber.rss.messages.BaseMessage;
import com.uber.rss.messages.StageCorruptionStateItem;
import com.uber.rss.messages.StageInfoStateItem;
import com.uber.rss.messages.TaskAttemptCommitStateItem;
import com.uber.rss.util.ByteBufUtils;
import com.uber.rss.util.FileUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uber/rss/execution/LocalFileStateStore.class */
public class LocalFileStateStore implements StateStore {
    public static final String STATE_DIR_NAME = "state";
    public static final String STATE_FILE_PREFIX = "v1_";
    public static final long DEFAULT_ROTATION_MILLIS = 3600000;
    private final String stateDir;
    private final long fileRotationMillis;
    private final long fileRetentionMillis;
    private String currentFilePath;
    private FileOutputStream currentFileStream;
    private long currentFileCreateTime;
    private boolean closed;
    private static final Logger logger = LoggerFactory.getLogger(LocalFileStateStore.class);
    public static final long DEFAULT_RETENTION_MILLIS = ShuffleExecutor.DEFAULT_APP_FILE_RETENTION_MILLIS;
    private static final TimeZone utcTimeZone = TimeZone.getTimeZone("UTC");
    private static final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");

    public LocalFileStateStore(String str) {
        this(str, DEFAULT_ROTATION_MILLIS, DEFAULT_RETENTION_MILLIS);
    }

    public LocalFileStateStore(String str, long j, long j2) {
        this.currentFileCreateTime = 0L;
        this.closed = false;
        this.stateDir = Paths.get(str, STATE_DIR_NAME).toString();
        this.fileRotationMillis = j;
        this.fileRetentionMillis = j2;
        Paths.get(str, STATE_DIR_NAME).toFile().mkdirs();
        createNewFileIfNecessary();
    }

    @Override // com.uber.rss.execution.StateStore
    public void storeStageInfo(AppShuffleId appShuffleId, StagePersistentInfo stagePersistentInfo) {
        writeState(new StageInfoStateItem(appShuffleId, stagePersistentInfo.getNumPartitions(), stagePersistentInfo.getFileStartIndex(), stagePersistentInfo.getShuffleWriteConfig(), stagePersistentInfo.getFileStatus()));
    }

    @Override // com.uber.rss.execution.StateStore
    public void storeTaskAttemptCommit(AppShuffleId appShuffleId, Collection<Long> collection, Collection<PartitionFilePathAndLength> collection2) {
        writeState(new TaskAttemptCommitStateItem(appShuffleId, collection, collection2));
    }

    @Override // com.uber.rss.execution.StateStore
    public void storeAppDeletion(String str) {
        writeState(new AppDeletionStateItem(str));
    }

    @Override // com.uber.rss.execution.StateStore
    public void storeStageCorruption(AppShuffleId appShuffleId) {
        writeState(new StageCorruptionStateItem(appShuffleId));
    }

    @Override // com.uber.rss.execution.StateStore
    public void commit() {
        synchronized (this) {
            try {
                this.currentFileStream.flush();
            } catch (IOException e) {
                throw new RssFileCorruptedException(String.format("Failed to flush state file %s", this.currentFilePath));
            }
        }
        if (System.currentTimeMillis() - this.currentFileCreateTime >= this.fileRotationMillis) {
            deleteOldFiles();
        }
        createNewFileIfNecessary();
    }

    @Override // com.uber.rss.execution.StateStore
    public LocalFileStateStoreIterator loadData() {
        List emptyList;
        try {
            Stream<Path> list = Files.list(Paths.get(this.stateDir, new String[0]));
            try {
                emptyList = (List) list.sorted(new Comparator<Path>() { // from class: com.uber.rss.execution.LocalFileStateStore.1
                    @Override // java.util.Comparator
                    public int compare(Path path, Path path2) {
                        return StringUtils.compare(path.toString(), path2.toString());
                    }
                }).map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.toList());
                if (list != null) {
                    list.close();
                }
            } finally {
            }
        } catch (IOException e) {
            logger.warn(String.format("Failed to load state from directory %s", this.stateDir), e);
            emptyList = Collections.emptyList();
        }
        logger.info(String.format("Creating iterator to load state: %s", StringUtils.join(emptyList, ',')));
        return new LocalFileStateStoreIterator(emptyList);
    }

    @Override // com.uber.rss.execution.StateStore, java.lang.AutoCloseable
    public void close() {
        try {
            synchronized (this) {
                this.closed = true;
                closeFileNoLock();
            }
        } catch (Throwable th) {
            logger.warn("Failed to close state file", th);
        }
    }

    public String toString() {
        String str;
        synchronized (this) {
            str = "StateStore{currentFilePath='" + this.currentFilePath + "'}";
        }
        return str;
    }

    private void createNewFileIfNecessary() {
        synchronized (this) {
            if (this.closed) {
                logger.info(String.format("State store already closed, do not create new file, %s", this));
                return;
            }
            if (System.currentTimeMillis() - this.currentFileCreateTime >= this.fileRotationMillis) {
                closeFileNoLock();
                String format = String.format("%s%s", STATE_FILE_PREFIX, dateFormat.format(new Date()));
                for (int i = 0; i < 10000; i++) {
                    Path path = Paths.get(this.stateDir, String.format("%s.%04d", format, Integer.valueOf(i)));
                    if (!Files.exists(path, new LinkOption[0])) {
                        try {
                            String path2 = path.toString();
                            this.currentFileStream = new FileOutputStream(path2, true);
                            this.currentFilePath = path2;
                            this.currentFileCreateTime = System.currentTimeMillis();
                            logger.info(String.format("Created state file: %s", path2));
                            return;
                        } catch (FileNotFoundException e) {
                            throw new RssFileCorruptedException(String.format("Failed to create state file: %s", path));
                        }
                    }
                }
                throw new RssInvalidStateException("Failed to create new state file");
            }
        }
    }

    private void deleteOldFiles() {
        try {
            FileUtils.cleanupOldFiles(this.stateDir, System.currentTimeMillis() - this.fileRetentionMillis);
        } catch (Throwable th) {
            logger.warn(String.format("Failed to clean up old state files in %s", this.stateDir), th);
        }
    }

    private void writeState(BaseMessage baseMessage) {
        ByteBuf buffer = Unpooled.buffer();
        try {
            baseMessage.serialize(buffer);
            byte[] readBytes = ByteBufUtils.readBytes(buffer);
            buffer.release();
            byte[] convertIntToBytes = ByteBufUtils.convertIntToBytes(baseMessage.getMessageType());
            byte[] convertIntToBytes2 = ByteBufUtils.convertIntToBytes(readBytes.length);
            synchronized (this) {
                try {
                    this.currentFileStream.write(convertIntToBytes);
                    this.currentFileStream.write(convertIntToBytes2);
                    this.currentFileStream.write(readBytes);
                } catch (IOException e) {
                    throw new RssFileCorruptedException(String.format("Failed to write %s to state file %s", baseMessage, this));
                }
            }
        } catch (Throwable th) {
            buffer.release();
            throw th;
        }
    }

    private void closeFileNoLock() {
        if (this.currentFileStream != null) {
            logger.info(String.format("Closing state file: %s", this.currentFilePath));
            try {
                this.currentFileStream.close();
                this.currentFileStream = null;
                this.currentFilePath = null;
                this.currentFileCreateTime = 0L;
            } catch (IOException e) {
                throw new RssFileCorruptedException(String.format("Failed to close old state file %s when trying to create new one", this.currentFilePath));
            }
        }
    }

    static {
        dateFormat.setTimeZone(utcTimeZone);
    }
}
