package herddb.cluster;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import herddb.backup.BackupFileConstants;
import herddb.core.HerdDBInternalException;
import herddb.core.MemoryManager;
import herddb.core.PostCheckpointAction;
import herddb.core.RecordSetFactory;
import herddb.file.FileRecordSetFactory;
import herddb.index.KeyToPageIndex;
import herddb.index.blink.BLinkKeyToPageIndex;
import herddb.log.LogSequenceNumber;
import herddb.model.Index;
import herddb.model.Record;
import herddb.model.Table;
import herddb.model.Transaction;
import herddb.server.ServerConfiguration;
import herddb.storage.DataPageDoesNotExistException;
import herddb.storage.DataStorageManager;
import herddb.storage.DataStorageManagerException;
import herddb.storage.FullTableScanConsumer;
import herddb.storage.IndexStatus;
import herddb.storage.TableStatus;
import herddb.utils.ByteArrayCursor;
import herddb.utils.Bytes;
import herddb.utils.ExtendedDataInputStream;
import herddb.utils.ExtendedDataOutputStream;
import herddb.utils.FileUtils;
import herddb.utils.SimpleByteArrayInputStream;
import herddb.utils.SystemInstrumentation;
import herddb.utils.VisibleByteArrayOutputStream;
import herddb.utils.XXHash64Utils;
import io.netty.util.Recycler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookKeeperServerStats;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager.class */
public class BookKeeperDataStorageManager extends DataStorageManager {
    // Local scratch directory: created and emptied by start(), emptied again by close().
    private final Path tmpDirectory;
    // Threshold supplied to the constructor; its consumers are outside this excerpt.
    private final int swapThreshold;
    // Counters and latency loggers, all obtained from the "bkdatastore" stats scope in the constructor.
    private final Counter zkWrites;
    private final Counter zkReads;
    private final Counter zkGetChildren;
    private final OpStatsLogger dataPageReads;
    private final OpStatsLogger dataPageWrites;
    private final OpStatsLogger indexPageReads;
    private final OpStatsLogger indexPageWrites;
    // Source of the ZooKeeper client (via ensureZooKeeper()) and of the base metadata path.
    private final ZookeeperMetadataStorageManager zk;
    // Source of the BookKeeper client (via getBookKeeper()) used to open page ledgers.
    private final BookkeeperCommitLogManager bk;
    private final String nodeId;
    // In-memory page -> ledger mappings, keyed by tablespace name.
    private final ConcurrentHashMap<String, TableSpacePagesMapping> tableSpaceMappings;
    // rootZkNode is "<zk base path>/data"; baseZkNode is "<rootZkNode>/<nodeId>".
    private final String rootZkNode;
    private final String baseZkNode;
    public static final String FILEEXTENSION_PAGE = ".page";
    public static final String EXTENSION_TABLEORINDExCHECKPOINTINFOFILE = ".checkpoint";
    private static final Logger LOGGER = Logger.getLogger(BookKeeperDataStorageManager.class.getName());
    // Jackson mapper used to (de)serialize TableSpacePagesMapping to/from JSON.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    // Pool of reusable write buffers; each new buffer is bound to the recycler handle it must be returned through.
    private static final Recycler<RecyclableByteArrayOutputStream> WRITE_BUFFERS_RECYCLER = new Recycler<RecyclableByteArrayOutputStream>() { // from class: herddb.cluster.BookKeeperDataStorageManager.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.netty.util.Recycler
        public RecyclableByteArrayOutputStream newObject(Recycler.Handle<RecyclableByteArrayOutputStream> handle) {
            return new RecyclableByteArrayOutputStream(handle);
        }
    };

    /* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager$DeleteZNodeAction.class */
    /**
     * Post-checkpoint action that deletes one znode (any version).
     * Failures are logged at SEVERE level and never propagated to the caller.
     */
    private class DeleteZNodeAction extends PostCheckpointAction {
        private final String znode;

        public DeleteZNodeAction(String str, String str2, String str3) {
            super(str, str2);
            this.znode = str3;
        }

        @Override
        public void run() {
            try {
                BookKeeperDataStorageManager.LOGGER.log(Level.FINE, this.description);
                // -1 matches any znode version
                BookKeeperDataStorageManager.this.zk.ensureZooKeeper().delete(this.znode, -1);
            } catch (InterruptedException interrupted) {
                // restore the interrupt flag before reporting
                Thread.currentThread().interrupt();
                reportFailure(interrupted);
            } catch (IOException | KeeperException failure) {
                reportFailure(failure);
            }
        }

        /** Logs a failed deletion together with its cause. */
        private void reportFailure(Exception cause) {
            BookKeeperDataStorageManager.LOGGER.log(
                    Level.SEVERE,
                    "Could not delete znode " + this.znode + BookKeeperConstants.COLON + cause,
                    cause);
        }
    }

    /* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager$DropLedgerForIndexAction.class */
    /**
     * Post-checkpoint action that hands an index page's ledger to
     * {@code dropLedgerForIndex} on the enclosing storage manager.
     */
    private class DropLedgerForIndexAction extends PostCheckpointAction {
        private final String tableSpace;
        private final long ledgerId;
        private final long pageId;

        /**
         * @param str  tablespace name
         * @param str2 first argument forwarded to {@link PostCheckpointAction}
         * @param str3 second argument forwarded to {@link PostCheckpointAction}
         * @param j    page id
         * @param j2   ledger id
         */
        public DropLedgerForIndexAction(String str, String str2, String str3, long j, long j2) {
            super(str2, str3);
            this.ledgerId = j2;
            this.pageId = j;
            this.tableSpace = str;
        }

        @Override
        public void run() {
            BookKeeperDataStorageManager.this.dropLedgerForIndex(
                    this.tableSpace, this.tableName, this.pageId, this.ledgerId, this.description);
        }
    }

    /* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager$DropLedgerForTableAction.class */
    /**
     * Post-checkpoint action that hands a table page's ledger to
     * {@code dropLedgerForTable} on the enclosing storage manager.
     */
    private class DropLedgerForTableAction extends PostCheckpointAction {
        private final String tableSpace;
        private final long ledgerId;
        private final long pageId;

        /**
         * @param str  tablespace name
         * @param str2 first argument forwarded to {@link PostCheckpointAction}
         * @param str3 second argument forwarded to {@link PostCheckpointAction}
         * @param j    page id
         * @param j2   ledger id
         */
        public DropLedgerForTableAction(String str, String str2, String str3, long j, long j2) {
            super(str2, str3);
            this.ledgerId = j2;
            this.pageId = j;
            this.tableSpace = str;
        }

        @Override
        public void run() {
            BookKeeperDataStorageManager.this.dropLedgerForTable(
                    this.tableSpace, this.tableName, this.pageId, this.ledgerId, this.description);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager$PagesMapping.class */
    /**
     * Page-id to ledger-id mapping for one table or index, plus the set of
     * ledger ids that were replaced by a later write of the same page.
     * The getters and setters expose the two collections as bean properties.
     */
    public static final class PagesMapping {
        ConcurrentHashMap<Long, Long> pages = new ConcurrentHashMap<>();
        ConcurrentSkipListSet<Long> oldLedgers = new ConcurrentSkipListSet<>();

        private PagesMapping() {
        }

        /** @return the ledger currently holding page {@code l}, or {@code null} if the page is unknown */
        public Long getLedgerIdForPage(Long l) {
            return this.pages.get(l);
        }

        /** Forgets page {@code j}; its ledger is not added to the old-ledgers set. */
        public void removePageId(long j) {
            this.pages.remove(j);
        }

        /** Maps page {@code j} to ledger {@code j2}, remembering any ledger it replaces. */
        public void writePageId(long j, long j2) {
            Long replacedLedger = this.pages.put(j, j2);
            if (replacedLedger == null) {
                return;
            }
            this.oldLedgers.add(replacedLedger);
        }

        public ConcurrentHashMap<Long, Long> getPages() {
            return this.pages;
        }

        public void setPages(ConcurrentHashMap<Long, Long> concurrentHashMap) {
            this.pages = concurrentHashMap;
        }

        public ConcurrentSkipListSet<Long> getOldLedgers() {
            return this.oldLedgers;
        }

        public void setOldLedgers(ConcurrentSkipListSet<Long> concurrentSkipListSet) {
            this.oldLedgers = concurrentSkipListSet;
        }

        @Override
        public String toString() {
            return "PagesMapping{pages=" + this.pages + ", oldLedgers=" + this.oldLedgers + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager$RecyclableByteArrayOutputStream.class */
    /**
     * Byte buffer that returns itself to the {@link Recycler} it came from when closed.
     * Closing more than once is a no-op, so the instance is never recycled twice.
     */
    public static class RecyclableByteArrayOutputStream extends VisibleByteArrayOutputStream {
        private static final int DEFAULT_INITIAL_SIZE = 1048576;
        private final Recycler.Handle<RecyclableByteArrayOutputStream> handle;
        // NOTE(review): nothing in this class resets the flag; presumably the code that
        // takes an instance out of the recycler clears it before reuse - confirm.
        private boolean closed;

        RecyclableByteArrayOutputStream(Recycler.Handle<RecyclableByteArrayOutputStream> handle) {
            super(DEFAULT_INITIAL_SIZE);
            this.handle = handle;
        }

        @Override // herddb.utils.VisibleByteArrayOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.closed) {
                return;
            }
            super.close();
            // Mark as closed BEFORE recycling: once recycle() returns, the instance may
            // already belong to another user and must not be touched any more.
            this.closed = true;
            this.handle.recycle(this);
        }
    }

    /* loaded from: input_file:herddb/cluster/BookKeeperDataStorageManager$TableSpacePagesMapping.class */
    /**
     * Page mappings of one tablespace: one {@link PagesMapping} per table name
     * and one per index name. Entries are created lazily on first lookup.
     */
    public static final class TableSpacePagesMapping {
        private ConcurrentHashMap<String, PagesMapping> tables = new ConcurrentHashMap<>();
        private ConcurrentHashMap<String, PagesMapping> indexes = new ConcurrentHashMap<>();

        /** @return the mapping for table {@code str}, creating an empty one if absent */
        public PagesMapping getTablePagesMapping(String str) {
            return this.tables.computeIfAbsent(str, unused -> new PagesMapping());
        }

        /** @return the mapping for index {@code str}, creating an empty one if absent */
        public PagesMapping getIndexPagesMapping(String str) {
            return this.indexes.computeIfAbsent(str, unused -> new PagesMapping());
        }

        public ConcurrentHashMap<String, PagesMapping> getTableMappings() {
            return this.tables;
        }

        public void setTableMappings(ConcurrentHashMap<String, PagesMapping> concurrentHashMap) {
            this.tables = concurrentHashMap;
        }

        public ConcurrentHashMap<String, PagesMapping> getIndexMappings() {
            return this.indexes;
        }

        public void setIndexMappings(ConcurrentHashMap<String, PagesMapping> concurrentHashMap) {
            this.indexes = concurrentHashMap;
        }

        @Override
        public String toString() {
            return "TableSpacePagesMapping{tableMappings=" + this.tables + ", indexMappings=" + this.indexes + '}';
        }
    }

    /** Returns the in-memory mapping for tablespace {@code str}, creating an empty one on first use. */
    private TableSpacePagesMapping getTableSpacePagesMapping(String str) {
        return this.tableSpaceMappings.computeIfAbsent(str, unused -> new TableSpacePagesMapping());
    }

    /**
     * Serializes the in-memory mapping of tablespace {@code str} to JSON and
     * writes it to the tablespace's mapping znode.
     *
     * @throws RuntimeException if the mapping cannot be serialized
     */
    private void persistTableSpaceMapping(String str) {
        final TableSpacePagesMapping mapping = getTableSpacePagesMapping(str);
        final byte[] json;
        try {
            json = MAPPER.writeValueAsBytes(mapping);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        final String znodePath = getTableSpaceMappingZNode(str);
        LOGGER.log(
                Level.INFO,
                "persistTableSpaceMapping " + str + ", " + mapping + " to " + znodePath
                        + " JSON " + new String(json, StandardCharsets.UTF_8));
        writeZNode(znodePath, json, null);
    }

    /** Loads the page mapping of every child of this node's base znode. */
    private void loadTableSpacesAtBoot() throws DataStorageManagerException {
        List<String> tableSpaces = zkGetChildren(this.baseZkNode, false);
        LOGGER.log(Level.INFO, "tableSpaces to load: {0}", tableSpaces);
        for (String tableSpace : tableSpaces) {
            loadTableSpaceMapping(tableSpace);
        }
    }

    /**
     * Reads the mapping znode of tablespace {@code str} and installs the result in memory.
     * When the znode has no data, an empty mapping is installed instead.
     *
     * @throws DataStorageManagerException if the stored JSON cannot be parsed
     */
    private void loadTableSpaceMapping(String str) throws DataStorageManagerException {
        final String znodePath = getTableSpaceMappingZNode(str);
        final byte[] json = readZNode(znodePath, new Stat());
        final TableSpacePagesMapping mapping;
        if (json != null) {
            try {
                mapping = MAPPER.readValue(json, TableSpacePagesMapping.class);
            } catch (IOException e) {
                throw new DataStorageManagerException(e);
            }
            LOGGER.log(
                    Level.INFO,
                    "loadTableSpaceMapping " + str + ", " + mapping + " from " + znodePath
                            + " JSON " + new String(json, StandardCharsets.UTF_8));
        } else {
            LOGGER.log(Level.INFO, "loadTableSpaceMapping " + str + ", from " + znodePath + " was not found");
            mapping = new TableSpacePagesMapping();
        }
        this.tableSpaceMappings.put(str, mapping);
    }

    /** Returns the znode of tablespace {@code str}, a direct child of this node's base znode. */
    private String getTableSpaceZNode(String str) {
        String parent = this.baseZkNode;
        return parent + "/" + str;
    }

    /**
     * Convenience constructor: places the tmp directory under {@code path}
     * (default tmp dir name), uses a swap threshold of 10000 and records no stats.
     */
    public BookKeeperDataStorageManager(String str, Path path, ZookeeperMetadataStorageManager zookeeperMetadataStorageManager, BookkeeperCommitLogManager bookkeeperCommitLogManager) {
        this(
                str,
                path.resolve(ServerConfiguration.PROPERTY_TMPDIR_DEFAULT),
                10_000,
                zookeeperMetadataStorageManager,
                bookkeeperCommitLogManager,
                new NullStatsLogger());
    }

    /**
     * Creates the storage manager. No I/O happens here; see {@link #start()}.
     *
     * @param str                             node id, also the last segment of this node's base znode
     * @param path                            local tmp directory
     * @param i                               swap threshold
     * @param zookeeperMetadataStorageManager provider of the ZooKeeper client and of the base path
     * @param bookkeeperCommitLogManager      provider of the BookKeeper client
     * @param statsLogger                     parent stats logger; metrics go under the "bkdatastore" scope
     */
    public BookKeeperDataStorageManager(String str, Path path, int i, ZookeeperMetadataStorageManager zookeeperMetadataStorageManager, BookkeeperCommitLogManager bookkeeperCommitLogManager, StatsLogger statsLogger) {
        this.tableSpaceMappings = new ConcurrentHashMap<>();
        this.nodeId = str;
        this.tmpDirectory = path;
        this.swapThreshold = i;

        // metrics
        final StatsLogger stats = statsLogger.scope("bkdatastore");
        this.dataPageReads = stats.getOpStatsLogger("data_pagereads");
        this.dataPageWrites = stats.getOpStatsLogger("data_pagewrites");
        this.indexPageReads = stats.getOpStatsLogger("index_pagereads");
        this.indexPageWrites = stats.getOpStatsLogger("index_pagewrites");
        this.zkReads = stats.getCounter("zkReads");
        this.zkWrites = stats.getCounter("zkWrites");
        this.zkGetChildren = stats.getCounter("zkGetChildren");

        // collaborators and znode layout
        this.zk = zookeeperMetadataStorageManager;
        this.bk = bookkeeperCommitLogManager;
        final String dataRoot = zookeeperMetadataStorageManager.getBasePath() + "/data";
        this.rootZkNode = dataRoot;
        this.baseZkNode = dataRoot + "/" + str;
    }

    /**
     * Prepares local and remote state: (re)creates and empties the tmp directory,
     * ensures the root and per-node znodes exist, then loads all tablespace mappings.
     *
     * @throws DataStorageManagerException if any step fails with an {@link IOException}
     */
    @Override // herddb.storage.DataStorageManager
    public void start() throws DataStorageManagerException {
        try {
            final String tmpPath = this.tmpDirectory.toAbsolutePath().toString();

            LOGGER.log(Level.INFO, "ensuring directory {0}", tmpPath);
            Files.createDirectories(this.tmpDirectory);

            LOGGER.log(Level.INFO, "preparing tmp directory {0}", tmpPath);
            FileUtils.cleanDirectory(this.tmpDirectory);
            Files.createDirectories(this.tmpDirectory);

            LOGGER.log(Level.INFO, "preparing root znode " + this.baseZkNode);
            ensureZNodeDirectory(this.rootZkNode);
            ensureZNodeDirectory(this.baseZkNode);

            loadTableSpacesAtBoot();
        } catch (IOException ioFailure) {
            throw new DataStorageManagerException(ioFailure);
        }
    }

    /** Empties the tmp directory; a failure to do so is logged and otherwise ignored. */
    @Override // herddb.storage.DataStorageManager, java.lang.AutoCloseable
    public void close() throws DataStorageManagerException {
        final String tmpPath = this.tmpDirectory.toAbsolutePath().toString();
        LOGGER.log(Level.INFO, "cleaning tmp directory {0}", tmpPath);
        try {
            FileUtils.cleanDirectory(this.tmpDirectory);
        } catch (IOException cleanupFailure) {
            // best effort: closing must not fail because of leftover temp files
            LOGGER.log(Level.SEVERE, "Cannot clean tmp directory", cleanupFailure);
        }
    }

    /**
     * Recursively deletes the znode of tablespace {@code str}.
     *
     * @throws DataStorageManagerException if the deletion fails or the thread is interrupted
     */
    @Override // herddb.storage.DataStorageManager
    public void eraseTablespaceData(String str) throws DataStorageManagerException {
        SystemInstrumentation.instrumentationPoint("eraseTablespaceData", str);
        final String znodePath = getTableSpaceZNode(str);
        LOGGER.log(Level.INFO, "erasing tablespace " + str + " znode {0}", znodePath);
        try {
            ZKUtil.deleteRecursive(this.zk.ensureZooKeeper(), znodePath);
        } catch (InterruptedException interrupted) {
            // restore the interrupt flag before reporting
            Thread.currentThread().interrupt();
            LOGGER.log(Level.SEVERE, "Cannot clean znode for tablespace " + str, interrupted);
            throw new DataStorageManagerException(interrupted);
        } catch (IOException | KeeperException failure) {
            LOGGER.log(Level.SEVERE, "Cannot clean znode for tablespace " + str, failure);
            throw new DataStorageManagerException(failure);
        }
    }

    /** Tells whether {@code str} has the form {@code checkpoint.*.checkpoint}. */
    private static boolean isTablespaceCheckPointInfoFile(String str) {
        if (!str.startsWith("checkpoint.")) {
            return false;
        }
        return str.endsWith(".checkpoint");
    }

    /** Returns the znode that stores the page mapping of tablespace {@code str}. */
    private String getTableSpaceMappingZNode(String str) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        return tableSpaceZNode + "/pagemap";
    }

    /** Returns the tablespace checkpoint znode for the given log position. */
    private String getTablespaceCheckPointInfoFile(String str, LogSequenceNumber logSequenceNumber) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        String position = logSequenceNumber.ledgerId
                + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER
                + logSequenceNumber.offset;
        return tableSpaceZNode + "/checkpoint." + position + ".checkpoint";
    }

    /** Returns the tables-metadata znode of tablespace {@code str} for the given log position. */
    private String getTablespaceTablesMetadataFile(String str, LogSequenceNumber logSequenceNumber) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        String position = logSequenceNumber.ledgerId
                + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER
                + logSequenceNumber.offset;
        return tableSpaceZNode + "/tables." + position + ".tablesmetadata";
    }

    /** Tells whether the file name of {@code str} has the form {@code tables.*.tablesmetadata}. */
    private static boolean isTablespaceTablesMetadataFile(String str) {
        String name = getFilename(str);
        if (name == null) {
            return false;
        }
        return name.startsWith("tables.") && name.endsWith(".tablesmetadata");
    }

    /** Tells whether the file name of {@code str} has the form {@code indexes.*.tablesmetadata}. */
    private static boolean isTablespaceIndexesMetadataFile(String str) {
        String name = getFilename(str);
        if (name == null) {
            return false;
        }
        return name.startsWith("indexes.") && name.endsWith(".tablesmetadata");
    }

    /** Returns the indexes-metadata znode of tablespace {@code str} for the given log position. */
    private String getTablespaceIndexesMetadataFile(String str, LogSequenceNumber logSequenceNumber) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        String position = logSequenceNumber.ledgerId
                + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER
                + logSequenceNumber.offset;
        return tableSpaceZNode + "/indexes." + position + ".tablesmetadata";
    }

    /** Returns the transactions znode of tablespace {@code str} for the given log position. */
    private String getTablespaceTransactionsFile(String str, LogSequenceNumber logSequenceNumber) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        String position = logSequenceNumber.ledgerId
                + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER
                + logSequenceNumber.offset;
        return tableSpaceZNode + "/transactions." + position + ".tx";
    }

    /** Tells whether the file name of {@code str} has the form {@code transactions.*.tx}. */
    private static boolean isTransactionsFile(String str) {
        String name = getFilename(str);
        if (name == null) {
            return false;
        }
        return name.startsWith("transactions.") && name.endsWith(".tx");
    }

    /** Returns the znode of table {@code str2} inside tablespace {@code str}. */
    private String getTableDirectory(String str, String str2) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        return tableSpaceZNode + "/" + str2 + ".table";
    }

    /** Returns the znode of index {@code str2} inside tablespace {@code str}. */
    private String getIndexDirectory(String str, String str2) {
        String tableSpaceZNode = getTableSpaceZNode(str);
        return tableSpaceZNode + "/" + str2 + ".index";
    }

    /** Returns the checkpoint znode for the given log position under the table/index znode {@code str}. */
    private static String getCheckPointsFile(String str, LogSequenceNumber logSequenceNumber) {
        String position = logSequenceNumber.ledgerId
                + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER
                + logSequenceNumber.offset;
        return str + "/" + position + ".checkpoint";
    }

    /** Tells whether the file name of {@code str} ends with {@code .checkpoint}. */
    private static boolean isTableOrIndexCheckpointsFile(String str) {
        String name = getFilename(str);
        if (name == null) {
            return false;
        }
        return name.endsWith(".checkpoint");
    }

    /**
     * Creates the persistent znode of index {@code str2} in tablespace {@code str}.
     * The call is idempotent: an already existing znode is not an error.
     *
     * @throws DataStorageManagerException if ZooKeeper fails for any other reason or the thread is interrupted
     */
    @Override // herddb.storage.DataStorageManager
    public void initIndex(String str, String str2) throws DataStorageManagerException {
        String indexDirectory = getIndexDirectory(str, str2);
        LOGGER.log(Level.FINE, "initIndex {0} {1} at {2}", new Object[]{str, str2, indexDirectory});
        try {
            this.zk.ensureZooKeeper().create(indexDirectory, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException alreadyInitialized) {
            // Must come before the generic KeeperException handler (it is a subclass):
            // the znode is already there, nothing to do.
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Creates the persistent znode of table {@code str2} in tablespace {@code str}.
     * The call is idempotent: an already existing znode is not an error.
     *
     * @throws DataStorageManagerException if ZooKeeper fails for any other reason or the thread is interrupted
     */
    @Override // herddb.storage.DataStorageManager
    public void initTable(String str, String str2) throws DataStorageManagerException {
        String tableDirectory = getTableDirectory(str, str2);
        LOGGER.log(Level.FINE, "initTable {0} {1} at {2}", new Object[]{str, str2, tableDirectory});
        try {
            this.zk.ensureZooKeeper().create(tableDirectory, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException alreadyInitialized) {
            // Must come before the generic KeeperException handler (it is a subclass):
            // the znode is already there, nothing to do.
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Reads data page {@code l} of table {@code str2} in tablespace {@code str}.
     * The page is stored as entry 0 of the ledger recorded in the in-memory page mapping.
     *
     * @return the records decoded from the page
     * @throws DataPageDoesNotExistException if no ledger is mapped to the page
     * @throws DataStorageManagerException   if BookKeeper fails, the thread is interrupted,
     *                                       or the page content is corrupted
     */
    @Override // herddb.storage.DataStorageManager
    public List<Record> readPage(String str, String str2, Long l) throws DataStorageManagerException {
        long startMillis = System.currentTimeMillis();
        Long ledgerId = getTableSpacePagesMapping(str).getTablePagesMapping(str2).getLedgerIdForPage(l);
        if (ledgerId == null) {
            throw new DataPageDoesNotExistException("No such page: " + str + "_" + str2 + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER + l);
        }
        try (ReadHandle readHandle = FutureUtils.result(
                this.bk.getBookKeeper().newOpenLedgerOp().withLedgerId(ledgerId).withPassword(new byte[0]).execute(),
                BKException.HANDLER)) {
            byte[] pageBytes;
            // The entries are closed exactly once, before decoding, on both the
            // success and the failure path.
            try (LedgerEntries entries = readHandle.readUnconfirmed(0L, 0L)) {
                pageBytes = entries.getEntry(0L).getEntryBytes();
            }
            List<Record> records = rawReadDataPage(pageBytes);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            // the elapsed time is passed as a String, as before, so its rendering in the log is unchanged
            LOGGER.log(Level.FINE, "readPage {0}.{1} {2} ms", new Object[]{str, str2, elapsedMillis + ""});
            this.dataPageReads.registerSuccessfulEvent(elapsedMillis, TimeUnit.MILLISECONDS);
            return records;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        } catch (IOException | org.apache.bookkeeper.client.api.BKException e) {
            // covers every BookKeeper failure, including "no such ledger"
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Decodes a serialized data page.
     * Layout as read here: two VLong header fields (must be 1 and 0), an int record count,
     * that many pairs of byte sequences, then a long that must equal the XXHash64 of all preceding bytes.
     *
     * @param bArr the raw page content
     * @return the decoded records, in stored order
     * @throws DataStorageManagerException if the header or the trailing hash does not match
     * @throws IOException                 if the content cannot be read
     */
    private static List<Record> rawReadDataPage(byte[] bArr) throws IOException, DataStorageManagerException {
        try (ByteArrayCursor cursor = ByteArrayCursor.wrap(bArr)) {
            long headerFirst = cursor.readVLong();
            long headerSecond = cursor.readVLong();
            if (headerFirst != 1 || headerSecond != 0) {
                throw new DataStorageManagerException("corrupted data");
            }
            int recordCount = cursor.readInt();
            List<Record> records = new ArrayList<>(recordCount);
            for (int i = 0; i < recordCount; i++) {
                // arguments are evaluated left to right: first sequence, then second
                records.add(new Record(cursor.readBytesNoCopy(), cursor.readBytesNoCopy()));
            }
            // the hash covers everything before the stored hash itself
            int hashedLength = cursor.getPosition();
            long storedHash = cursor.readLong();
            long computedHash = XXHash64Utils.hash(bArr, 0, hashedLength);
            if (computedHash != storedHash) {
                throw new DataStorageManagerException("Corrupted datafile. Bad hash " + storedHash + " <> " + computedHash);
            }
            return records;
        }
    }

    /**
     * Decodes a serialized index page.
     * Layout as read here: two VLong header fields (must be 1 and 0), a payload consumed by
     * {@code dataReader}, then a long that must equal the XXHash64 of all preceding bytes.
     *
     * @param bArr       the raw page content
     * @param dataReader decoder for the payload
     * @return whatever {@code dataReader} produced
     * @throws DataStorageManagerException if the header or the trailing hash does not match
     * @throws IOException                 if the content cannot be read
     */
    private static <X> X readIndexPage(byte[] bArr, DataStorageManager.DataReader<X> dataReader) throws IOException, DataStorageManagerException {
        try (ByteArrayCursor cursor = ByteArrayCursor.wrap(bArr)) {
            long headerFirst = cursor.readVLong();
            long headerSecond = cursor.readVLong();
            if (headerFirst != 1 || headerSecond != 0) {
                throw new DataStorageManagerException("corrupted data file");
            }
            X payload = dataReader.read(cursor);
            // the hash covers everything before the stored hash itself
            int hashedLength = cursor.getPosition();
            long storedHash = cursor.readLong();
            long computedHash = XXHash64Utils.hash(bArr, 0, hashedLength);
            if (computedHash != storedHash) {
                throw new DataStorageManagerException("Corrupted datafile . Bad hash " + storedHash + " <> " + computedHash);
            }
            return payload;
        }
    }

    /**
     * Reads page {@code l} of index {@code str2} in tablespace {@code str} and decodes it with {@code dataReader}.
     * The page is stored as entry 0 of the ledger recorded in the in-memory page mapping.
     *
     * @return the value produced by {@code dataReader}
     * @throws DataPageDoesNotExistException if no ledger is mapped to the page
     * @throws DataStorageManagerException   if BookKeeper fails, the thread is interrupted,
     *                                       or the page content is corrupted
     */
    @Override // herddb.storage.DataStorageManager
    public <X> X readIndexPage(String str, String str2, Long l, DataStorageManager.DataReader<X> dataReader) throws DataStorageManagerException {
        long startMillis = System.currentTimeMillis();
        Long ledgerId = getTableSpacePagesMapping(str).getIndexPagesMapping(str2).getLedgerIdForPage(l);
        if (ledgerId == null) {
            throw new DataPageDoesNotExistException("No such page for index : " + str + "_" + str2 + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER + l);
        }
        try (ReadHandle readHandle = FutureUtils.result(
                this.bk.getBookKeeper().newOpenLedgerOp().withLedgerId(ledgerId).withPassword(new byte[0]).execute(),
                BKException.HANDLER)) {
            byte[] pageBytes;
            // The entries are closed exactly once, before decoding, on both the
            // success and the failure path.
            try (LedgerEntries entries = readHandle.readUnconfirmed(0L, 0L)) {
                pageBytes = entries.getEntry(0L).getEntryBytes();
            }
            X decoded = readIndexPage(pageBytes, dataReader);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            // the elapsed time is passed as a String, as before, so its rendering in the log is unchanged
            LOGGER.log(Level.FINE, "readIndexPage {0}.{1} {2} ms", new Object[]{str, str2, elapsedMillis + ""});
            this.indexPageReads.registerSuccessfulEvent(elapsedMillis, TimeUnit.MILLISECONDS);
            return decoded;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        } catch (IOException | org.apache.bookkeeper.client.api.BKException e) {
            // covers every BookKeeper failure, including "no such ledger"
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Scans every active page of table {@code str2}, using the latest table status.
     *
     * @throws DataStorageManagerException wrapping any {@link HerdDBInternalException} raised during the scan
     */
    @Override // herddb.storage.DataStorageManager
    public void fullTableScan(String str, String str2, FullTableScanConsumer fullTableScanConsumer) throws DataStorageManagerException {
        try {
            TableStatus latestStatus = getLatestTableStatus(str, str2);
            fullTableScan(str, str2, latestStatus, fullTableScanConsumer);
        } catch (HerdDBInternalException internalFailure) {
            throw new DataStorageManagerException(internalFailure);
        }
    }

    /**
     * Scans every active page of table {@code str2}, using the table status at {@code logSequenceNumber}.
     *
     * @throws DataStorageManagerException wrapping any {@link HerdDBInternalException} raised during the scan
     */
    @Override // herddb.storage.DataStorageManager
    public void fullTableScan(String str, String str2, LogSequenceNumber logSequenceNumber, FullTableScanConsumer fullTableScanConsumer) throws DataStorageManagerException {
        try {
            TableStatus statusAtPosition = getTableStatus(str, str2, logSequenceNumber);
            fullTableScan(str, str2, statusAtPosition, fullTableScanConsumer);
        } catch (HerdDBInternalException internalFailure) {
            throw new DataStorageManagerException(internalFailure);
        }
    }

    /**
     * Feeds {@code fullTableScanConsumer} with the table status, then with every active page
     * in ascending page-id order, then signals the end of the table.
     */
    private void fullTableScan(String str, String str2, TableStatus tableStatus, FullTableScanConsumer fullTableScanConsumer) {
        LOGGER.log(Level.INFO, "fullTableScan table {0}.{1}, status: {2}", new Object[]{str, str2, tableStatus});
        fullTableScanConsumer.acceptTableStatus(tableStatus);
        // copy the key set so it can be sorted without touching the status itself
        List<Long> pageIds = new ArrayList<>(tableStatus.activePages.keySet());
        pageIds.sort(null); // null comparator = natural ordering
        for (long pageId : pageIds) {
            List<Record> records = readPage(str, str2, pageId);
            LOGGER.log(Level.FINE, "fullTableScan table {0}.{1}, page {2}, contains {3} records", new Object[]{str, str2, pageId, records.size()});
            fullTableScanConsumer.acceptPage(pageId, records);
        }
        fullTableScanConsumer.endTable();
    }

    /**
     * Returns how many active pages the latest table status lists.
     *
     * @param str tablespace identifier
     * @param str2 table identifier
     * @return size of {@code activePages} in the latest status
     */
    @Override // herddb.storage.DataStorageManager
    public int getActualNumberOfPages(String str, String str2) throws DataStorageManagerException {
        TableStatus latestStatus = getLatestTableStatus(str, str2);
        return latestStatus.activePages.size();
    }

    /**
     * Loads the index status stored for a given log sequence number.
     *
     * @param str tablespace identifier
     * @param str2 index identifier
     * @param logSequenceNumber sequence number selecting the checkpoint
     * @return the decoded index status
     * @throws DataStorageManagerException if the checkpoint znode does not exist or cannot be read
     */
    @Override // herddb.storage.DataStorageManager
    public IndexStatus getIndexStatus(String str, String str2, LogSequenceNumber logSequenceNumber) throws DataStorageManagerException {
        String indexDirectory = getIndexDirectory(str, str2);
        String checkpointPath = getCheckPointsFile(indexDirectory, logSequenceNumber);
        String missingMessage = "no such index checkpoint: " + checkpointPath;
        checkExistsZNode(checkpointPath, missingMessage);
        return readIndexStatusFromFile(checkpointPath);
    }

    /**
     * Fails unless the given znode exists.
     *
     * @param str znode path to probe
     * @param str2 message of the exception thrown when the node is absent
     * @throws DataStorageManagerException if the node is missing or the ZooKeeper call fails
     */
    private void checkExistsZNode(String str, String str2) throws DataStorageManagerException {
        Stat nodeStat;
        try {
            nodeStat = this.zk.ensureZooKeeper().exists(str, false);
        } catch (IOException | KeeperException zkError) {
            throw new DataStorageManagerException(zkError);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(interrupted);
        }
        if (nodeStat == null) {
            throw new DataStorageManagerException(str2);
        }
    }

    /**
     * Reads the content of a znode, counting the read in {@code zkReads}.
     *
     * @param str znode path
     * @param stat filled in by ZooKeeper with the node's metadata
     * @return the node data, or {@code null} when the node does not exist
     * @throws DataStorageManagerException on any other ZooKeeper or I/O failure, or on interruption
     */
    private byte[] readZNode(String str, Stat stat) throws DataStorageManagerException {
        try {
            this.zkReads.inc();
            return this.zk.ensureZooKeeper().getData(str, false, stat);
        } catch (KeeperException.NoNodeException missingNode) {
            // Must precede the generic KeeperException handler, otherwise it is unreachable.
            return null;
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e2);
        }
    }

    /**
     * Stores data in a znode, counting the write in {@code zkWrites}. The node is created as
     * persistent; if it already exists its data is replaced instead.
     *
     * @param str znode path
     * @param bArr payload to store
     * @param stat when non-null, its version is passed to {@code setData}; otherwise -1 is used
     * @throws DataStorageManagerException on ZooKeeper or I/O failure, or on interruption
     */
    private void writeZNode(String str, byte[] bArr, Stat stat) throws DataStorageManagerException {
        try {
            this.zkWrites.inc();
            try {
                this.zk.ensureZooKeeper().create(str, bArr, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException alreadyThere) {
                int expectedVersion;
                if (stat == null) {
                    expectedVersion = -1;
                } else {
                    expectedVersion = stat.getVersion();
                }
                this.zk.ensureZooKeeper().setData(str, bArr, expectedVersion);
            }
        } catch (IOException | KeeperException zkError) {
            throw new DataStorageManagerException(zkError);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(interrupted);
        }
    }

    /**
     * Lists the children of a znode, creating the node (empty, persistent) when it is absent.
     *
     * @param str znode path acting as a directory
     * @return the children as returned by {@code zkGetChildren(str)}, or an empty list when
     *         the node had to be created
     * @throws DataStorageManagerException on ZooKeeper or I/O failure, or on interruption
     */
    private List<String> ensureZNodeDirectoryAndReturnChildren(String str) throws DataStorageManagerException {
        try {
            if (this.zk.ensureZooKeeper().exists(str, false) == null) {
                this.zk.ensureZooKeeper().create(str, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return Collections.emptyList();
            }
            return zkGetChildren(str);
        } catch (IOException | KeeperException zkError) {
            throw new DataStorageManagerException(zkError);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(interrupted);
        }
    }

    /**
     * Creates an empty persistent znode, tolerating the case where it already exists.
     *
     * @param str znode path to create
     * @throws DataStorageManagerException on any other ZooKeeper or I/O failure, or on interruption
     */
    private void ensureZNodeDirectory(String str) throws DataStorageManagerException {
        try {
            this.zk.ensureZooKeeper().create(str, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {
            // Already present: nothing to do. This handler must precede the generic
            // KeeperException one, otherwise it is unreachable.
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e2);
        }
    }

    /**
     * Lists the children of a znode as full paths (parent path, "/", child name).
     *
     * @param str parent znode path
     * @return full paths of the children
     */
    private List<String> zkGetChildren(String str) throws DataStorageManagerException {
        final boolean prefixWithParent = true;
        return zkGetChildren(str, prefixWithParent);
    }

    /**
     * Lists the children of a znode, counting the call in {@code zkGetChildren}.
     *
     * @param str parent znode path
     * @param z when {@code true} each child is returned as {@code str + "/" + child};
     *          otherwise the bare child names are returned
     * @return the children, or an empty list when the parent node does not exist
     * @throws DataStorageManagerException on any other ZooKeeper or I/O failure, or on interruption
     */
    private List<String> zkGetChildren(String str, boolean z) throws DataStorageManagerException {
        try {
            this.zkGetChildren.inc();
            List<String> children = this.zk.ensureZooKeeper().getChildren(str, false);
            if (!z) {
                return children;
            }
            return children.stream()
                    .map(child -> str + "/" + child)
                    .collect(Collectors.toList());
        } catch (KeeperException.NoNodeException missingParent) {
            // Must precede the generic KeeperException handler, otherwise it is unreachable.
            return Collections.emptyList();
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e2);
        }
    }

    /**
     * Loads the table status stored for a given log sequence number.
     *
     * @param str tablespace identifier
     * @param str2 table identifier
     * @param logSequenceNumber sequence number selecting the checkpoint
     * @return the decoded table status
     * @throws DataStorageManagerException if the checkpoint znode is missing or cannot be decoded
     */
    @Override // herddb.storage.DataStorageManager
    public TableStatus getTableStatus(String str, String str2, LogSequenceNumber logSequenceNumber) throws DataStorageManagerException {
        String tableDirectory = getTableDirectory(str, str2);
        String checkpointPath = getCheckPointsFile(tableDirectory, logSequenceNumber);
        try {
            checkExistsZNode(checkpointPath, "no such table checkpoint: " + checkpointPath);
            return readTableStatusFromFile(checkpointPath);
        } catch (IOException readError) {
            throw new DataStorageManagerException(readError);
        }
    }

    /**
     * Returns the status from the most recent table checkpoint, or a fresh status starting at
     * {@code LogSequenceNumber.START_OF_TIME} with no active pages when no checkpoint exists.
     *
     * @param str tablespace identifier
     * @param str2 table identifier
     * @return the latest (or initial) table status
     * @throws DataStorageManagerException on ZooKeeper or I/O failure, or on interruption
     */
    @Override // herddb.storage.DataStorageManager
    public TableStatus getLatestTableStatus(String str, String str2) throws DataStorageManagerException {
        try {
            String latestCheckpoint = getLastTableCheckpointFile(str, str2);
            if (latestCheckpoint != null) {
                return readTableStatusFromFile(latestCheckpoint);
            }
            return new TableStatus(str2, LogSequenceNumber.START_OF_TIME, Bytes.longToByteArray(1L), 1L, Collections.emptyMap());
        } catch (IOException | KeeperException zkError) {
            throw new DataStorageManagerException(zkError);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(interrupted);
        }
    }

    /**
     * Reads a znode and decodes its content as a table status.
     *
     * @param str znode path of the checkpoint
     * @return the decoded table status
     * @throws IOException if the znode is missing or its content fails verification
     */
    public TableStatus readTableStatusFromFile(String str) throws IOException {
        Stat nodeStat = new Stat();
        byte[] content = readZNode(str, nodeStat);
        return readTableStatusFromFile(content, str);
    }

    /**
     * Decodes a serialized table status.
     * <p>
     * The block is first checked with {@code XXHash64Utils.verifyBlockWithFooter}; it must then
     * start with the two vlongs 1 and 0 (presumably format version and flags; confirm against
     * the writer) followed by the status itself.
     *
     * @param bArr raw znode content; {@code null} is reported as a missing znode
     * @param str znode path, used only in error messages
     * @return the decoded table status
     * @throws IOException if {@code bArr} is {@code null} or reading fails
     * @throws DataStorageManagerException if the two leading values are not 1 and 0
     */
    public TableStatus readTableStatusFromFile(byte[] bArr, String str) throws IOException {
        if (bArr == null) {
            throw new IOException("Missing ZNode for TableStatus at " + str);
        }
        XXHash64Utils.verifyBlockWithFooter(bArr, 0, bArr.length);
        // try-with-resources closes both streams on every path, including failures.
        try (SimpleByteArrayInputStream rawInput = new SimpleByteArrayInputStream(bArr);
             ExtendedDataInputStream dataInput = new ExtendedDataInputStream(rawInput)) {
            long firstHeaderValue = dataInput.readVLong();
            long secondHeaderValue = dataInput.readVLong();
            if (firstHeaderValue != 1 || secondHeaderValue != 0) {
                throw new DataStorageManagerException("corrupted table status file " + str);
            }
            return TableStatus.deserialize(dataInput);
        }
    }

    /**
     * Finds the most recent checkpoint znode inside a table's directory.
     *
     * @param str tablespace identifier
     * @param str2 table identifier
     * @return path of the most recent checkpoint, or {@code null} when there is none
     */
    private String getLastTableCheckpointFile(String str, String str2) throws IOException, KeeperException, InterruptedException {
        String tableDirectory = getTableDirectory(str, str2);
        return getMostRecentCheckPointFile(tableDirectory);
    }

    /**
     * Picks, among the checkpoint znodes of a directory, the one with the highest modification
     * time. The directory znode is created when it does not exist.
     *
     * @param str znode path of the table or index directory
     * @return full path of the most recently modified checkpoint znode, or {@code null} when
     *         the directory holds no checkpoint
     * @throws IOException if the ZooKeeper client cannot be obtained
     * @throws KeeperException on ZooKeeper failure
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     */
    private String getMostRecentCheckPointFile(String str) throws IOException, KeeperException, InterruptedException {
        String mostRecentPath = null;
        long mostRecentMtime = -1;
        for (String childPath : ensureZNodeDirectoryAndReturnChildren(str)) {
            if (!isTableOrIndexCheckpointsFile(childPath)) {
                LOGGER.log(Level.INFO, "getMostRecentCheckPointFile on " + str + " -> SKIP " + childPath);
                continue;
            }
            LOGGER.log(Level.INFO, "getMostRecentCheckPointFile on " + str + " -> ACCEPT " + childPath);
            // Use the synchronous exists(): it returns the node's Stat. The four-argument
            // overload is asynchronous and treats its last argument as an opaque callback
            // context, so a Stat passed there is never filled in and getMtime() stays 0.
            Stat stat = this.zk.ensureZooKeeper().exists(childPath, false);
            if (stat == null) {
                // The node vanished between listing and stat: it cannot be the latest checkpoint.
                continue;
            }
            long mtime = stat.getMtime();
            if (mostRecentMtime < 0 || mostRecentMtime < mtime) {
                mostRecentPath = childPath;
                mostRecentMtime = mtime;
            }
        }
        LOGGER.log(Level.INFO, "getMostRecentCheckPointFile on " + str + " -> " + mostRecentPath);
        return mostRecentPath;
    }

    /**
     * Reads a znode and decodes its content as an index status.
     *
     * @param str znode path of the checkpoint
     * @return the decoded index status
     * @throws DataStorageManagerException if the znode is missing or cannot be decoded
     */
    public IndexStatus readIndexStatusFromFile(String str) throws DataStorageManagerException {
        Stat nodeStat = new Stat();
        byte[] content = readZNode(str, nodeStat);
        if (content != null) {
            return readIndexStatusFromFile(content, str);
        }
        throw new DataStorageManagerException("Missing znode for " + str + " IndexStatusFile");
    }

    /**
     * Decodes a serialized index status.
     * <p>
     * The block is first checked with {@code XXHash64Utils.verifyBlockWithFooter}; it must then
     * start with the two vlongs 1 and 0 (presumably format version and flags; confirm against
     * the writer) followed by the status itself.
     *
     * @param bArr raw znode content; {@code null} is reported as a missing znode
     * @param str znode path, used only in error messages
     * @return the decoded index status
     * @throws DataStorageManagerException if {@code bArr} is {@code null}, the leading values
     *         are not 1 and 0, or reading fails with an {@link IOException}
     */
    public IndexStatus readIndexStatusFromFile(byte[] bArr, String str) throws DataStorageManagerException {
        if (bArr == null) {
            throw new DataStorageManagerException("Missing znode for " + str + " IndexStatusFile");
        }
        try {
            XXHash64Utils.verifyBlockWithFooter(bArr, 0, bArr.length);
            // try-with-resources closes both streams on every path, including failures.
            try (SimpleByteArrayInputStream rawInput = new SimpleByteArrayInputStream(bArr);
                 ExtendedDataInputStream dataInput = new ExtendedDataInputStream(rawInput)) {
                long firstHeaderValue = dataInput.readVLong();
                long secondHeaderValue = dataInput.readVLong();
                if (firstHeaderValue != 1 || secondHeaderValue != 0) {
                    throw new DataStorageManagerException("corrupted index status file " + str);
                }
                return IndexStatus.deserialize(dataInput);
            }
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Persists a table checkpoint in ZooKeeper and computes the cleanup to run afterwards.
     * <p>
     * The tablespace mapping is persisted first. If the checkpoint znode already holds a status
     * equal to {@code tableStatus}, nothing is written and an empty list is returned. Otherwise
     * the status is serialized (vlongs 1 and 0, the status, then the hash reported by the
     * hashing stream) and written to the znode. The returned actions cover:
     * <ul>
     * <li>mapped pages with a positive id that are neither pinned nor active and whose id is
     * below the highest active page id;</li>
     * <li>every ledger listed in the mapping's {@code oldLedgers};</li>
     * <li>other checkpoint znodes of the table that are older than this checkpoint and not
     * pinned.</li>
     * </ul>
     *
     * @param str tablespace identifier
     * @param str2 table identifier
     * @param tableStatus status to persist
     * @param z pin flag forwarded to {@code pinTableAndGetPages} / {@code pinTableAndGetCheckpoints}
     * @return actions to execute after the checkpoint; empty when the status was already saved
     * @throws DataStorageManagerException on serialization, ZooKeeper or I/O failure
     */
    @Override // herddb.storage.DataStorageManager
    public List<PostCheckpointAction> tableCheckpoint(String str, String str2, TableStatus tableStatus, boolean z) throws DataStorageManagerException {
        persistTableSpaceMapping(str);
        LogSequenceNumber logSequenceNumber = tableStatus.sequenceNumber;
        String tableDirectory = getTableDirectory(str, str2);
        String checkPointsFile = getCheckPointsFile(tableDirectory, logSequenceNumber);
        Stat stat = new Stat();
        try {
            // Decode the bytes that were just fetched rather than reading the znode a second
            // time: this saves a round trip and cannot fail if the node disappears in between.
            byte[] existingContent = readZNode(checkPointsFile, stat);
            if (existingContent != null) {
                TableStatus savedStatus = readTableStatusFromFile(existingContent, checkPointsFile);
                if (savedStatus != null && savedStatus.equals(tableStatus)) {
                    LOGGER.log(Level.FINE, "tableCheckpoint " + str + ", " + str2 + ": " + tableStatus + " (pin:" + z + ") already saved on file " + checkPointsFile);
                    return Collections.emptyList();
                }
            }
            LOGGER.log(Level.FINE, "tableCheckpoint " + str + ", " + str2 + ": " + tableStatus + " (pin:" + z + ") to file " + checkPointsFile);

            byte[] serializedStatus;
            try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                 XXHash64Utils.HashingOutputStream hashingOutput = new XXHash64Utils.HashingOutputStream(buffer);
                 ExtendedDataOutputStream dataOutput = new ExtendedDataOutputStream(hashingOutput)) {
                dataOutput.writeVLong(1L);
                dataOutput.writeVLong(0L);
                tableStatus.serialize(dataOutput);
                dataOutput.writeLong(hashingOutput.hash());
                dataOutput.flush();
                serializedStatus = buffer.toByteArray();
            }
            writeZNode(checkPointsFile, serializedStatus, stat);

            Map<Long, Integer> pinnedPages = pinTableAndGetPages(str, str2, tableStatus, z);
            Set<LogSequenceNumber> pinnedCheckpoints = pinTableAndGetCheckpoints(str, str2, tableStatus, z);
            // With no active page the bound is Long.MAX_VALUE, i.e. no upper limit.
            long maxActivePageId = tableStatus.activePages.keySet().stream().max(Comparator.naturalOrder()).orElse(Long.MAX_VALUE).longValue();
            List<PostCheckpointAction> actions = new ArrayList<>();

            PagesMapping tablePagesMapping = getTableSpacePagesMapping(str).getTablePagesMapping(str2);
            for (Map.Entry<Long, Long> entry : tablePagesMapping.pages.entrySet()) {
                long pageId = entry.getKey().longValue();
                long ledgerId = entry.getValue().longValue();
                LOGGER.log(Level.FINEST, "checkpoint pageId {0} ledgerId {1}", new Object[]{Long.valueOf(pageId), Long.valueOf(ledgerId)});
                boolean unreferenced = !pinnedPages.containsKey(Long.valueOf(pageId)) && !tableStatus.activePages.containsKey(Long.valueOf(pageId));
                if (pageId > 0 && unreferenced && pageId < maxActivePageId) {
                    LOGGER.log(Level.FINEST, "checkpoint ledger " + ledgerId + " pageId " + pageId + ". will be deleted after checkpoint end");
                    actions.add(new DropLedgerForTableAction(str, str2, "delete page " + pageId + " ledgerId " + ledgerId, pageId, ledgerId));
                }
            }
            for (Long oldLedgerId : tablePagesMapping.oldLedgers) {
                LOGGER.log(Level.FINEST, "checkpoint ledger " + oldLedgerId + " without page. will be deleted after checkpoint end");
                actions.add(new DropLedgerForTableAction(str, str2, "delete unused ledgerId " + oldLedgerId, Long.MAX_VALUE, oldLedgerId.longValue()));
            }

            try {
                for (String childPath : zkGetChildren(tableDirectory)) {
                    if (isTableOrIndexCheckpointsFile(childPath) && !childPath.equals(checkPointsFile)) {
                        TableStatus olderStatus = readTableStatusFromFile(childPath);
                        if (logSequenceNumber.after(olderStatus.sequenceNumber) && !pinnedCheckpoints.contains(olderStatus.sequenceNumber)) {
                            LOGGER.log(Level.FINEST, "checkpoint metadata file " + childPath + ". will be deleted after checkpoint end");
                            actions.add(new DeleteZNodeAction(str2, "delete checkpoint metadata file " + childPath, childPath));
                        }
                    }
                }
            } catch (IOException e) {
                // Best effort: failing to inspect old checkpoints must not fail the checkpoint itself.
                LOGGER.log(Level.SEVERE, "Could not list table dir " + tableDirectory, (Throwable) e);
            }
            return actions;
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Persists an index checkpoint in ZooKeeper and computes the cleanup to run afterwards.
     * <p>
     * If the checkpoint znode already holds a status equal to {@code indexStatus}, nothing is
     * written and an empty list is returned. Otherwise the status is serialized (vlongs 1 and
     * 0, the status, then the hash reported by the hashing stream) and written to the znode.
     * The returned actions cover:
     * <ul>
     * <li>mapped pages with a positive id that are neither pinned nor active and whose id is
     * below the highest active page id;</li>
     * <li>every ledger listed in the mapping's {@code oldLedgers};</li>
     * <li>other checkpoint znodes of the index that are older than this checkpoint and not
     * pinned.</li>
     * </ul>
     *
     * @param str tablespace identifier
     * @param str2 index identifier
     * @param indexStatus status to persist
     * @param z pin flag forwarded to {@code pinIndexAndGetPages} / {@code pinIndexAndGetCheckpoints}
     * @return actions to execute after the checkpoint; empty when the status was already saved
     * @throws DataStorageManagerException on serialization, ZooKeeper or I/O failure
     */
    @Override // herddb.storage.DataStorageManager
    public List<PostCheckpointAction> indexCheckpoint(String str, String str2, IndexStatus indexStatus, boolean z) throws DataStorageManagerException {
        String indexDirectory = getIndexDirectory(str, str2);
        LogSequenceNumber logSequenceNumber = indexStatus.sequenceNumber;
        String checkPointsFile = getCheckPointsFile(indexDirectory, logSequenceNumber);
        Stat stat = new Stat();
        byte[] existingContent = readZNode(checkPointsFile, stat);
        if (existingContent != null) {
            IndexStatus savedStatus = readIndexStatusFromFile(existingContent, checkPointsFile);
            if (savedStatus != null && savedStatus.equals(indexStatus)) {
                LOGGER.log(Level.INFO, "indexCheckpoint " + str + ", " + str2 + ": " + indexStatus + " already saved on" + checkPointsFile);
                return Collections.emptyList();
            }
        }
        LOGGER.log(Level.FINE, "indexCheckpoint " + str + ", " + str2 + ": " + indexStatus + " to file " + checkPointsFile);
        try {
            byte[] serializedStatus;
            // try-with-resources closes all three streams on every path, including failures.
            try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                 XXHash64Utils.HashingOutputStream hashingOutput = new XXHash64Utils.HashingOutputStream(buffer);
                 ExtendedDataOutputStream dataOutput = new ExtendedDataOutputStream(hashingOutput)) {
                dataOutput.writeVLong(1L);
                dataOutput.writeVLong(0L);
                indexStatus.serialize(dataOutput);
                dataOutput.writeLong(hashingOutput.hash());
                dataOutput.flush();
                serializedStatus = buffer.toByteArray();
            }
            writeZNode(checkPointsFile, serializedStatus, stat);

            Map<Long, Integer> pinnedPages = pinIndexAndGetPages(str, str2, indexStatus, z);
            Set<LogSequenceNumber> pinnedCheckpoints = pinIndexAndGetCheckpoints(str, str2, indexStatus, z);
            // With no active page the bound is Long.MAX_VALUE, i.e. no upper limit.
            long maxActivePageId = indexStatus.activePages.stream().max(Comparator.naturalOrder()).orElse(Long.MAX_VALUE).longValue();
            List<PostCheckpointAction> actions = new ArrayList<>();

            PagesMapping indexPagesMapping = getTableSpacePagesMapping(str).getIndexPagesMapping(str2);
            for (Map.Entry<Long, Long> entry : indexPagesMapping.pages.entrySet()) {
                long pageId = entry.getKey().longValue();
                long ledgerId = entry.getValue().longValue();
                LOGGER.log(Level.FINEST, "checkpoint pageId {0} ledgerId {1}", new Object[]{Long.valueOf(pageId), Long.valueOf(ledgerId)});
                boolean unreferenced = !pinnedPages.containsKey(Long.valueOf(pageId)) && !indexStatus.activePages.contains(Long.valueOf(pageId));
                if (pageId > 0 && unreferenced && pageId < maxActivePageId) {
                    LOGGER.log(Level.FINEST, "checkpoint ledger " + ledgerId + " pageId " + pageId + ". will be deleted after checkpoint end");
                    actions.add(new DropLedgerForIndexAction(str, str2, "delete index page " + pageId + " ledgerId " + ledgerId, pageId, ledgerId));
                }
            }
            for (Long oldLedgerId : indexPagesMapping.oldLedgers) {
                LOGGER.log(Level.FINEST, "checkpoint ledger " + oldLedgerId + " without page. will be deleted after checkpoint end");
                actions.add(new DropLedgerForIndexAction(str, str2, "delete unused ledgerId " + oldLedgerId, Long.MAX_VALUE, oldLedgerId.longValue()));
            }

            for (String childPath : zkGetChildren(indexDirectory)) {
                if (isTableOrIndexCheckpointsFile(childPath) && !childPath.equals(checkPointsFile)) {
                    IndexStatus olderStatus = readIndexStatusFromFile(childPath);
                    if (logSequenceNumber.after(olderStatus.sequenceNumber) && !pinnedCheckpoints.contains(olderStatus.sequenceNumber)) {
                        LOGGER.log(Level.FINEST, "checkpoint metadata file " + childPath + ". will be deleted after checkpoint end");
                        actions.add(new DeleteZNodeAction(str2, "delete checkpoint metadata file " + childPath, childPath));
                    }
                }
            }
            return actions;
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Returns the part of a path after its last "/" (the whole string when there is none).
     *
     * @param str slash-separated path
     * @return the final path segment
     */
    private static String getFilename(String str) {
        int lastSlash = str.lastIndexOf("/");
        return str.substring(lastSlash + 1);
    }

    /**
     * Extracts the numeric page id from a path whose file name has the form {@code <id>.page}.
     *
     * @param str slash-separated path
     * @return the parsed id, or -1 when the name lacks the ".page" suffix or the prefix is not
     *         a valid long
     */
    private static long getPageId(String str) {
        final String suffix = ".page";
        String name = getFilename(str);
        if (name.endsWith(suffix)) {
            String idPart = name.substring(0, name.length() - suffix.length());
            try {
                return Long.parseLong(idPart);
            } catch (NumberFormatException notANumber) {
                return -1L;
            }
        }
        return -1L;
    }

    /**
     * Drops the ledgers of mapped table pages that are not in the given set of page ids.
     * Only pages with a positive id are considered.
     *
     * @param str tablespace identifier
     * @param str2 table identifier
     * @param set page ids to keep
     * @throws DataStorageManagerException if the mapping lookup or a ledger drop fails
     */
    @Override // herddb.storage.DataStorageManager
    public void cleanupAfterTableBoot(String str, String str2, Set<Long> set) throws DataStorageManagerException {
        PagesMapping tablePages = getTableSpacePagesMapping(str).getTablePagesMapping(str2);
        for (Map.Entry<Long, Long> mapping : tablePages.pages.entrySet()) {
            long pageId = mapping.getKey().longValue();
            long ledgerId = mapping.getValue().longValue();
            LOGGER.log(Level.FINER, "cleanupAfterTableBoot pageId " + pageId + " ledger id " + ledgerId);
            if (pageId <= 0 || set.contains(Long.valueOf(pageId))) {
                continue;
            }
            LOGGER.log(Level.INFO, "cleanupAfterTableBoot pageId " + pageId + " ledger id " + ledgerId + ". will be deleted");
            dropLedgerForTable(str, str2, pageId, ledgerId, "cleanupAfterBoot " + str + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER + str2 + " pageId " + pageId);
        }
    }

    /**
     * Serializes a data page into the given buffer: vlongs 1 and 0, the record count as an int,
     * each record's key and value arrays, and finally a long holding the XXHash64 of everything
     * written before it.
     *
     * @param collection records to serialize
     * @param visibleByteArrayOutputStream destination buffer
     * @return the buffer size after the hash has been appended
     * @throws IOException if writing fails
     */
    private static long writePage(Collection<Record> collection, VisibleByteArrayOutputStream visibleByteArrayOutputStream) throws IOException {
        try (ExtendedDataOutputStream pageOutput = new ExtendedDataOutputStream(visibleByteArrayOutputStream)) {
            pageOutput.writeVLong(1L);
            pageOutput.writeVLong(0L);
            pageOutput.writeInt(collection.size());
            for (Record entry : collection) {
                pageOutput.writeArray(entry.key);
                pageOutput.writeArray(entry.value);
            }
            // Flush before hashing so the buffer holds every byte written so far.
            pageOutput.flush();
            long contentHash = XXHash64Utils.hash(visibleByteArrayOutputStream.getBuffer(), 0, visibleByteArrayOutputStream.size());
            pageOutput.writeLong(contentHash);
            pageOutput.flush();
            return visibleByteArrayOutputStream.size();
        }
    }

    /**
     * Writes a table data page to a newly created BookKeeper ledger and records the
     * page-id to ledger-id association in the table's pages mapping.
     * <p>
     * The ledger carries custom metadata (tablespace, node, application, component, table and
     * type "datapage"). The elapsed time is registered on {@code dataPageWrites} on success.
     *
     * @param str tablespace identifier, stored under the "tablespaceuuid" metadata key
     * @param str2 table identifier
     * @param j page id
     * @param collection records making up the page
     * @throws DataStorageManagerException on serialization or BookKeeper failure, or on interruption
     */
    @Override // herddb.storage.DataStorageManager
    public void writePage(String str, String str2, long j, Collection<Record> collection) throws DataStorageManagerException {
        long startMillis = System.currentTimeMillis();
        try {
            long writtenBytes;
            long ledgerId;
            // try-with-resources closes the buffer and the ledger handle on every path.
            try (VisibleByteArrayOutputStream buffer = new VisibleByteArrayOutputStream()) {
                writtenBytes = writePage(collection, buffer);
                Map<String, byte[]> ledgerMetadata = new HashMap<>();
                ledgerMetadata.put("tablespaceuuid", str.getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("node", this.nodeId.getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("application", "herddb".getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("component", "datastore".getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put(BackupFileConstants.ENTRY_TYPE_TABLE, str2.getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("type", "datapage".getBytes(StandardCharsets.UTF_8));
                try (WriteHandle writeHandle = (WriteHandle) FutureUtils.result(this.bk.getBookKeeper().newCreateLedgerOp().withEnsembleSize(this.bk.getEnsemble()).withWriteQuorumSize(this.bk.getWriteQuorumSize()).withAckQuorumSize(this.bk.getAckQuorumSize()).withPassword(new byte[0]).withDigestType(DigestType.CRC32C).withCustomMetadata(ledgerMetadata).execute(), BKException.HANDLER)) {
                    // The whole page goes into the ledger as a single entry.
                    writeHandle.append(buffer.getBuffer(), 0, buffer.size());
                    ledgerId = writeHandle.getId();
                }
            }
            getTableSpacePagesMapping(str).getTablePagesMapping(str2).writePageId(j, ledgerId);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            if (LOGGER.isLoggable(Level.FINER)) {
                LOGGER.log(Level.FINER, "writePage {0} KBytes,{1} records, time {2} ms", new Object[]{(writtenBytes / 1024) + "", Integer.valueOf(collection.size()), elapsedMillis + ""});
            }
            this.dataPageWrites.registerSuccessfulEvent(elapsedMillis, TimeUnit.MILLISECONDS);
        } catch (IOException | org.apache.bookkeeper.client.api.BKException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e2);
        }
    }

    /**
     * Serializes an index page into the given buffer: vlongs 1 and 0, whatever the writer
     * emits, and finally a long holding the XXHash64 of everything written before it.
     *
     * @param dataWriter callback producing the page content
     * @param visibleByteArrayOutputStream destination buffer
     * @return the buffer size after the hash has been appended
     * @throws IOException if writing fails
     */
    private static long writeIndexPage(DataStorageManager.DataWriter dataWriter, VisibleByteArrayOutputStream visibleByteArrayOutputStream) throws IOException {
        try (ExtendedDataOutputStream pageOutput = new ExtendedDataOutputStream(visibleByteArrayOutputStream)) {
            pageOutput.writeVLong(1L);
            pageOutput.writeVLong(0L);
            dataWriter.write(pageOutput);
            // Flush before hashing so the buffer holds every byte written so far.
            pageOutput.flush();
            long contentHash = XXHash64Utils.hash(visibleByteArrayOutputStream.getBuffer(), 0, visibleByteArrayOutputStream.size());
            pageOutput.writeLong(contentHash);
            pageOutput.flush();
            return visibleByteArrayOutputStream.size();
        }
    }

    /**
     * Writes an index page to a newly created BookKeeper ledger and records the
     * page-id to ledger-id association in the index's pages mapping.
     * <p>
     * The ledger carries custom metadata (tablespace, node, application, component, index and
     * type "indexpage"). The elapsed time is registered on {@code indexPageWrites} on success.
     *
     * @param str tablespace identifier, stored under the "tablespaceuuid" metadata key
     * @param str2 index identifier
     * @param j page id
     * @param dataWriter callback producing the page content
     * @throws DataStorageManagerException on serialization or BookKeeper failure, or on interruption
     */
    @Override // herddb.storage.DataStorageManager
    public void writeIndexPage(String str, String str2, long j, DataStorageManager.DataWriter dataWriter) throws DataStorageManagerException {
        long startMillis = System.currentTimeMillis();
        try {
            long writtenBytes;
            long ledgerId;
            // try-with-resources closes the buffer and the ledger handle on every path.
            try (VisibleByteArrayOutputStream buffer = new VisibleByteArrayOutputStream()) {
                writtenBytes = writeIndexPage(dataWriter, buffer);
                Map<String, byte[]> ledgerMetadata = new HashMap<>();
                ledgerMetadata.put("tablespaceuuid", str.getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("node", this.nodeId.getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("application", "herddb".getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("component", "datastore".getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put(BookKeeperServerStats.LD_INDEX_SCOPE, str2.getBytes(StandardCharsets.UTF_8));
                ledgerMetadata.put("type", "indexpage".getBytes(StandardCharsets.UTF_8));
                try (WriteHandle writeHandle = (WriteHandle) FutureUtils.result(this.bk.getBookKeeper().newCreateLedgerOp().withEnsembleSize(this.bk.getEnsemble()).withWriteQuorumSize(this.bk.getWriteQuorumSize()).withAckQuorumSize(this.bk.getAckQuorumSize()).withPassword(new byte[0]).withDigestType(DigestType.CRC32C).withCustomMetadata(ledgerMetadata).execute(), BKException.HANDLER)) {
                    // The whole page goes into the ledger as a single entry.
                    writeHandle.append(buffer.getBuffer(), 0, buffer.size());
                    ledgerId = writeHandle.getId();
                }
            }
            LOGGER.log(Level.INFO, "writeIndexPage {0} pageId {1}, ledgerId {2}", new Object[]{str2, Long.valueOf(j), Long.valueOf(ledgerId)});
            getTableSpacePagesMapping(str).getIndexPagesMapping(str2).writePageId(j, ledgerId);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            if (LOGGER.isLoggable(Level.FINER)) {
                // Two arguments are supplied, so the placeholders must be {0} and {1};
                // a {2} placeholder would be printed literally and the time would be lost.
                LOGGER.log(Level.FINER, "writePage {0} KBytes, time {1} ms", new Object[]{(writtenBytes / 1024) + "", elapsedMillis + ""});
            }
            this.indexPageWrites.registerSuccessfulEvent(elapsedMillis, TimeUnit.MILLISECONDS);
        } catch (IOException | org.apache.bookkeeper.client.api.BKException e) {
            throw new DataStorageManagerException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e2);
        }
    }

    /**
     * Extracts the log sequence number stored in the header of a serialized tables-metadata znode.
     *
     * @param str   tablespace identifier the payload is expected to carry
     * @param bArr  raw content of the znode
     * @param str2  znode path, used only in error messages
     * @return the sequence number (ledger id, offset) read after the header
     * @throws DataStorageManagerException if the two leading header values are not 1 and 0, if the stored
     *                                     tablespace differs from {@code str}, or if the payload cannot be read
     */
    private static LogSequenceNumber readLogSequenceNumberFromTablesMetadataFile(String str, byte[] bArr, String str2) throws DataStorageManagerException {
        try (ByteArrayInputStream raw = new ByteArrayInputStream(bArr);
             ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
            // The payload must start with the two VLongs 1 and 0 (presumably format version and flags).
            long header0 = in.readVLong();
            long header1 = in.readVLong();
            if (header0 != 1 || header1 != 0) {
                // A space separates the message from the path, as in the sibling index/transaction readers.
                throw new DataStorageManagerException("corrupted table list znode " + str2);
            }
            String storedTableSpace = in.readUTF();
            if (!storedTableSpace.equals(str)) {
                throw new DataStorageManagerException("znode " + str2 + " is not for spablespace " + str);
            }
            long ledgerId = in.readZLong();
            long offset = in.readZLong();
            return new LogSequenceNumber(ledgerId, offset);
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Extracts the log sequence number stored in the header of a serialized indexes-metadata znode.
     *
     * @param str   tablespace identifier the payload is expected to carry
     * @param bArr  raw content of the znode
     * @param str2  znode path, used only in error messages
     * @return the sequence number (ledger id, offset) read after the header
     * @throws DataStorageManagerException if the header is not the expected 1/0 pair, if the stored
     *                                     tablespace differs from {@code str}, or if the payload cannot be read
     */
    private static LogSequenceNumber readLogSequenceNumberFromIndexMetadataFile(String str, byte[] bArr, String str2) throws DataStorageManagerException {
        try (ByteArrayInputStream raw = new ByteArrayInputStream(bArr);
             ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
            long header0 = in.readVLong();
            long header1 = in.readVLong();
            boolean headerOk = header0 == 1 && header1 == 0;
            if (!headerOk) {
                throw new DataStorageManagerException("corrupted index list znode " + str2);
            }
            String storedTableSpace = in.readUTF();
            if (!storedTableSpace.equals(str)) {
                throw new DataStorageManagerException("znode " + str2 + " is not for spablespace " + str);
            }
            long ledgerId = in.readZLong();
            long offset = in.readZLong();
            return new LogSequenceNumber(ledgerId, offset);
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Loads the table definitions stored for a tablespace at the given log sequence number.
     *
     * @param logSequenceNumber sequence number whose tables-metadata znode is read
     * @param str               tablespace identifier
     * @return the tables decoded from the znode, or an empty list when the znode is missing and
     *         the sequence number is the start of time
     * @throws DataStorageManagerException if the znode is missing for any other sequence number,
     *                                     or if reading or decoding fails
     */
    @Override // herddb.storage.DataStorageManager
    public List<Table> loadTables(LogSequenceNumber logSequenceNumber, String str) throws DataStorageManagerException {
        try {
            ensureZNodeDirectory(getTableSpaceZNode(str));
            String metadataZNode = getTablespaceTablesMetadataFile(str, logSequenceNumber);
            LOGGER.log(Level.INFO, "loadTables for tableSpace " + str + " from " + metadataZNode + ", sequenceNumber:" + logSequenceNumber);
            byte[] payload = readZNode(metadataZNode, new Stat());
            if (payload == null) {
                // Missing metadata is acceptable only when nothing has been checkpointed yet.
                if (logSequenceNumber.isStartOfTime()) {
                    LOGGER.log(Level.INFO, "zode " + metadataZNode + " not found");
                    return Collections.emptyList();
                }
                throw new DataStorageManagerException("local table data not available for tableSpace " + str + ", recovering from sequenceNumber " + logSequenceNumber);
            }
            return readTablespaceStructure(payload, str, logSequenceNumber);
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Decodes a serialized list of tables.
     *
     * <p>Layout read here: two VLongs (must be 1 and 0), the tablespace as UTF, the sequence number as
     * two ZLongs, an int count, then one serialized {@link Table} array per entry.
     *
     * @param bArr              serialized payload
     * @param str               tablespace identifier the payload must carry
     * @param logSequenceNumber expected sequence number, or {@code null} to skip that check
     * @return an unmodifiable list of the decoded tables
     * @throws IOException                 if the payload cannot be read
     * @throws DataStorageManagerException if the header, tablespace or sequence number do not match
     */
    public static List<Table> readTablespaceStructure(byte[] bArr, String str, LogSequenceNumber logSequenceNumber) throws IOException, DataStorageManagerException {
        try (ByteArrayInputStream raw = new ByteArrayInputStream(bArr);
             ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
            long header0 = in.readVLong();
            long header1 = in.readVLong();
            boolean headerOk = header0 == 1 && header1 == 0;
            if (!headerOk) {
                throw new DataStorageManagerException("corrupted table list file");
            }
            String storedTableSpace = in.readUTF();
            if (!storedTableSpace.equals(str)) {
                throw new DataStorageManagerException("file is not for spablespace " + str + " but for " + storedTableSpace);
            }
            long ledgerId = in.readZLong();
            long offset = in.readZLong();
            if (logSequenceNumber != null) {
                boolean sameSequence = ledgerId == logSequenceNumber.ledgerId && offset == logSequenceNumber.offset;
                if (!sameSequence) {
                    throw new DataStorageManagerException("file is not for sequence number " + logSequenceNumber);
                }
            }
            int count = in.readInt();
            List<Table> tables = new ArrayList<>();
            int remaining = count;
            while (remaining-- > 0) {
                tables.add(Table.deserialize(in.readArray()));
            }
            return Collections.unmodifiableList(tables);
        }
    }

    /**
     * Loads the index definitions stored for a tablespace at the given log sequence number.
     *
     * @param logSequenceNumber sequence number whose indexes-metadata znode is read; the payload must
     *                          carry exactly this ledger id and offset
     * @param str               tablespace identifier
     * @return an unmodifiable list of the decoded indexes, or an empty list when the znode is missing
     *         and the sequence number is the start of time
     * @throws DataStorageManagerException if the znode is missing for any other sequence number, if the
     *                                     payload header/tablespace/sequence number do not match, or on I/O failure
     */
    @Override // herddb.storage.DataStorageManager
    public List<Index> loadIndexes(LogSequenceNumber logSequenceNumber, String str) throws DataStorageManagerException {
        try {
            ensureZNodeDirectory(getTableSpaceZNode(str));
            String metadataZNode = getTablespaceIndexesMetadataFile(str, logSequenceNumber);
            LOGGER.log(Level.INFO, "loadIndexes for tableSpace " + str + " from " + metadataZNode + ", sequenceNumber:" + logSequenceNumber);
            byte[] payload = readZNode(metadataZNode, new Stat());
            if (payload == null) {
                if (logSequenceNumber.isStartOfTime()) {
                    LOGGER.log(Level.INFO, "file " + metadataZNode + " not found");
                    return Collections.emptyList();
                }
                throw new DataStorageManagerException("local index data not available for tableSpace " + str + ", recovering from sequenceNumber " + logSequenceNumber);
            }
            try (ByteArrayInputStream raw = new ByteArrayInputStream(payload);
                 ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
                long header0 = in.readVLong();
                long header1 = in.readVLong();
                boolean headerOk = header0 == 1 && header1 == 0;
                if (!headerOk) {
                    throw new DataStorageManagerException("corrupted index list file " + metadataZNode);
                }
                String storedTableSpace = in.readUTF();
                if (!storedTableSpace.equals(str)) {
                    throw new DataStorageManagerException("file " + metadataZNode + " is not for tablespace " + str);
                }
                long ledgerId = in.readZLong();
                long offset = in.readZLong();
                boolean sameSequence = ledgerId == logSequenceNumber.ledgerId && offset == logSequenceNumber.offset;
                if (!sameSequence) {
                    throw new DataStorageManagerException("file " + metadataZNode + " is not for sequence number " + logSequenceNumber);
                }
                int count = in.readInt();
                List<Index> indexes = new ArrayList<>();
                int remaining = count;
                while (remaining-- > 0) {
                    indexes.add(Index.deserialize(in.readArray()));
                }
                return Collections.unmodifiableList(indexes);
            }
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Persists the table and index definitions of a tablespace for the given log sequence number and,
     * optionally, collects actions that delete older metadata znodes.
     *
     * <p>Two znodes are written, one for tables and one for indexes. Each payload starts with the VLongs
     * 1 and 0, followed by the tablespace, the sequence number (ledger id, offset), an element count and
     * the serialized elements.
     *
     * @param str               tablespace identifier
     * @param logSequenceNumber sequence number the metadata refers to
     * @param list              tables to persist; must be empty when the sequence number is the start of time
     * @param list2             indexes to persist; {@code null} is written as an empty list
     * @param z                 when {@code true}, sibling metadata znodes are scanned and those older than
     *                          {@code logSequenceNumber}, or unparsable, are scheduled for deletion
     * @return the deletion actions collected (empty when {@code z} is {@code false})
     * @throws DataStorageManagerException on an invalid start-of-time write or on any I/O failure
     */
    @Override // herddb.storage.DataStorageManager
    public Collection<PostCheckpointAction> writeTables(String str, LogSequenceNumber logSequenceNumber, List<Table> list, List<Index> list2, boolean z) throws DataStorageManagerException {
        if (logSequenceNumber.isStartOfTime() && !list.isEmpty()) {
            throw new DataStorageManagerException("impossible to write a non empty table list at start-of-time");
        }
        persistTableSpaceMapping(str);
        String tableSpaceZNode = getTableSpaceZNode(str);
        String tablesZNode = getTablespaceTablesMetadataFile(str, logSequenceNumber);
        String indexesZNode = getTablespaceIndexesMetadataFile(str, logSequenceNumber);
        LOGGER.log(Level.FINE, "writeTables for tableSpace " + str + " sequenceNumber " + logSequenceNumber + " to " + tablesZNode);
        try {
            // Tables metadata.
            try (VisibleByteArrayOutputStream buffer = new VisibleByteArrayOutputStream();
                 ExtendedDataOutputStream out = new ExtendedDataOutputStream(buffer)) {
                out.writeVLong(1L);
                out.writeVLong(0L);
                out.writeUTF(str);
                out.writeZLong(logSequenceNumber.ledgerId);
                out.writeZLong(logSequenceNumber.offset);
                out.writeInt(list.size());
                for (Table table : list) {
                    out.writeArray(table.serialize());
                }
                out.flush();
                writeZNode(tablesZNode, buffer.toByteArray(), null);
            }

            // Indexes metadata; try-with-resources replaces a close handler that suppressed an exception into itself.
            try (VisibleByteArrayOutputStream buffer = new VisibleByteArrayOutputStream();
                 ExtendedDataOutputStream out = new ExtendedDataOutputStream(buffer)) {
                out.writeVLong(1L);
                out.writeVLong(0L);
                out.writeUTF(str);
                out.writeZLong(logSequenceNumber.ledgerId);
                out.writeZLong(logSequenceNumber.offset);
                if (list2 == null) {
                    out.writeInt(0);
                } else {
                    out.writeInt(list2.size());
                    for (Index index : list2) {
                        out.writeArray(index.serialize());
                    }
                }
                out.flush();
                writeZNode(indexesZNode, buffer.toByteArray(), null);
            }

            List<PostCheckpointAction> actions = new ArrayList<>();
            if (!z) {
                return actions;
            }
            for (String child : zkGetChildren(tableSpaceZNode)) {
                if (isTablespaceIndexesMetadataFile(child)) {
                    try {
                        byte[] content = readZNode(child, new Stat());
                        if (content != null && logSequenceNumber.after(readLogSequenceNumberFromIndexMetadataFile(str, content, child))) {
                            LOGGER.log(Level.FINEST, "indexes metadata file " + child + ". will be deleted after checkpoint end");
                            actions.add(new DeleteZNodeAction("indexes", "delete indexesmetadata file " + child, child));
                        }
                    } catch (DataStorageManagerException e) {
                        LOGGER.log(Level.SEVERE, "Unparsable indexesmetadata file " + child, e);
                        actions.add(new DeleteZNodeAction("indexes", "delete unparsable indexesmetadata file " + child, child));
                    }
                } else if (isTablespaceTablesMetadataFile(child)) {
                    try {
                        byte[] content = readZNode(child, new Stat());
                        if (content != null && logSequenceNumber.after(readLogSequenceNumberFromTablesMetadataFile(str, content, child))) {
                            LOGGER.log(Level.FINEST, "tables metadata file " + child + ". will be deleted after checkpoint end");
                            actions.add(new DeleteZNodeAction("tables", "delete tablesmetadata file " + child, child));
                        }
                    } catch (DataStorageManagerException e) {
                        LOGGER.log(Level.SEVERE, "Unparsable tablesmetadata file " + child, e);
                        // Labelled "tables" like the regular tables branch; it was previously tagged as transactions.
                        actions.add(new DeleteZNodeAction("tables", "delete unparsable tablesmetadata file " + child, child));
                    }
                }
            }
            return actions;
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Writes the checkpoint-info znode for a tablespace and collects actions that delete older ones.
     *
     * <p>The payload is the VLongs 1 and 0, the tablespace, then the sequence number (ledger id, offset).
     * After the write, every checkpoint-info child of the tablespace znode whose stored sequence number
     * precedes {@code logSequenceNumber} is scheduled for deletion. Unparsable children are only logged.
     *
     * @param str               tablespace identifier
     * @param logSequenceNumber sequence number of the checkpoint
     * @return the deletion actions collected
     * @throws DataStorageManagerException if writing the znode or listing the children fails
     */
    @Override // herddb.storage.DataStorageManager
    public Collection<PostCheckpointAction> writeCheckpointSequenceNumber(String str, LogSequenceNumber logSequenceNumber) throws DataStorageManagerException {
        persistTableSpaceMapping(str);
        String checkpointZNode = getTablespaceCheckPointInfoFile(str, logSequenceNumber);
        LOGGER.log(Level.INFO, "checkpoint for " + str + " at " + logSequenceNumber + " to " + checkpointZNode);
        try {
            try (VisibleByteArrayOutputStream buffer = new VisibleByteArrayOutputStream();
                 ExtendedDataOutputStream out = new ExtendedDataOutputStream(buffer)) {
                out.writeVLong(1L);
                out.writeVLong(0L);
                out.writeUTF(str);
                out.writeZLong(logSequenceNumber.ledgerId);
                out.writeZLong(logSequenceNumber.offset);
                out.flush();
                writeZNode(checkpointZNode, buffer.toByteArray(), null);
            }

            List<String> children = zkGetChildren(getTableSpaceZNode(str));
            List<PostCheckpointAction> actions = new ArrayList<>();
            for (String child : children) {
                if (!isTablespaceCheckPointInfoFile(child)) {
                    continue;
                }
                try {
                    byte[] content = readZNode(child, new Stat());
                    if (content == null) {
                        continue;
                    }
                    LogSequenceNumber stored = readLogSequenceNumberFromCheckpointInfoFile(str, content, child);
                    if (logSequenceNumber.after(stored)) {
                        LOGGER.log(Level.FINEST, "checkpoint info file " + child + ". will be deleted after checkpoint end");
                        actions.add(new DeleteZNodeAction("checkpoint", "delete checkpoint info file " + child, child));
                    }
                } catch (DataStorageManagerException | IOException e) {
                    // Unreadable entries are reported but not scheduled for deletion.
                    LOGGER.log(Level.SEVERE, "unparsable checkpoint info file " + child, e);
                }
            }
            return actions;
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Drops a table by recursively deleting its znode subtree.
     *
     * @param str  tablespace identifier
     * @param str2 table identifier, resolved to a znode path through {@code getTableDirectory}
     * @throws DataStorageManagerException if the ZooKeeper operation fails or the thread is interrupted
     *                                     (the interrupt flag is restored before the exception is thrown)
     */
    @Override // herddb.storage.DataStorageManager
    public void dropTable(String str, String str2) throws DataStorageManagerException {
        persistTableSpaceMapping(str);
        String tableDirectory = getTableDirectory(str, str2);
        LOGGER.log(Level.INFO, "dropTable {0}.{1} in {2}", new Object[]{str, str2, tableDirectory});
        try {
            ZKUtil.deleteRecursive(this.zk.ensureZooKeeper(), tableDirectory);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Truncates an index by recursively deleting its znode subtree.
     *
     * @param str  tablespace identifier
     * @param str2 index identifier, resolved to a znode path through {@code getIndexDirectory}
     * @throws DataStorageManagerException if the ZooKeeper operation fails or the thread is interrupted
     *                                     (the interrupt flag is restored before the exception is thrown)
     */
    @Override // herddb.storage.DataStorageManager
    public void truncateIndex(String str, String str2) throws DataStorageManagerException {
        persistTableSpaceMapping(str);
        String indexDirectory = getIndexDirectory(str, str2);
        LOGGER.log(Level.INFO, "truncateIndex {0}.{1} in {2}", new Object[]{str, str2, indexDirectory});
        try {
            ZKUtil.deleteRecursive(this.zk.ensureZooKeeper(), indexDirectory);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Drops an index by recursively deleting its znode subtree.
     *
     * @param str  tablespace identifier
     * @param str2 index identifier, resolved to a znode path through {@code getIndexDirectory}
     * @throws DataStorageManagerException if the ZooKeeper operation fails or the thread is interrupted
     *                                     (the interrupt flag is restored before the exception is thrown)
     */
    @Override // herddb.storage.DataStorageManager
    public void dropIndex(String str, String str2) throws DataStorageManagerException {
        persistTableSpaceMapping(str);
        String indexDirectory = getIndexDirectory(str, str2);
        LOGGER.log(Level.INFO, "dropIndex {0}.{1} in {2}", new Object[]{str, str2, indexDirectory});
        try {
            ZKUtil.deleteRecursive(this.zk.ensureZooKeeper(), indexDirectory);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new DataStorageManagerException(e);
        } catch (IOException | KeeperException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Extracts the log sequence number stored in a serialized checkpoint-info znode.
     *
     * @param str   tablespace identifier the payload is expected to carry
     * @param bArr  raw content of the znode
     * @param str2  znode path, used only in error messages
     * @return the sequence number (ledger id, offset) read after the header
     * @throws IOException                 if the two leading header values are not 1 and 0, or the payload cannot be read
     * @throws DataStorageManagerException if the stored tablespace differs from {@code str}
     */
    private static LogSequenceNumber readLogSequenceNumberFromCheckpointInfoFile(String str, byte[] bArr, String str2) throws DataStorageManagerException, IOException {
        try (ByteArrayInputStream raw = new ByteArrayInputStream(bArr);
             ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
            long header0 = in.readVLong();
            long header1 = in.readVLong();
            boolean headerOk = header0 == 1 && header1 == 0;
            if (!headerOk) {
                // Unlike the sibling readers, a bad header is reported as an IOException here.
                throw new IOException("corrupted checkpoint file");
            }
            String storedTableSpace = in.readUTF();
            if (!storedTableSpace.equals(str)) {
                throw new DataStorageManagerException("zonde " + str2 + " is not for spablespace " + str + " but for " + storedTableSpace);
            }
            long ledgerId = in.readZLong();
            long offset = in.readZLong();
            return new LogSequenceNumber(ledgerId, offset);
        }
    }

    /**
     * Finds the most recent checkpoint sequence number recorded for a tablespace.
     *
     * <p>Every checkpoint-info child of the tablespace znode is read and the greatest sequence number
     * (according to {@code LogSequenceNumber.after}) is kept. Children that fail with a
     * {@link DataStorageManagerException} are logged and skipped.
     *
     * @param str tablespace identifier
     * @return the latest sequence number found, or {@code LogSequenceNumber.START_OF_TIME} if none
     * @throws DataStorageManagerException if listing or reading the znodes raises an {@link IOException}
     */
    @Override // herddb.storage.DataStorageManager
    public LogSequenceNumber getLastcheckpointSequenceNumber(String str) throws DataStorageManagerException {
        try {
            String tableSpaceZNode = getTableSpaceZNode(str);
            LogSequenceNumber latest = LogSequenceNumber.START_OF_TIME;
            for (String child : zkGetChildren(tableSpaceZNode)) {
                if (!isTablespaceCheckPointInfoFile(child)) {
                    continue;
                }
                try {
                    byte[] content = readZNode(child, new Stat());
                    if (content == null) {
                        continue;
                    }
                    LogSequenceNumber candidate = readLogSequenceNumberFromCheckpointInfoFile(str, content, child);
                    latest = candidate.after(latest) ? candidate : latest;
                } catch (DataStorageManagerException e) {
                    LOGGER.log(Level.SEVERE, "unparsable checkpoint info file " + child, e);
                }
            }
            return latest;
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Creates the key-to-page index for a table, implemented by {@link BLinkKeyToPageIndex}
     * and backed by this storage manager.
     *
     * @param str           first identifier forwarded to the index constructor (presumably the tablespace)
     * @param str2          second identifier forwarded to the index constructor (presumably the table)
     * @param memoryManager memory manager handed to the index
     * @return a new index instance
     */
    @Override // herddb.storage.DataStorageManager
    public KeyToPageIndex createKeyToPageMap(String str, String str2, MemoryManager memoryManager) throws DataStorageManagerException {
        KeyToPageIndex keyToPage = new BLinkKeyToPageIndex(str, str2, memoryManager, this);
        return keyToPage;
    }

    /**
     * Releases a key-to-page index by closing it; a {@code null} index is ignored.
     *
     * @param str            not used by this implementation
     * @param str2           not used by this implementation
     * @param keyToPageIndex index to close, may be {@code null}
     */
    @Override // herddb.storage.DataStorageManager
    public void releaseKeyToPageMap(String str, String str2, KeyToPageIndex keyToPageIndex) {
        if (keyToPageIndex == null) {
            return;
        }
        keyToPageIndex.close();
    }

    /**
     * Creates a file-based record set factory configured with this manager's temporary directory
     * and swap threshold.
     *
     * @return a new {@link FileRecordSetFactory}
     */
    @Override // herddb.storage.DataStorageManager
    public RecordSetFactory createRecordSetFactory() {
        RecordSetFactory factory = new FileRecordSetFactory(this.tmpDirectory, this.swapThreshold);
        return factory;
    }

    /**
     * Extracts the log sequence number stored in the header of a serialized transactions znode.
     *
     * @param str   tablespace identifier the payload is expected to carry
     * @param bArr  raw content of the znode
     * @param str2  znode path, used only in error messages
     * @return the sequence number (ledger id, offset) read after the header
     * @throws DataStorageManagerException if the header is not the expected 1/0 pair, if the stored
     *                                     tablespace differs from {@code str}, or if the payload cannot be read
     */
    private static LogSequenceNumber readLogSequenceNumberFromTransactionsFile(String str, byte[] bArr, String str2) throws DataStorageManagerException {
        try (ByteArrayInputStream raw = new ByteArrayInputStream(bArr);
             ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
            long header0 = in.readVLong();
            long header1 = in.readVLong();
            boolean headerOk = header0 == 1 && header1 == 0;
            if (!headerOk) {
                throw new DataStorageManagerException("corrupted transaction list znode " + str2);
            }
            String storedTableSpace = in.readUTF();
            if (!storedTableSpace.equals(str)) {
                throw new DataStorageManagerException("znode " + str2 + " is not for spablespace " + str);
            }
            long ledgerId = in.readZLong();
            long offset = in.readZLong();
            return new LogSequenceNumber(ledgerId, offset);
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Loads the transactions persisted for a tablespace at the given log sequence number and hands
     * each one to {@code consumer}. Nothing is delivered when the transactions znode does not exist.
     *
     * @param logSequenceNumber sequence number whose transactions znode is read; the payload must
     *                          carry exactly this ledger id and offset
     * @param str               tablespace identifier
     * @param consumer          receives every deserialized transaction, in stored order
     * @throws DataStorageManagerException if the payload header, tablespace or sequence number do not
     *                                     match, or if reading fails
     */
    @Override // herddb.storage.DataStorageManager
    public void loadTransactions(LogSequenceNumber logSequenceNumber, String str, Consumer<Transaction> consumer) throws DataStorageManagerException {
        try {
            String transactionsZNode = getTablespaceTransactionsFile(str, logSequenceNumber);
            byte[] payload = readZNode(transactionsZNode, new Stat());
            boolean exists = payload != null;
            // Plain ")" literal: the message no longer borrows a constant from an unrelated library.
            LOGGER.log(Level.INFO, "loadTransactions " + logSequenceNumber + " for tableSpace " + str + " from file " + transactionsZNode + " (exists: " + exists + ")");
            if (!exists) {
                return;
            }
            try (ByteArrayInputStream raw = new ByteArrayInputStream(payload);
                 ExtendedDataInputStream in = new ExtendedDataInputStream(raw)) {
                long header0 = in.readVLong();
                long header1 = in.readVLong();
                if (header0 != 1 || header1 != 0) {
                    throw new DataStorageManagerException("corrupted transaction list file " + transactionsZNode);
                }
                String storedTableSpace = in.readUTF();
                if (!storedTableSpace.equals(str)) {
                    throw new DataStorageManagerException("file " + transactionsZNode + " is not for spablespace " + str);
                }
                long ledgerId = in.readZLong();
                long offset = in.readZLong();
                if (ledgerId != logSequenceNumber.ledgerId || offset != logSequenceNumber.offset) {
                    throw new DataStorageManagerException("file " + transactionsZNode + " is not for sequence number " + logSequenceNumber);
                }
                int count = in.readInt();
                for (int i = 0; i < count; i++) {
                    consumer.accept(Transaction.deserialize(str, in));
                }
            }
        } catch (IOException e) {
            throw new DataStorageManagerException(e);
        }
    }

    /**
     * Persists the transactions that are active at a checkpoint and collects the
     * clean-up actions for transactions nodes made obsolete by it.
     *
     * <p>The serialized payload is: two VLong header fields (1 and 0), the tablespace
     * name, the ledger id and offset of the checkpoint position, the number of
     * transactions, then each transaction in iteration order. It is written to the
     * node returned by {@code getTablespaceTransactionsFile}.
     *
     * <p>Afterwards every child of the tablespace node that is a transactions file is
     * inspected: if the position stored in it is before {@code logSequenceNumber}, or
     * its content cannot be parsed, a {@code DeleteZNodeAction} for it is added to the
     * result. Nothing is deleted by this method itself.
     *
     * @param str               name of the tablespace
     * @param logSequenceNumber log position of the checkpoint
     * @param collection        transactions active at the checkpoint
     * @return the actions to run once the checkpoint has completed (possibly empty)
     * @throws DataStorageManagerException if a non-empty list is written at
     *                                     start-of-time, or on I/O failure
     */
    @Override // herddb.storage.DataStorageManager
    public Collection<PostCheckpointAction> writeTransactionsAtCheckpoint(String str, LogSequenceNumber logSequenceNumber, Collection<Transaction> collection) throws DataStorageManagerException {
        if (logSequenceNumber.isStartOfTime() && !collection.isEmpty()) {
            throw new DataStorageManagerException("impossible to write a non empty transactions list at start-of-time");
        }
        String tablespaceTransactionsFile = getTablespaceTransactionsFile(str, logSequenceNumber);
        LOGGER.log(Level.FINE, "writeTransactionsAtCheckpoint for tableSpace {0} sequenceNumber {1} to {2}, active transactions {3}", new Object[]{str, logSequenceNumber, tablespaceTransactionsFile, collection.size()});
        try {
            // try-with-resources closes both streams on every path, including failures.
            try (VisibleByteArrayOutputStream buffer = new VisibleByteArrayOutputStream();
                 ExtendedDataOutputStream out = new ExtendedDataOutputStream(buffer)) {
                // Header fields; presumably format version and flags - TODO confirm against the reader.
                out.writeVLong(1L);
                out.writeVLong(0L);
                out.writeUTF(str);
                out.writeZLong(logSequenceNumber.ledgerId);
                out.writeZLong(logSequenceNumber.offset);
                out.writeInt(collection.size());
                for (Transaction transaction : collection) {
                    transaction.serialize(out);
                }
                out.flush();
                writeZNode(tablespaceTransactionsFile, buffer.toByteArray(), null);
            }

            List<PostCheckpointAction> actions = new ArrayList<>();
            for (String child : zkGetChildren(getTableSpaceZNode(str))) {
                if (!isTransactionsFile(child)) {
                    continue;
                }
                try {
                    // Read the listed node itself. Reading the node written above for every
                    // child would compare the checkpoint position with itself, so no older
                    // node would ever be scheduled for deletion.
                    // NOTE(review): assumes zkGetChildren returns paths that readZNode accepts;
                    // the same value is passed to DeleteZNodeAction as the node to delete - confirm.
                    byte[] content = readZNode(child, new Stat());
                    if (content != null && logSequenceNumber.after(readLogSequenceNumberFromTransactionsFile(str, content, child))) {
                        LOGGER.log(Level.FINEST, "transactions metadata file " + child + ". will be deleted after checkpoint end");
                        actions.add(new DeleteZNodeAction(BackupFileConstants.ENTRY_TYPE_TRANSACTIONS, "delete transactions file " + child, child));
                    }
                } catch (DataStorageManagerException unparsable) {
                    // A node that cannot be parsed is reported and scheduled for removal as well.
                    LOGGER.log(Level.SEVERE, "Unparsable transactions file " + child, unparsable);
                    actions.add(new DeleteZNodeAction(BackupFileConstants.ENTRY_TYPE_TRANSACTIONS, "delete unparsable transactions file " + child, child));
                }
            }
            return actions;
        } catch (IOException ioError) {
            throw new DataStorageManagerException(ioError);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes a BookKeeper ledger and then removes a page id from the page mapping of a table.
     *
     * <p>A ledger that no longer exists on the metadata server is only logged and the
     * page id is still removed. Any other {@code BKException} is logged and the
     * mapping is left untouched. No exception is propagated for either case.
     *
     * @param str  tablespace whose pages mapping is looked up
     * @param str2 table whose pages mapping is updated
     * @param j    page id to remove from the mapping
     * @param j2   id of the ledger to delete
     * @param str3 message logged at FINE level before the deletion is attempted
     */
    public void dropLedgerForTable(String str, String str2, long j, long j2, String str3) {
        LOGGER.log(Level.FINE, str3);
        try {
            FutureUtils.result(this.bk.getBookKeeper().newDeleteLedgerOp().withLedgerId(j2).execute(), BKException.HANDLER);
        } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException alreadyGone) {
            // Missing ledger: report it and fall through to the mapping update.
            LOGGER.log(Level.SEVERE, "ledger " + j2 + " already dropped:" + alreadyGone, alreadyGone);
        } catch (BKException deleteFailure) {
            // Any other failure: report it and keep the mapping entry.
            LOGGER.log(Level.SEVERE, "Could not delete ledger " + j2 + BookKeeperConstants.COLON + deleteFailure, deleteFailure);
            return;
        }
        getTableSpacePagesMapping(str).getTablePagesMapping(str2).removePageId(j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes a BookKeeper ledger and then removes a page id from the page mapping of an index.
     *
     * <p>A ledger that no longer exists on the metadata server is only logged and the
     * page id is still removed. Any other {@code BKException} is logged and the
     * mapping is left untouched. No exception is propagated for either case.
     *
     * @param str  tablespace whose pages mapping is looked up
     * @param str2 index whose pages mapping is updated
     * @param j    page id to remove from the mapping
     * @param j2   id of the ledger to delete
     * @param str3 message logged at FINE level before the deletion is attempted
     */
    public void dropLedgerForIndex(String str, String str2, long j, long j2, String str3) {
        LOGGER.log(Level.FINE, str3);
        try {
            FutureUtils.result(this.bk.getBookKeeper().newDeleteLedgerOp().withLedgerId(j2).execute(), BKException.HANDLER);
        } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException alreadyGone) {
            // Missing ledger: report it and fall through to the mapping update.
            LOGGER.log(Level.SEVERE, "ledger " + j2 + " already dropped:" + alreadyGone, alreadyGone);
        } catch (BKException deleteFailure) {
            // Any other failure: report it and keep the mapping entry.
            LOGGER.log(Level.SEVERE, "Could not delete ledger " + j2 + BookKeeperConstants.COLON + deleteFailure, deleteFailure);
            return;
        }
        getTableSpacePagesMapping(str).getIndexPagesMapping(str2).removePageId(j);
    }

    /**
     * Obtains a buffer from {@code WRITE_BUFFERS_RECYCLER} and prepares it for use:
     * its {@code closed} flag is cleared and it is reset before being returned.
     *
     * @return the prepared buffer
     */
    private static RecyclableByteArrayOutputStream getWriteBuffer() {
        final RecyclableByteArrayOutputStream buffer = WRITE_BUFFERS_RECYCLER.get();
        // Clear the flag before resetting, in the same order as before.
        buffer.closed = false;
        buffer.reset();
        return buffer;
    }
}
