/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.kv;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.Unpooled;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.coder.Coder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.statestore.kv.Command;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.kv.KVAsyncStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.journal.AbstractStateStoreWithJournal;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.journal.CommandProcessor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.kv.KVCommandProcessor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.kv.KVUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.Namespace;

/**
 * Asynchronous key/value state store backed by a local {@link RocksdbKVStore} of raw bytes.
 *
 * <p>Typed keys and values are converted to and from bytes with the coders taken from the
 * {@link StateStoreSpec} passed to {@link #init(StateStoreSpec)}. Mutations are first encoded
 * as a {@link Command} and handed to the superclass through {@code writeCommandBufReturnTxId};
 * the transaction id it yields is then used as the revision when the change is applied to the
 * local store on {@code writeIOScheduler}.
 *
 * <p>Every data operation returns a future failed with {@link InvalidStateStoreException} when
 * it is called before the store has been initialized.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RocksdbKVAsyncStore<K, V>
extends AbstractStateStoreWithJournal<RocksdbKVStore<byte[], byte[]>>
implements KVAsyncStore<K, V> {
    /** Payload of the catch-up marker: an empty byte array. */
    private static final byte[] CATCHUP_MARKER = new byte[0];
    /** Key coder; assigned in {@link #init(StateStoreSpec)} and null before that. */
    private Coder<K> keyCoder;
    /** Value coder; assigned in {@link #init(StateStoreSpec)} and null before that. */
    private Coder<V> valCoder;

    /**
     * Creates the store; both suppliers are passed unchanged to the superclass constructor.
     *
     * @param localStateStoreSupplier supplier of the local RocksDB-backed byte store
     * @param namespaceSupplier supplier of the namespace
     */
    public RocksdbKVAsyncStore(Supplier<RocksdbKVStore<byte[], byte[]>> localStateStoreSupplier, Supplier<Namespace> namespaceSupplier) {
        super(localStateStoreSupplier, namespaceSupplier);
    }

    /**
     * Captures the key and value coders from {@code spec}, then delegates to the superclass.
     *
     * @param spec store specification providing the coders
     * @return the future returned by the superclass {@code init}
     */
    @Override
    public CompletableFuture<Void> init(StateStoreSpec spec) {
        this.keyCoder = spec.getKeyCoder();
        this.valCoder = spec.getValCoder();
        return super.init(spec);
    }

    /**
     * Reads the value stored under {@code key} from the local store.
     *
     * @param key key to look up
     * @return future holding the decoded value, or {@code null} when the local store has no
     *         entry for the key; failed with {@link InvalidStateStoreException} if the store
     *         is not initialized
     */
    @Override
    public CompletableFuture<V> get(K key) {
        synchronized (this) {
            if (!this.isInitialized) {
                return this.uninitializedException();
            }
        }
        return this.executeReadIO(() -> {
            byte[] keyBytes = this.keyCoder.encode(key);
            return this.decodeStoredValue(this.localStore.get(keyBytes));
        });
    }

    /**
     * Writes a put command and, once a transaction id is available, stores the value in the
     * local store using that id as the revision.
     *
     * @param key key to write
     * @param value value to associate with the key
     * @return future completed after the local store has been updated; failed with
     *         {@link InvalidStateStoreException} if the store is not initialized
     */
    @Override
    public CompletableFuture<Void> put(K key, V value) {
        synchronized (this) {
            if (!this.isInitialized) {
                return this.uninitializedException();
            }
        }
        byte[] keyBytes = this.keyCoder.encode(key);
        byte[] valBytes = this.valCoder.encode(value);
        Command command = Command.newBuilder().setPutReq(KVUtils.newPutRequest(keyBytes, valBytes)).build();
        return this.writeCommandReturnTxId(command).thenApplyAsync(revision -> {
            ByteBuf serializedBuf = KVUtils.serialize(valBytes, revision);
            try {
                // Copy the bytes out before the buffer is released in the finally block.
                byte[] serializedBytes = ByteBufUtil.getBytes(serializedBuf);
                this.localStore.put(keyBytes, serializedBytes, revision);
            }
            finally {
                serializedBuf.release();
            }
            return null;
        }, (Executor)this.writeIOScheduler);
    }

    /**
     * Writes a put-if-absent command and applies it to the local store with the resulting
     * transaction id as the revision.
     *
     * @param key key to write
     * @param value value to store under the key
     * @return future holding the decoded value the local store returned from
     *         {@code putIfAbsent}, or {@code null} when it returned none; failed with
     *         {@link InvalidStateStoreException} if the store is not initialized
     */
    @Override
    public CompletableFuture<V> putIfAbsent(K key, V value) {
        // Same guard as get/put: without it the null coders would throw a synchronous
        // NullPointerException instead of reporting the uninitialized state via the future.
        synchronized (this) {
            if (!this.isInitialized) {
                return this.uninitializedException();
            }
        }
        byte[] keyBytes = this.keyCoder.encode(key);
        byte[] valBytes = this.valCoder.encode(value);
        Command command = Command.newBuilder().setPutIfAbsentReq(KVUtils.newPutIfAbsentRequest(keyBytes, valBytes)).build();
        return this.writeCommandReturnTxId(command).thenApplyAsync(revision -> {
            ByteBuf serializedBuf = KVUtils.serialize(valBytes, revision);
            try {
                byte[] serializedBytes = ByteBufUtil.getBytes(serializedBuf);
                byte[] prevValue = this.localStore.putIfAbsent(keyBytes, serializedBytes, revision);
                return this.decodeStoredValue(prevValue);
            }
            finally {
                serializedBuf.release();
            }
        }, (Executor)this.writeIOScheduler);
    }

    /**
     * Writes a delete command and removes the key from the local store with the resulting
     * transaction id as the revision.
     *
     * @param key key to remove
     * @return future holding the decoded value the local store returned from {@code delete},
     *         or {@code null} when it returned none; failed with
     *         {@link InvalidStateStoreException} if the store is not initialized
     */
    @Override
    public CompletableFuture<V> delete(K key) {
        // Same guard as get/put; see putIfAbsent.
        synchronized (this) {
            if (!this.isInitialized) {
                return this.uninitializedException();
            }
        }
        byte[] keyBytes = this.keyCoder.encode(key);
        Command command = Command.newBuilder().setDelReq(KVUtils.newDeleteRequest(keyBytes)).build();
        return this.writeCommandReturnTxId(command).thenApplyAsync(
            revision -> this.decodeStoredValue(this.localStore.delete(keyBytes, revision)),
            (Executor)this.writeIOScheduler);
    }

    /**
     * Decodes bytes obtained from the local store into a value.
     *
     * @param storedBytes bytes as returned by the local store, possibly {@code null}
     * @return the decoded value, or {@code null} when {@code storedBytes} is {@code null}
     */
    private V decodeStoredValue(byte[] storedBytes) {
        if (null == storedBytes) {
            return null;
        }
        return KVUtils.deserialize(this.valCoder, Unpooled.wrappedBuffer(storedBytes));
    }

    /**
     * Builds a future that is already failed because the store has not been initialized.
     *
     * @param <AnyT> result type expected by the caller
     * @return future failed with {@link InvalidStateStoreException}
     */
    private <AnyT> CompletableFuture<AnyT> uninitializedException() {
        return FutureUtils.exception(new InvalidStateStoreException("State store " + this.name() + " is not initialized yet."));
    }

    /**
     * @return a buffer wrapping the shared empty {@code CATCHUP_MARKER} array
     */
    @Override
    protected ByteBuf newCatchupMarker() {
        return Unpooled.wrappedBuffer(CATCHUP_MARKER);
    }

    /**
     * @return the command processor obtained from {@link KVCommandProcessor#of()}
     */
    @Override
    protected CommandProcessor<RocksdbKVStore<byte[], byte[]>> newCommandProcessor() {
        return KVCommandProcessor.of();
    }

    /**
     * Serializes {@code command} into a buffer and submits it through
     * {@code writeCommandBufReturnTxId}.
     *
     * @param command command to write
     * @return future holding the transaction id, or a future failed with the
     *         {@link IOException} raised while building the command buffer
     */
    private CompletableFuture<Long> writeCommandReturnTxId(Command command) {
        try {
            ByteBuf cmdBuf = KVUtils.newCommandBuf(command);
            return this.writeCommandBufReturnTxId(cmdBuf);
        }
        catch (IOException e) {
            return FutureUtils.exception(e);
        }
    }
}

