package io.trino.plugin.deltalake.transactionlog.writer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.spi.connector.ConnectorSession;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.inject.Inject;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.Preconditions;

/* loaded from: input_file:io/trino/plugin/deltalake/transactionlog/writer/S3TransactionLogSynchronizer.class */
public class S3TransactionLogSynchronizer implements TransactionLogSynchronizer {
    private static final String LOCK_DIRECTORY = "_sb_lock";
    private static final String LOCK_INFIX = "sb-lock_";
    private final HdfsEnvironment hdfsEnvironment;
    private final JsonCodec<LockFileContents> lockFileContentsJsonCodec;
    public static final Logger LOG = Logger.get(S3TransactionLogSynchronizer.class);
    private static final Duration EXPIRATION_DURATION = Duration.of(5, ChronoUnit.MINUTES);
    private static final Pattern LOCK_FILENAME_PATTERN = Pattern.compile("(.*)\\.sb-lock_.*");

    /* loaded from: input_file:io/trino/plugin/deltalake/transactionlog/writer/S3TransactionLogSynchronizer$LockFileContents.class */
    public static class LockFileContents {
        private final String clusterId;
        private final String owningQuery;
        private final long expirationEpochMillis;

        @JsonCreator
        public LockFileContents(@JsonProperty("clusterId") String str, @JsonProperty("owningQuery") String str2, @JsonProperty("expirationEpochMillis") long j) {
            this.clusterId = (String) Objects.requireNonNull(str, "clusterId is null");
            this.owningQuery = (String) Objects.requireNonNull(str2, "owningQuery is null");
            this.expirationEpochMillis = j;
        }

        @JsonProperty
        public String getClusterId() {
            return this.clusterId;
        }

        @JsonProperty
        public String getOwningQuery() {
            return this.owningQuery;
        }

        @JsonProperty
        public long getExpirationEpochMillis() {
            return this.expirationEpochMillis;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/deltalake/transactionlog/writer/S3TransactionLogSynchronizer$LockInfo.class */
    public static class LockInfo {
        private final String lockFilename;
        private final String entryFilename;
        private final LockFileContents contents;

        public LockInfo(String str, LockFileContents lockFileContents) {
            this.lockFilename = (String) Objects.requireNonNull(str, "lockFilename is null");
            this.entryFilename = S3TransactionLogSynchronizer.parseEntryFilename(str);
            this.contents = (LockFileContents) Objects.requireNonNull(lockFileContents, "contents is null");
        }

        public String getLockFilename() {
            return this.lockFilename;
        }

        public String getEntryFilename() {
            return this.entryFilename;
        }

        public String getClusterId() {
            return this.contents.getClusterId();
        }

        public String getOwningQuery() {
            return this.contents.getOwningQuery();
        }

        public Instant getExpirationTime() {
            return Instant.ofEpochMilli(this.contents.getExpirationEpochMillis());
        }
    }

    @Inject
    public S3TransactionLogSynchronizer(HdfsEnvironment hdfsEnvironment, JsonCodec<LockFileContents> jsonCodec) {
        this.hdfsEnvironment = (HdfsEnvironment) Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.lockFileContentsJsonCodec = (JsonCodec) Objects.requireNonNull(jsonCodec, "lockFileContentesCodec is null");
    }

    @Override // io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer
    public boolean isUnsafe() {
        return true;
    }

    @Override // io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer
    public void write(ConnectorSession connectorSession, String str, Path path, byte[] bArr) {
        FileSystem fileSystem = getFileSystem(connectorSession, path);
        Path path2 = new Path(path.getParent(), LOCK_DIRECTORY);
        String name = path.getName();
        Optional empty = Optional.empty();
        try {
            try {
                if (fileSystem.exists(path)) {
                    throw new TransactionConflictException(path + " already exists");
                }
                List<LockInfo> listLockInfos = listLockInfos(fileSystem, path2);
                Optional empty2 = Optional.empty();
                for (LockInfo lockInfo : listLockInfos) {
                    if (lockInfo.getExpirationTime().isBefore(Instant.now())) {
                        deleteLock(fileSystem, path2, lockInfo);
                    } else if (!lockInfo.getEntryFilename().equals(name)) {
                        continue;
                    } else {
                        if (empty2.isPresent()) {
                            throw new IllegalStateException(String.format("Multiple live locks found for: %s; lock1: %s; lock2: %s", path, ((LockInfo) empty2.get()).getLockFilename(), lockInfo.getLockFilename()));
                        }
                        empty2 = Optional.of(lockInfo);
                    }
                }
                empty2.ifPresent(lockInfo2 -> {
                    throw new TransactionConflictException(String.format("Transaction log locked(1); lockingCluster=%s; lockingQuery=%s; expires=%s", lockInfo2.getClusterId(), lockInfo2.getOwningQuery(), lockInfo2.getExpirationTime()));
                });
                Optional of = Optional.of(writeNewLockInfo(fileSystem, path2, name, str, connectorSession.getQueryId()));
                List<LockInfo> listLockInfos2 = listLockInfos(fileSystem, path2);
                String lockFilename = ((LockInfo) of.get()).getLockFilename();
                Optional<LockInfo> findFirst = listLockInfos2.stream().filter(lockInfo3 -> {
                    return lockInfo3.getEntryFilename().equals(name);
                }).filter(lockInfo4 -> {
                    return !lockInfo4.getLockFilename().equals(lockFilename);
                }).findFirst();
                if (findFirst.isPresent()) {
                    throw new TransactionConflictException(String.format("Transaction log locked(2); lockingCluster=%s; lockingQuery=%s; expires=%s", findFirst.get().getClusterId(), findFirst.get().getOwningQuery(), findFirst.get().getExpirationTime()));
                }
                Preconditions.checkState(!fileSystem.exists(path), String.format("Target file %s was created during locking", path));
                FSDataOutputStream create = fileSystem.create(path, false);
                try {
                    create.write(bArr);
                    if (create != null) {
                        create.close();
                    }
                    if (of.isPresent()) {
                        try {
                            deleteLock(fileSystem, path2, (LockInfo) of.get());
                        } catch (IOException e) {
                            LOG.warn(e, "Could not delete lockfile %s", new Object[]{((LockInfo) of.get()).lockFilename});
                        }
                    }
                } catch (Throwable th) {
                    if (create != null) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (empty.isPresent()) {
                    try {
                        deleteLock(fileSystem, path2, (LockInfo) empty.get());
                    } catch (IOException e2) {
                        LOG.warn(e2, "Could not delete lockfile %s", new Object[]{((LockInfo) empty.get()).lockFilename});
                    }
                }
                throw th3;
            }
        } catch (IOException e3) {
            throw new UncheckedIOException("Internal error while writing " + path, e3);
        }
    }

    private FileSystem getFileSystem(ConnectorSession connectorSession, Path path) {
        try {
            return this.hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(connectorSession), path);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private LockInfo writeNewLockInfo(FileSystem fileSystem, Path path, String str, String str2, String str3) throws IOException {
        String str4 = str + ".sb-lock_" + str3;
        LockFileContents lockFileContents = new LockFileContents(str2, str3, Instant.now().plus((TemporalAmount) EXPIRATION_DURATION).toEpochMilli());
        Path path2 = new Path(path, str4);
        byte[] jsonBytes = this.lockFileContentsJsonCodec.toJsonBytes(lockFileContents);
        FSDataOutputStream create = fileSystem.create(path2, false);
        try {
            create.write(jsonBytes);
            if (create != null) {
                create.close();
            }
            return new LockInfo(str4, lockFileContents);
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void deleteLock(FileSystem fileSystem, Path path, LockInfo lockInfo) throws IOException {
        fileSystem.delete(new Path(path, lockInfo.getLockFilename()), false);
    }

    private List<LockInfo> listLockInfos(FileSystem fileSystem, Path path) throws IOException {
        RemoteIterator listFiles = fileSystem.listFiles(path, false);
        ImmutableList.Builder builder = ImmutableList.builder();
        while (listFiles.hasNext()) {
            LocatedFileStatus locatedFileStatus = (LocatedFileStatus) listFiles.next();
            if (LOCK_FILENAME_PATTERN.matcher(locatedFileStatus.getPath().getName()).matches()) {
                Optional<LockInfo> parseLockFile = parseLockFile(fileSystem, locatedFileStatus.getPath());
                Objects.requireNonNull(builder);
                parseLockFile.ifPresent((v1) -> {
                    r1.add(v1);
                });
            }
        }
        return builder.build();
    }

    private Optional<LockInfo> parseLockFile(FileSystem fileSystem, Path path) throws IOException {
        try {
            FSDataInputStream open = fileSystem.open(path);
            try {
                Optional<LockInfo> of = Optional.of(new LockInfo(path.getName(), (LockFileContents) this.lockFileContentsJsonCodec.fromJson(open.readAllBytes())));
                if (open != null) {
                    open.close();
                }
                return of;
            } catch (Throwable th) {
                if (open != null) {
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (IOException e) {
            if (e.getMessage().contains("The specified key does not exist.")) {
                return Optional.empty();
            }
            throw e;
        } catch (IllegalArgumentException e2) {
            String str = null;
            if (0 != 0) {
                str = Base64.getEncoder().encodeToString(null);
            }
            LOG.warn(e2, "Could not parse lock file: %s; contents=%s", new Object[]{path, str});
            return Optional.empty();
        }
    }

    public static String parseEntryFilename(String str) {
        Matcher matcher = LOCK_FILENAME_PATTERN.matcher(str);
        if (matcher.matches()) {
            return matcher.group(1);
        }
        throw new IllegalArgumentException("Lock filename " + str + " does not match expected pattern");
    }
}
