/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter;
import org.apache.kafka.streams.state.internals.RocksDBRangeIterator;
import org.apache.kafka.streams.state.internals.RocksDbIterator;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBStore
implements KeyValueStore<Bytes, byte[]> {
    private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);

    // Matches file names ending in ".sst"; toggleDbForBulkLoading uses it to see whether the
    // store directory already contains table files.
    private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");

    // Defaults applied in openDB before any user-supplied RocksDBConfigSetter runs.
    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    // 16 MiB (16 * 1024 * 1024); the value openDB passes to setWriteBufferSize.
    private static final long WRITE_BUFFER_SIZE = 0x1000000L;
    // 50 MiB (50 * 1024 * 1024); the value openDB passes to setBlockCacheSize.
    private static final long BLOCK_CACHE_SIZE = 0x3200000L;
    // 4 KiB; the value openDB passes to setBlockSize.
    private static final long BLOCK_SIZE = 4096L;
    // The value openDB passes to setMaxWriteBufferNumber.
    private static final int MAX_WRITE_BUFFERS = 3;
    // Parent directory (below the context's state dir) used by the single-argument constructor.
    private static final String DB_FILE_DIR = "rocksdb";

    final String name;
    private final String parentDir;
    // Iterators handed out by range()/all(); close() closes whatever is still registered here.
    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());

    // Everything below is (re)assigned by openDB/openRocksDB; close() nulls most of it again.
    File dbDir;
    RocksDB db;
    RocksDBAccessor dbAccessor;
    private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
    WriteOptions wOptions;
    FlushOptions fOptions;
    private BloomFilter filter;

    // When true, openDB calls prepareForBulkLoad() on the options; flipped by toggleDbForBulkLoading.
    private volatile boolean prepareForBulkload = false;
    // Context captured in init() so toggleDbForBulkLoading can re-open the database.
    ProcessorContext internalProcessorContext;
    volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
    // Set to true at the end of openDB and back to false at the start of close().
    protected volatile boolean open = false;

    /**
     * Creates a store whose database directory lives under the default {@code "rocksdb"} parent directory.
     *
     * @param name the store name; it is also used as the name of the database directory
     */
    RocksDBStore(final String name) {
        this(name, DB_FILE_DIR);
    }

    /**
     * Creates a store with an explicit parent directory. Nothing is opened here; the database is
     * opened later by {@code openDB}.
     *
     * @param name      the store name; it is also used as the name of the database directory
     * @param parentDir directory (relative to the context's state directory) that holds the database directory
     */
    RocksDBStore(final String name, final String parentDir) {
        this.name = name;
        this.parentDir = parentDir;
    }

    /**
     * Builds the default RocksDB options, gives an optional user-configured
     * {@link RocksDBConfigSetter} the chance to change them, creates the store directory if needed
     * and opens the database. On success the store is marked open.
     *
     * @param context supplies the application configs and the state directory
     * @throws ProcessorStateException if the store directory cannot be created or the database cannot be opened
     */
    void openDB(final ProcessorContext context) {
        // default options; the adapter exposes both option objects through one handle
        final DBOptions dbOptions = new DBOptions();
        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
        userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions);

        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
        tableConfig.setBlockSize(BLOCK_SIZE);
        filter = new BloomFilter();
        tableConfig.setFilter(filter);

        userSpecifiedOptions.optimizeFiltersForHits();
        userSpecifiedOptions.setTableFormatConfig(tableConfig);
        userSpecifiedOptions.setWriteBufferSize(WRITE_BUFFER_SIZE);
        userSpecifiedOptions.setCompressionType(COMPRESSION_TYPE);
        userSpecifiedOptions.setCompactionStyle(COMPACTION_STYLE);
        userSpecifiedOptions.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
        userSpecifiedOptions.setCreateIfMissing(true);
        userSpecifiedOptions.setErrorIfExists(false);
        userSpecifiedOptions.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
        // at least two, otherwise one per available processor
        userSpecifiedOptions.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));

        wOptions = new WriteOptions();
        wOptions.setDisableWAL(true);

        fOptions = new FlushOptions();
        fOptions.setWaitForFlush(true);

        // user overrides are applied after the defaults so they take precedence
        final Map<String, Object> configs = context.appConfigs();
        final Class<?> configSetterClass = (Class<?>) configs.get("rocksdb.config.setter");
        if (configSetterClass != null) {
            final RocksDBConfigSetter configSetter = (RocksDBConfigSetter) Utils.newInstance(configSetterClass);
            configSetter.setConfig(name, userSpecifiedOptions, configs);
        }

        if (prepareForBulkload) {
            userSpecifiedOptions.prepareForBulkLoad();
        }

        dbDir = new File(new File(context.stateDir(), parentDir), name);

        try {
            Files.createDirectories(dbDir.getParentFile().toPath());
            Files.createDirectories(dbDir.getAbsoluteFile().toPath());
        } catch (final IOException fatal) {
            throw new ProcessorStateException(fatal);
        }

        openRocksDB(dbOptions, columnFamilyOptions);
        open = true;
    }

    /**
     * Opens the database in {@code dbDir} with only the default column family and installs a
     * {@link SingleColumnFamilyAccessor} bound to that family's handle.
     *
     * @param dbOptions           database-wide options
     * @param columnFamilyOptions options for the default column family
     * @throws ProcessorStateException if RocksDB fails to open the database
     */
    void openRocksDB(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) {
        final ColumnFamilyDescriptor defaultFamily =
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions);
        final List<ColumnFamilyDescriptor> descriptors = Collections.singletonList(defaultFamily);
        // populated by RocksDB.open
        final List<ColumnFamilyHandle> handles = new ArrayList<>(descriptors.size());

        try {
            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), descriptors, handles);
            dbAccessor = new SingleColumnFamilyAccessor(handles.get(0));
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
        }
    }

    /**
     * Opens the database and registers this store's batching restore callback with the context.
     *
     * @param context processor context; kept so the database can be re-opened by {@code toggleDbForBulkLoading}
     * @param root    the store instance to register the restore callback for
     */
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        internalProcessorContext = context;
        openDB(context);

        final RocksDBBatchingRestoreCallback restoreCallback = new RocksDBBatchingRestoreCallback(this);
        batchingStateRestoreCallback = restoreCallback;
        context.register(root, restoreCallback);
    }

    /**
     * @return the bulk-load flag most recently stored by {@code toggleDbForBulkLoading}
     *         ({@code false} until that method has been called)
     */
    boolean isPrepareForBulkload() {
        return prepareForBulkload;
    }

    /**
     * @return the store name given at construction
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean persistent() {
        return true;
    }

    /**
     * @return {@code true} once {@code openDB} has completed and until {@code close()} is called
     */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Guard for operations that need an open database.
     *
     * @throws InvalidStateStoreException if the store is not open
     */
    private void validateStoreOpen() {
        if (open) {
            return;
        }
        throw new InvalidStateStoreException("Store " + name + " is currently closed");
    }

    /**
     * Writes one entry through the accessor. The key is checked before the open-state check.
     *
     * @param key   the key; must not be {@code null}
     * @param value the value, handed to the accessor unchanged (may be {@code null})
     * @throws NullPointerException       if {@code key} is {@code null}
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public synchronized void put(final Bytes key, final byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        validateStoreOpen();

        final byte[] rawKey = key.get();
        dbAccessor.put(rawKey, value);
    }

    /**
     * Stores {@code value} only when the key currently has no value.
     *
     * @param key   the key; must not be {@code null}
     * @param value the value to store when the key is absent
     * @return the value already present, or {@code null} if there was none and {@code value} was written
     * @throws NullPointerException if {@code key} is {@code null}
     */
    @Override
    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");

        final byte[] existing = get(key);
        if (existing != null) {
            return existing;
        }
        put(key, value);
        return null;
    }

    /**
     * Writes all entries in a single {@link WriteBatch}.
     *
     * @param entries the entries to write; how each entry is translated into the batch is up to the accessor
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB rejects the batch
     */
    @Override
    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
        // close() nulls dbAccessor, so without this check a closed store fails with a bare
        // NullPointerException instead of the exception put()/get() raise
        validateStoreOpen();
        try (final WriteBatch batch = new WriteBatch()) {
            dbAccessor.prepareBatch(entries, batch);
            write(batch);
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + name, e);
        }
    }

    /**
     * Looks up the value stored for {@code key}.
     *
     * @param key the key to look up
     * @return whatever the accessor returns for the key
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public synchronized byte[] get(final Bytes key) {
        validateStoreOpen();

        final byte[] found;
        try {
            found = dbAccessor.get(key.get());
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + name, e);
        }
        return found;
    }

    /**
     * Removes {@code key} and returns the value it had.
     *
     * @param key the key to remove; must not be {@code null}
     * @return the value read for the key immediately before removal
     * @throws NullPointerException       if {@code key} is {@code null}
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public synchronized byte[] delete(final Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        // check up front: the read below dereferences dbAccessor, which close() sets to null,
        // so a closed store would otherwise fail with a NullPointerException before put() can validate
        validateStoreOpen();

        final byte[] oldValue;
        try {
            oldValue = dbAccessor.getOnly(key.get());
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + name, e);
        }
        // a null value asks put() to remove the key
        put(key, null);
        return oldValue;
    }

    /**
     * Returns an iterator over the keys between {@code from} and {@code to} and tracks it in
     * {@code openIterators}.
     *
     * @param from first bound; must not be {@code null}
     * @param to   second bound; must not be {@code null}
     * @return the iterator created by the accessor
     * @throws NullPointerException       if either bound is {@code null}
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
        Objects.requireNonNull(from, "from cannot be null");
        Objects.requireNonNull(to, "to cannot be null");
        validateStoreOpen();

        final KeyValueIterator<Bytes, byte[]> iterator = dbAccessor.range(from, to);
        openIterators.add(iterator);
        return iterator;
    }

    /**
     * Returns an iterator over the whole store and tracks it in {@code openIterators}.
     *
     * @return the iterator created by the accessor
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        validateStoreOpen();

        final KeyValueIterator<Bytes, byte[]> iterator = dbAccessor.all();
        openIterators.add(iterator);
        return iterator;
    }

    /**
     * Returns the accessor's entry-count estimate, reporting {@link Long#MAX_VALUE} when the
     * estimate comes back negative.
     *
     * @return the estimate, or {@code Long.MAX_VALUE} for a negative estimate
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB fails to provide the property
     */
    @Override
    public long approximateNumEntries() {
        validateStoreOpen();

        final long estimate;
        try {
            estimate = dbAccessor.approximateNumEntries();
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Error fetching property from store " + name, e);
        }
        return isOverflowing(estimate) ? Long.MAX_VALUE : estimate;
    }

    /**
     * @param value a count obtained from RocksDB
     * @return {@code true} if {@code value} is negative, which the caller treats as an overflow
     */
    private boolean isOverflowing(final long value) {
        return value < 0;
    }

    /**
     * Flushes through the accessor. Does nothing when no database handle is held.
     *
     * @throws ProcessorStateException if RocksDB reports an error
     */
    @Override
    public synchronized void flush() {
        if (db != null) {
            try {
                dbAccessor.flush();
            } catch (final RocksDBException e) {
                throw new ProcessorStateException("Error while executing flush from store " + name, e);
            }
        }
    }

    /**
     * Closes the database and re-opens it with the bulk-load flag set to {@code prepareForBulkload}.
     * When switching bulk loading on and the store directory already contains {@code .sst} files,
     * the accessor's bulk-load hook is invoked first.
     *
     * @param prepareForBulkload the new value of the bulk-load flag
     */
    void toggleDbForBulkLoading(final boolean prepareForBulkload) {
        if (prepareForBulkload) {
            // File.list returns null when the directory cannot be listed
            final String[] sstFileNames =
                dbDir.list((dir, fileName) -> SST_FILE_EXTENSION.matcher(fileName).matches());
            final boolean hasSstFiles = sstFileNames != null && sstFileNames.length > 0;
            if (hasSstFiles) {
                dbAccessor.toggleDbForBulkLoading();
            }
        }

        close();
        this.prepareForBulkload = prepareForBulkload;
        openDB(internalProcessorContext);
    }

    /**
     * Applies {@code batch} to the database using this store's write options.
     *
     * @param batch the batch to apply
     * @throws RocksDBException if RocksDB rejects the write
     */
    void write(final WriteBatch batch) throws RocksDBException {
        db.write(wOptions, batch);
    }

    /**
     * Closes the store: marks it closed, closes any iterators still registered, releases the
     * accessor, options, database and filter, and drops the references to them. Calling it on a
     * store that is not open is a no-op.
     */
    @Override
    public synchronized void close() {
        if (!open) {
            return;
        }
        open = false;

        closeOpenIterators();

        // release order is intentionally left exactly as it was
        dbAccessor.close();
        userSpecifiedOptions.close();
        wOptions.close();
        fOptions.close();
        db.close();
        filter.close();

        // the filter reference is the only one that is not cleared
        dbAccessor = null;
        userSpecifiedOptions = null;
        wOptions = null;
        fOptions = null;
        db = null;
    }

    /**
     * Closes every iterator that is still registered in {@code openIterators}, logging a warning
     * when there is at least one.
     */
    private void closeOpenIterators() {
        // Work on a copy taken under the set's own lock rather than iterating the live set.
        // NOTE(review): iterators receive openIterators at construction and presumably
        // deregister themselves on close() - confirm.
        final Set<KeyValueIterator<Bytes, byte[]>> snapshot;
        synchronized (openIterators) {
            snapshot = new HashSet<>(openIterators);
        }
        if (snapshot.isEmpty()) {
            return;
        }
        log.warn("Closing {} open iterators for store {}", snapshot.size(), name);
        for (final KeyValueIterator<Bytes, byte[]> iterator : snapshot) {
            iterator.close();
        }
    }

    /**
     * @return the options object built by {@code openDB}; {@code null} before the first open and
     *         after {@code close()}
     */
    public Options getOptions() {
        return userSpecifiedOptions;
    }

    static class RocksDBBatchingRestoreCallback
    extends AbstractNotifyingBatchingRestoreCallback {
        private final RocksDBStore rocksDBStore;

        RocksDBBatchingRestoreCallback(RocksDBStore rocksDBStore) {
            this.rocksDBStore = rocksDBStore;
        }

        @Override
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
            try (WriteBatch batch = new WriteBatch();){
                this.rocksDBStore.dbAccessor.prepareBatchForRestore(records, batch);
                this.rocksDBStore.write(batch);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + this.rocksDBStore.name, e);
            }
        }

        @Override
        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            this.rocksDBStore.toggleDbForBulkLoading(true);
        }

        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            this.rocksDBStore.toggleDbForBulkLoading(false);
        }
    }

    class SingleColumnFamilyAccessor
    implements RocksDBAccessor {
        private final ColumnFamilyHandle columnFamily;

        SingleColumnFamilyAccessor(ColumnFamilyHandle columnFamily) {
            this.columnFamily = columnFamily;
        }

        @Override
        public void put(byte[] key, byte[] value) {
            if (value == null) {
                try {
                    RocksDBStore.this.db.delete(this.columnFamily, RocksDBStore.this.wOptions, key);
                }
                catch (RocksDBException e) {
                    throw new ProcessorStateException("Error while removing key from store " + RocksDBStore.this.name, e);
                }
            }
            try {
                RocksDBStore.this.db.put(this.columnFamily, RocksDBStore.this.wOptions, key, value);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error while putting key/value into store " + RocksDBStore.this.name, e);
            }
        }

        @Override
        public void prepareBatch(List<KeyValue<Bytes, byte[]>> entries, WriteBatch batch) throws RocksDBException {
            for (KeyValue<Bytes, byte[]> entry : entries) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                if (entry.value == null) {
                    batch.delete(this.columnFamily, ((Bytes)entry.key).get());
                    continue;
                }
                batch.put(this.columnFamily, ((Bytes)entry.key).get(), (byte[])entry.value);
            }
        }

        @Override
        public byte[] get(byte[] key) throws RocksDBException {
            return RocksDBStore.this.db.get(this.columnFamily, key);
        }

        @Override
        public byte[] getOnly(byte[] key) throws RocksDBException {
            return RocksDBStore.this.db.get(this.columnFamily, key);
        }

        @Override
        public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
            return new RocksDBRangeIterator(RocksDBStore.this.name, RocksDBStore.this.db.newIterator(this.columnFamily), RocksDBStore.this.openIterators, from, to);
        }

        @Override
        public KeyValueIterator<Bytes, byte[]> all() {
            RocksIterator innerIterWithTimestamp = RocksDBStore.this.db.newIterator(this.columnFamily);
            innerIterWithTimestamp.seekToFirst();
            return new RocksDbIterator(RocksDBStore.this.name, innerIterWithTimestamp, RocksDBStore.this.openIterators);
        }

        @Override
        public long approximateNumEntries() throws RocksDBException {
            return RocksDBStore.this.db.getLongProperty(this.columnFamily, "rocksdb.estimate-num-keys");
        }

        @Override
        public void flush() throws RocksDBException {
            RocksDBStore.this.db.flush(RocksDBStore.this.fOptions, this.columnFamily);
        }

        @Override
        public void prepareBatchForRestore(Collection<KeyValue<byte[], byte[]>> records, WriteBatch batch) throws RocksDBException {
            for (KeyValue<byte[], byte[]> record : records) {
                if (record.value == null) {
                    batch.delete(this.columnFamily, (byte[])record.key);
                    continue;
                }
                batch.put(this.columnFamily, (byte[])record.key, (byte[])record.value);
            }
        }

        @Override
        public void close() {
            this.columnFamily.close();
        }

        @Override
        public void toggleDbForBulkLoading() {
            try {
                RocksDBStore.this.db.compactRange(this.columnFamily, true, 1, 0);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error while range compacting during restoring  store " + RocksDBStore.this.name, e);
            }
        }
    }

    /**
     * The database operations {@link RocksDBStore} delegates to its current accessor. The
     * parameter names are decompiler placeholders; their roles are described on each method.
     */
    interface RocksDBAccessor {
        /**
         * Stores an entry.
         *
         * @param var1 raw key bytes
         * @param var2 raw value bytes; a {@code null} value requests removal of the key
         */
        void put(byte[] var1, byte[] var2);

        /**
         * Translates store entries into operations on a write batch.
         *
         * @param var1 the entries to add
         * @param var2 the batch that receives the operations
         */
        void prepareBatch(List<KeyValue<Bytes, byte[]>> var1, WriteBatch var2) throws RocksDBException;

        /**
         * @param var1 raw key bytes
         * @return the value found for the key
         */
        byte[] get(byte[] var1) throws RocksDBException;

        /**
         * Lookup used by {@code RocksDBStore.delete} to read the value being removed.
         *
         * @param var1 raw key bytes
         * @return the value found for the key
         */
        byte[] getOnly(byte[] var1) throws RocksDBException;

        /**
         * @param var1 the {@code from} bound
         * @param var2 the {@code to} bound
         * @return an iterator over the requested key range
         */
        KeyValueIterator<Bytes, byte[]> range(Bytes var1, Bytes var2);

        /**
         * @return an iterator over all entries
         */
        KeyValueIterator<Bytes, byte[]> all();

        /**
         * @return an estimate of the number of entries
         */
        long approximateNumEntries() throws RocksDBException;

        /** Flushes pending writes. */
        void flush() throws RocksDBException;

        /**
         * Translates restored changelog records into operations on a write batch.
         *
         * @param var1 the restored records
         * @param var2 the batch that receives the operations
         */
        void prepareBatchForRestore(Collection<KeyValue<byte[], byte[]>> var1, WriteBatch var2) throws RocksDBException;

        /** Releases the resources held by the accessor. */
        void close();

        /** Hook invoked by the store before it re-opens the database for bulk loading. */
        void toggleDbForBulkLoading();
    }
}

