/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.mvcc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.pulsar.shade.com.google.common.collect.Iterators;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.PeekingIterator;
import org.apache.pulsar.shade.com.google.common.primitives.UnsignedBytes;
import org.apache.pulsar.shade.com.google.protobuf.TextFormat;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.op.OpFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.DeleteResultImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.IncrementResultImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.KeyValueFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.PutResultImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.RangeResultImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.ResultFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.impl.result.TxnResultImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.CompareOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.CompareTarget;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.DeleteOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.IncrementOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.Op;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.OpFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.PutOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.RangeOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.options.DeleteOption;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.options.Options;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.options.RangeOption;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.Code;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.DeleteResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.IncrementResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.PutResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.RangeResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.Result;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.TxnResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.coder.Coder;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.kv.KV;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.kv.KVImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.kv.KVIterator;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.kv.KVMulti;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.mvcc.MVCCStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.Constants;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.mvcc.MVCCRecord;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.mvcc.MVCCRecordCoder;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.kv.store.ValueType;
import org.apache.pulsar.shade.org.apache.commons.lang.mutable.MutableLong;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.rocksdb.RocksDBException;
import org.apache.pulsar.shade.org.rocksdb.RocksIterator;
import org.apache.pulsar.shade.org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MVCCStoreImpl<K, V>
extends RocksdbKVStore<K, V>
implements MVCCStore<K, V> {
    private static final Logger log = LoggerFactory.getLogger(MVCCStoreImpl.class);
    private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
    private final ResultFactory<K, V> resultFactory;
    private final KeyValueFactory<K, V> recordFactory;
    private final OpFactory<K, V> opFactory;
    private final Coder<MVCCRecord> recordCoder = MVCCRecordCoder.of();

    MVCCStoreImpl() {
        this.resultFactory = new ResultFactory();
        this.recordFactory = new KeyValueFactory();
        this.opFactory = new OpFactoryImpl();
    }

    @Override
    public OpFactory<K, V> getOpFactory() {
        return this.opFactory;
    }

    @Override
    public void put(K key, V value) {
        throw new UnsupportedOperationException("Please use #put(PutOp op) instead");
    }

    @Override
    public synchronized V putIfAbsent(K key, V value) {
        throw new UnsupportedOperationException("Please use #put(PutOp op) instead");
    }

    @Override
    public synchronized KVMulti<K, V> multi() {
        throw new UnsupportedOperationException("Please use #txn(TxnOp op) instead");
    }

    @Override
    public synchronized V delete(K key) {
        throw new UnsupportedOperationException("Please use #delete(DeleteOp op) instead");
    }

    void increment(K key, long amount, long revision) {
        try (IncrementOp<K, V> op = this.opFactory.newIncrement(key, amount, Options.blindIncrement());
             IncrementResult<K, V> result = this.increment(revision, op);){
            if (Code.OK != result.code()) {
                throw new MVCCStoreException(result.code(), "Failed to increment (" + key + ", " + amount + ") to state store " + this.name);
            }
        }
    }

    void put(K key, V value, long revision) {
        try (PutOp<K, V> op = this.opFactory.newPut(key, value, Options.blindPut());
             PutResult<K, V> result = this.put(revision, op);){
            if (Code.OK != result.code()) {
                throw new MVCCStoreException(result.code(), "Failed to put (" + key + ", " + value + ", " + revision + ") to state store " + this.name);
            }
        }
    }

    void delete(K key, long revision) {
        try (DeleteOp<K, V> op = this.opFactory.newDelete(key, Options.delete());
             DeleteResult<K, V> result = this.delete(revision, op);){
            if (Code.OK != result.code()) {
                throw new MVCCStoreException(result.code(), "Failed to delete key=" + key + "from state store " + this.name);
            }
        }
    }

    void deleteRange(K key, K endKey, long revision) {
        try (DeleteOp<K, V> op = this.opFactory.newDelete(key, (DeleteOption)this.opFactory.optionFactory().newDeleteOption().endKey(endKey).prevKv(false).build());
             DeleteResult<K, V> result = this.delete(revision, op);){
            if (Code.OK != result.code()) {
                throw new MVCCStoreException(result.code(), "Failed to delete key=" + key + "from state store " + this.name);
            }
        }
    }

    /*
     * Exception decompiling
     */
    Long getNumber(K key) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * Exception decompiling
     */
    @Override
    public synchronized V get(K key) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [1[TRYBLOCK]], but top level block is 31[SIMPLE_IF_TAKEN]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public synchronized KVIterator<K, V> range(K from, K to) {
        this.checkStoreOpen();
        RangeResultIterator iter = new RangeResultIterator(from, to);
        this.kvIters.add(iter);
        return iter;
    }

    private void executeBatch(WriteBatch batch) {
        try {
            this.db.write(this.writeOpts, batch);
        }
        catch (RocksDBException e) {
            throw new StateStoreRuntimeException("Error while executing a multi operation from state store " + this.name, e);
        }
    }

    @Override
    public IncrementResult<K, V> increment(long revision, IncrementOp<K, V> op) {
        try {
            return this.processIncrement(revision, op);
        }
        catch (MVCCStoreException e) {
            IncrementResultImpl<K, V> result = this.resultFactory.newIncrementResult(revision);
            result.code(e.getCode());
            return result;
        }
        catch (StateStoreRuntimeException e) {
            IncrementResultImpl<K, V> result = this.resultFactory.newIncrementResult(revision);
            result.code(Code.INTERNAL_ERROR);
            return result;
        }
    }

    synchronized IncrementResult<K, V> processIncrement(long revision, IncrementOp<K, V> op) {
        this.checkStoreOpen();
        WriteBatch batch = new WriteBatch();
        IncrementResult<K, V> result = null;
        try {
            result = this.increment(revision, batch, op);
            this.updateLastRevision(batch, revision);
            this.executeBatch(batch);
            IncrementResult<K, V> incrementResult = result;
            return incrementResult;
        }
        catch (StateStoreRuntimeException e) {
            if (null != result) {
                result.close();
            }
            throw e;
        }
        finally {
            RocksUtils.close(batch);
        }
    }

    private IncrementResult<K, V> increment(long revision, WriteBatch batch, IncrementOp<K, V> op) {
        K key = op.key();
        long amount = op.amount();
        byte[] rawKey = this.keyCoder.encode(key);
        MVCCRecord record = this.getKeyRecord(key, rawKey);
        IncrementResultImpl<K, V> result = this.resultFactory.newIncrementResult(revision);
        try {
            long oldAmount = 0L;
            if (null != record) {
                if (record.compareModRev(revision) >= 0) {
                    result.code(Code.SMALLER_REVISION);
                    IncrementResultImpl<K, V> incrementResultImpl = result;
                    return incrementResultImpl;
                }
                if (ValueType.NUMBER != record.getValueType()) {
                    result.code(Code.ILLEGAL_OP);
                    IncrementResultImpl<K, V> incrementResultImpl = result;
                    return incrementResultImpl;
                }
                record.setVersion(record.getVersion() + 1L);
                oldAmount = record.getValue().getLong(0);
            } else {
                record = MVCCRecord.newRecord();
                record.setCreateRev(revision);
                record.setVersion(0L);
                record.setValue(PooledByteBufAllocator.DEFAULT.buffer(8), ValueType.NUMBER);
            }
            long newAmount = oldAmount + amount;
            record.getValue().writerIndex(0);
            record.getValue().writeLong(newAmount);
            record.setModRev(revision);
            batch.put(this.dataCfHandle, rawKey, this.recordCoder.encode(record));
            result.code(Code.OK);
            if (op.option().getTotal()) {
                result.totalAmount(newAmount);
            }
            IncrementResultImpl<K, V> incrementResultImpl = result;
            return incrementResultImpl;
        }
        catch (RocksDBException rde) {
            result.close();
            throw new StateStoreRuntimeException(rde);
        }
        catch (StateStoreRuntimeException e) {
            result.close();
            throw e;
        }
        finally {
            if (null != record) {
                record.recycle();
            }
        }
    }

    @Override
    public PutResult<K, V> put(long revision, PutOp<K, V> op) {
        try {
            return this.processPut(revision, op);
        }
        catch (MVCCStoreException e) {
            PutResultImpl<K, V> result = this.resultFactory.newPutResult(revision);
            result.code(e.getCode());
            return result;
        }
        catch (StateStoreRuntimeException e) {
            PutResultImpl<K, V> result = this.resultFactory.newPutResult(revision);
            result.code(Code.INTERNAL_ERROR);
            return result;
        }
    }

    synchronized PutResult<K, V> processPut(long revision, PutOp<K, V> op) {
        this.checkStoreOpen();
        WriteBatch batch = new WriteBatch();
        PutResult<K, V> result = null;
        try {
            result = this.put(revision, batch, op);
            this.updateLastRevision(batch, revision);
            this.executeBatch(batch);
            PutResult<K, V> putResult = result;
            return putResult;
        }
        catch (StateStoreRuntimeException e) {
            if (null != result) {
                result.close();
            }
            throw e;
        }
        finally {
            RocksUtils.close(batch);
        }
    }

    private PutResult<K, V> put(long revision, WriteBatch batch, PutOp<K, V> op) {
        MVCCRecord record;
        K key = op.key();
        V val = op.value();
        byte[] rawKey = this.keyCoder.encode(key);
        ByteBuf rawValBuf = this.valCoder.encodeBuf(val);
        try {
            record = this.getKeyRecord(key, rawKey);
        }
        catch (StateStoreRuntimeException e) {
            ReferenceCountUtil.safeRelease(rawValBuf);
            throw e;
        }
        PutResultImpl<K, V> result = this.resultFactory.newPutResult(revision);
        MVCCRecord oldRecord = null;
        try {
            AutoCloseable prevKV;
            if (null != record) {
                if (record.compareModRev(revision) >= 0) {
                    result.code(Code.SMALLER_REVISION);
                    PutResultImpl<K, V> putResultImpl = result;
                    return putResultImpl;
                }
                if (ValueType.BYTES != record.getValueType()) {
                    result.code(Code.ILLEGAL_OP);
                    PutResultImpl<K, V> putResultImpl = result;
                    return putResultImpl;
                }
                if (op.option().prevKv()) {
                    oldRecord = record.duplicate();
                }
                record.setVersion(record.getVersion() + 1L);
            } else {
                record = MVCCRecord.newRecord();
                record.setCreateRev(revision);
                record.setVersion(0L);
            }
            record.setValue(rawValBuf, ValueType.BYTES);
            record.setModRev(revision);
            batch.put(this.dataCfHandle, rawKey, this.recordCoder.encode(record));
            result.code(Code.OK);
            if (null != oldRecord) {
                prevKV = oldRecord.asKVRecord(this.recordFactory, key, this.valCoder);
                result.prevKv((KeyValue<K, V>)prevKV);
            }
            prevKV = result;
            return prevKV;
        }
        catch (StateStoreRuntimeException e) {
            result.close();
            throw e;
        }
        catch (RocksDBException e) {
            result.close();
            throw new StateStoreRuntimeException(e);
        }
        finally {
            if (null != record) {
                record.recycle();
            }
            if (null != oldRecord) {
                oldRecord.recycle();
            }
        }
    }

    @Override
    public DeleteResult<K, V> delete(long revision, DeleteOp<K, V> op) {
        try {
            return this.processDelete(revision, op);
        }
        catch (MVCCStoreException e) {
            DeleteResultImpl<K, V> result = this.resultFactory.newDeleteResult(revision);
            result.code(e.getCode());
            return result;
        }
        catch (StateStoreRuntimeException e) {
            DeleteResultImpl<K, V> result = this.resultFactory.newDeleteResult(revision);
            result.code(Code.INTERNAL_ERROR);
            return result;
        }
    }

    synchronized DeleteResult<K, V> processDelete(long revision, DeleteOp<K, V> op) {
        this.checkStoreOpen();
        WriteBatch batch = new WriteBatch();
        DeleteResult<K, V> result = null;
        try {
            result = this.delete(revision, batch, op, true);
            this.updateLastRevision(batch, revision);
            this.executeBatch(batch);
            DeleteResult<K, V> deleteResult = result;
            return deleteResult;
        }
        catch (StateStoreRuntimeException e) {
            if (null != result) {
                result.close();
            }
            throw e;
        }
        finally {
            RocksUtils.close(batch);
        }
    }

    DeleteResult<K, V> delete(long revision, WriteBatch batch, DeleteOp<K, V> op, boolean allowBlind) {
        K key = op.key();
        K endKey = op.option().endKey();
        boolean blind = allowBlind && !op.option().prevKv();
        byte[] rawKey = null != key ? this.keyCoder.encode(key) : Constants.NULL_START_KEY;
        byte[] rawEndKey = null != endKey ? this.keyCoder.encode(endKey) : null;
        DeleteResultImpl<K, V> result = this.resultFactory.newDeleteResult(revision);
        ArrayList<byte[]> keys = Lists.newArrayList();
        ArrayList<MVCCRecord> records = Lists.newArrayList();
        try {
            long numDeleted;
            if (blind) {
                this.deleteBlind(batch, rawKey, rawEndKey);
                numDeleted = 0L;
            } else {
                numDeleted = this.deleteUsingIter(batch, key, rawKey, rawEndKey, keys, records, false);
            }
            List<KeyValue<K, V>> kvs = this.toKvs(keys, records);
            result.code(Code.OK);
            result.prevKvs(kvs);
            result.numDeleted(numDeleted);
        }
        catch (StateStoreRuntimeException e) {
            result.close();
            throw e;
        }
        finally {
            records.forEach(MVCCRecord::recycle);
        }
        return result;
    }

    void deleteBlind(WriteBatch batch, byte[] key, @Nullable byte[] endKey) {
        try {
            if (null == endKey) {
                batch.delete(key);
            } else {
                Pair<byte[], byte[]> realRange = this.getRealRange(key, endKey);
                endKey = realRange.getRight();
                int n = endKey.length - 1;
                endKey[n] = (byte)(endKey[n] + 1);
                batch.deleteRange(realRange.getLeft(), endKey);
            }
        }
        catch (RocksDBException e) {
            throw new StateStoreRuntimeException(e);
        }
    }

    long deleteUsingIter(WriteBatch batch, K key, byte[] rawKey, @Nullable byte[] rawEndKey, List<byte[]> resultKeys, List<MVCCRecord> resultValues, boolean countOnly) {
        MutableLong numKvs = new MutableLong(0L);
        if (null == rawEndKey) {
            MVCCRecord record = this.getKeyRecord(key, rawKey);
            if (null != record) {
                if (!countOnly) {
                    resultKeys.add(rawKey);
                    resultValues.add(record);
                } else {
                    record.recycle();
                }
                numKvs.add(1L);
                try {
                    batch.delete(rawKey);
                }
                catch (RocksDBException e) {
                    throw new StateStoreRuntimeException(e);
                }
            }
        } else {
            Pair<byte[], byte[]> realRange = this.getRealRange(rawKey, rawEndKey);
            rawKey = realRange.getLeft();
            rawEndKey = realRange.getRight();
            this.getKeyRecords(rawKey, rawEndKey, resultKeys, resultValues, numKvs, null, -1L, countOnly);
            this.deleteBlind(batch, rawKey, rawEndKey);
        }
        return numKvs.longValue();
    }

    @Override
    public synchronized TxnResult<K, V> txn(long revision, TxnOp<K, V> op) {
        try {
            return this.processTxn(revision, op);
        }
        catch (MVCCStoreException e) {
            TxnResultImpl<K, V> result = this.resultFactory.newTxnResult(revision);
            result.code(e.getCode());
            return result;
        }
        catch (StateStoreRuntimeException e) {
            TxnResultImpl<K, V> result = this.resultFactory.newTxnResult(revision);
            result.code(Code.INTERNAL_ERROR);
            return result;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    synchronized TxnResult<K, V> processTxn(long revision, TxnOp<K, V> op) {
        this.checkStoreOpen();
        boolean success = this.processCompares(op);
        List<Op<K, V>> operations = success ? op.successOps() : op.failureOps();
        if (operations == null) {
            operations = Collections.emptyList();
        }
        ArrayList results = Lists.newArrayListWithExpectedSize(operations.size());
        try (WriteBatch batch = new WriteBatch();){
            for (Op<K, V> o : operations) {
                results.add(this.executeOp(revision, batch, o));
            }
            this.updateLastRevision(batch, revision);
            this.executeBatch(batch);
            TxnResultImpl txnResult = this.resultFactory.newTxnResult(revision);
            txnResult.isSuccess(success);
            txnResult.results(results);
            txnResult.code(Code.OK);
            TxnResultImpl txnResultImpl = txnResult;
            return txnResultImpl;
        }
        catch (StateStoreRuntimeException e) {
            results.forEach(Result::close);
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean processCompareOp(CompareOp<K, V> op) {
        MVCCRecord record = null;
        K key = op.key();
        byte[] rawKey = this.keyCoder.encode(key);
        try {
            record = this.getKeyRecord(key, rawKey);
            if (null == record && CompareTarget.VALUE != op.target()) {
                throw new MVCCStoreException(Code.KEY_NOT_FOUND, "Key '" + TextFormat.escapeBytes(rawKey) + "' is not found");
            }
            boolean bl = this.processCompareOp(record, op);
            return bl;
        }
        finally {
            if (null != record) {
                record.recycle();
            }
        }
    }

    boolean processCompareOp(@Nullable MVCCRecord record, CompareOp<K, V> op) {
        boolean success;
        int cmp;
        switch (op.target()) {
            case MOD: {
                cmp = record.compareModRev(op.revision());
                break;
            }
            case CREATE: {
                cmp = record.compareCreateRev(op.revision());
                break;
            }
            case VERSION: {
                cmp = record.compareVersion(op.revision());
                break;
            }
            case VALUE: {
                if (null == record) {
                    if (CompareResult.EQUAL == op.result()) {
                        return op.value() == null;
                    }
                    if (CompareResult.NOT_EQUAL == op.result()) {
                        return op.value() != null;
                    }
                    return false;
                }
                if (op.value() != null) {
                    byte[] rawValue = this.valCoder.encode(op.value());
                    cmp = record.getValue().compareTo(Unpooled.wrappedBuffer(rawValue));
                    break;
                }
                switch (op.result()) {
                    case EQUAL: 
                    case LESS: {
                        return false;
                    }
                }
                return true;
            }
            default: {
                return false;
            }
        }
        switch (op.result()) {
            case LESS: {
                success = cmp < 0;
                break;
            }
            case EQUAL: {
                success = cmp == 0;
                break;
            }
            case GREATER: {
                success = cmp > 0;
                break;
            }
            case NOT_EQUAL: {
                success = cmp != 0;
                break;
            }
            default: {
                success = false;
            }
        }
        return success;
    }

    boolean processCompares(TxnOp<K, V> op) {
        for (CompareOp<K, V> compare : op.compareOps()) {
            if (this.processCompareOp(compare)) continue;
            return false;
        }
        return true;
    }

    private Result<K, V> executeOp(long revision, WriteBatch batch, Op<K, V> op) {
        if (op instanceof PutOp) {
            return this.put(revision, batch, (PutOp)op);
        }
        if (op instanceof DeleteOp) {
            return this.delete(revision, batch, (DeleteOp)op, true);
        }
        if (op instanceof RangeOp) {
            return this.range((RangeOp)op);
        }
        throw new MVCCStoreException(Code.ILLEGAL_OP, "Unknown operation in a transaction : " + op);
    }

    private boolean getKeyRecords(byte[] rawKey, byte[] rawEndKey, List<byte[]> resultKeys, List<MVCCRecord> resultValues, MutableLong numKvs, RangeOption<K> rangeOption, long limit, boolean countOnly) {
        try (RocksIterator iter = this.db.newIterator(this.dataCfHandle);){
            iter.seek(rawKey);
            boolean eor = false;
            while (iter.isValid() && (limit < 0L || (long)resultKeys.size() < limit)) {
                byte[] key = iter.key();
                if (COMPARATOR.compare(rawEndKey, key) < 0) {
                    eor = true;
                    break;
                }
                MVCCRecord val = this.recordCoder.decode(iter.value());
                this.processRecord(key, val, resultKeys, resultValues, numKvs, rangeOption, countOnly);
                iter.next();
            }
            if (eor) {
                boolean bl = false;
                return bl;
            }
            boolean bl = iter.isValid();
            return bl;
        }
    }

    private void processRecord(byte[] key, MVCCRecord record, List<byte[]> resultKeys, List<MVCCRecord> resultValues, MutableLong numKvs, RangeOption<K> rangeOption, boolean countOnly) {
        if (null == rangeOption && countOnly) {
            numKvs.increment();
            return;
        }
        if (record.test(rangeOption)) {
            numKvs.increment();
            if (countOnly) {
                record.recycle();
            } else {
                resultKeys.add(key);
                resultValues.add(record);
            }
        } else {
            record.recycle();
        }
    }

    private MVCCRecord getKeyRecord(K key, byte[] keyBytes) {
        try {
            byte[] valBytes = this.db.get(this.dataCfHandle, keyBytes);
            if (null == valBytes) {
                return null;
            }
            return this.recordCoder.decode(valBytes);
        }
        catch (RocksDBException e) {
            throw new StateStoreRuntimeException("Error while getting value for key " + key + " from state store " + this.name, e);
        }
    }

    @Override
    public RangeResult<K, V> range(RangeOp<K, V> rangeOp) {
        try {
            return this.processRange(rangeOp);
        }
        catch (MVCCStoreException e) {
            RangeResultImpl<K, V> result = this.resultFactory.newRangeResult(-1L);
            result.code(e.getCode());
            return result;
        }
        catch (StateStoreRuntimeException e) {
            RangeResultImpl<K, V> result = this.resultFactory.newRangeResult(-1L);
            result.code(Code.INTERNAL_ERROR);
            return result;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    synchronized RangeResult<K, V> processRange(RangeOp<K, V> rangeOp) {
        this.checkStoreOpen();
        K key = rangeOp.key();
        K endKey = rangeOp.option().endKey();
        RangeResultImpl<K, V> result = this.resultFactory.newRangeResult(-1L);
        byte[] rawKey = null != key ? this.keyCoder.encode(key) : Constants.NULL_START_KEY;
        byte[] rawEndKey = Constants.NULL_END_KEY;
        if (null == endKey) {
            MVCCRecord record = this.getKeyRecord(key, rawKey);
            try {
                if (null == record || !record.test(rangeOp.option())) {
                    result.count(0L);
                    result.kvs(Collections.emptyList());
                } else {
                    result.count(1L);
                    result.kvs(Lists.newArrayList(record.asKVRecord(this.recordFactory, key, this.valCoder)));
                }
                result.more(false);
                result.code(Code.OK);
                RangeResultImpl<K, V> rangeResultImpl = result;
                return rangeResultImpl;
            }
            finally {
                if (null != record) {
                    record.recycle();
                }
            }
        }
        rawEndKey = this.keyCoder.encode(endKey);
        Pair<byte[], byte[]> realRange = this.getRealRange(rawKey, rawEndKey);
        rawKey = realRange.getLeft();
        rawEndKey = realRange.getRight();
        ArrayList<byte[]> keys = Lists.newArrayList();
        ArrayList<MVCCRecord> records = Lists.newArrayList();
        MutableLong numKvs = new MutableLong(0L);
        try {
            boolean hasMore = this.getKeyRecords(rawKey, rawEndKey, keys, records, numKvs, rangeOp.option(), rangeOp.option().limit(), false);
            List<KeyValue<K, V>> kvs = this.toKvs(keys, records);
            result.code(Code.OK);
            result.kvs(kvs);
            result.count(kvs.size());
            result.more(hasMore);
        }
        finally {
            records.forEach(MVCCRecord::recycle);
        }
        return result;
    }

    private List<KeyValue<K, V>> toKvs(List<byte[]> keys, List<MVCCRecord> records) {
        ArrayList<KeyValue<K, V>> kvs = Lists.newArrayListWithExpectedSize(keys.size());
        for (int i = 0; i < keys.size(); ++i) {
            byte[] keyBytes = keys.get(i);
            MVCCRecord record = records.get(i);
            kvs.add(record.asKVRecord(this.recordFactory, this.keyCoder.decode(keyBytes), this.valCoder));
        }
        return kvs;
    }

    private Pair<byte[], byte[]> getRealRange(byte[] rawKey, byte[] rawEndKey) {
        boolean isNullStartKey = Constants.isNullStartKey(rawKey);
        boolean isNullEndKey = Constants.isNullEndKey(rawEndKey);
        if (isNullStartKey || isNullEndKey) {
            try (RocksIterator iter = this.db.newIterator(this.dataCfHandle);){
                if (isNullStartKey) {
                    iter.seekToFirst();
                    if (!iter.isValid()) {
                        Pair<byte[], byte[]> pair = null;
                        return pair;
                    }
                    rawKey = iter.key();
                }
                if (isNullEndKey) {
                    iter.seekToLast();
                    if (!iter.isValid()) {
                        Pair<byte[], byte[]> pair = null;
                        return pair;
                    }
                    rawEndKey = iter.key();
                }
            }
        }
        return Pair.of(rawKey, rawEndKey);
    }

    class RangeResultIterator
    implements KVIterator<K, V> {
        private final K to;
        private K next;
        private RangeResult<K, V> result;
        private PeekingIterator<KeyValue<K, V>> resultIter;
        private boolean eor = false;
        private volatile boolean closed = false;

        RangeResultIterator(K from, K to) {
            this.to = to;
            this.next = from;
        }

        private void ensureIteratorOpen() {
            if (this.closed) {
                throw new InvalidStateStoreException("MVCC state store " + MVCCStoreImpl.this.name + " is already closed.");
            }
        }

        @Override
        public void close() {
            MVCCStoreImpl.this.kvIters.remove(this);
            if (null != this.result) {
                this.result.close();
            }
            this.closed = true;
        }

        private void getNextBatch() {
            try (RangeOp op = MVCCStoreImpl.this.opFactory.newRange(this.next, (RangeOption)MVCCStoreImpl.this.opFactory.optionFactory().newRangeOption().endKey(this.to).limit(32L).build());){
                this.result = MVCCStoreImpl.this.range(op);
            }
            if (Code.OK != this.result.code()) {
                throw new MVCCStoreException(this.result.code(), "Failed to fetch kv pairs at range [" + this.next + ", " + this.to + "] from state store " + MVCCStoreImpl.this.name);
            }
            this.resultIter = Iterators.peekingIterator(this.result.kvs().iterator());
        }

        private void skipFirstKey() {
            KeyValue kv;
            while (this.resultIter.hasNext() && (kv = this.resultIter.peek()).key().equals(this.next)) {
                this.resultIter.next();
            }
        }

        @Override
        public boolean hasNext() {
            this.ensureIteratorOpen();
            if (this.eor) {
                return false;
            }
            if (null == this.result) {
                this.getNextBatch();
            }
            if (!this.resultIter.hasNext()) {
                if (this.result.more()) {
                    this.result.close();
                    this.getNextBatch();
                    this.skipFirstKey();
                    return this.hasNext();
                }
                this.eor = true;
                return false;
            }
            return true;
        }

        @Override
        public KV<K, V> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue kv = this.resultIter.next();
            this.next = kv.key();
            if (this.next.equals(this.to)) {
                this.eor = true;
            }
            return new KVImpl(kv.key(), kv.value());
        }
    }
}

