package org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.kv;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.coder.Coder;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.statestore.kv.Command;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.kv.KVAsyncStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.journal.AbstractStateStoreWithJournal;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.journal.CommandProcessor;

/* loaded from: input_file:org/apache/pulsar/shade/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.class */
/**
 * Asynchronous key/value store backed by a local {@link RocksdbKVStore} of raw bytes.
 *
 * <p>Keys and values are converted to and from {@code byte[]} with the coders taken from the
 * {@link StateStoreSpec} passed to {@link #init(StateStoreSpec)}. Every mutation is first written
 * as a {@link Command} through {@code writeCommandBufReturnTxId}; once that future yields a
 * transaction id, the change is applied to the local store on {@code writeIOScheduler}.
 *
 * <p>All data operations return a future failed with {@link InvalidStateStoreException} while the
 * inherited {@code isInitialized} flag is false.
 *
 * @param <K> key type handled by the key coder
 * @param <V> value type handled by the value coder
 */
public class RocksdbKVAsyncStore<K, V> extends AbstractStateStoreWithJournal<RocksdbKVStore<byte[], byte[]>> implements KVAsyncStore<K, V> {
    /** Payload of the catch-up marker: an empty byte array. */
    private static final byte[] CATCHUP_MARKER = new byte[0];
    /** Key coder; null until {@link #init(StateStoreSpec)} has run. */
    private Coder<K> keyCoder;
    /** Value coder; null until {@link #init(StateStoreSpec)} has run. */
    private Coder<V> valCoder;

    /**
     * Creates the store; both suppliers are forwarded unchanged to the superclass.
     *
     * @param supplier supplier of the local {@link RocksdbKVStore}
     * @param supplier2 supplier of the {@link Namespace}
     */
    public RocksdbKVAsyncStore(Supplier<RocksdbKVStore<byte[], byte[]>> supplier, Supplier<Namespace> supplier2) {
        super(supplier, supplier2);
    }

    /**
     * Captures the key and value coders from the spec, then delegates to the superclass.
     *
     * @param stateStoreSpec spec providing the key and value coders
     * @return the future returned by the superclass {@code init}
     */
    // The spec does not carry K/V type information, so these casts cannot be checked here;
    // the caller is responsible for supplying coders that match K and V.
    @SuppressWarnings("unchecked")
    @Override
    public CompletableFuture<Void> init(StateStoreSpec stateStoreSpec) {
        this.keyCoder = (Coder<K>) stateStoreSpec.getKeyCoder();
        this.valCoder = (Coder<V>) stateStoreSpec.getValCoder();
        return super.init(stateStoreSpec);
    }

    /**
     * Reads the value stored under a key from the local store via {@code executeReadIO}.
     *
     * @param k key to look up
     * @return future holding the decoded value, or {@code null} when the local store has no
     *     bytes for the key; failed with {@link InvalidStateStoreException} if not initialized
     */
    @Override
    public CompletableFuture<V> get(K k) {
        synchronized (this) {
            if (!this.isInitialized) {
                return uninitializedException();
            }
            return executeReadIO(() -> {
                byte[] storedBytes = this.localStore.get(this.keyCoder.encode(k));
                if (null == storedBytes) {
                    return null;
                }
                return KVUtils.deserialize(this.valCoder, Unpooled.wrappedBuffer(storedBytes));
            });
        }
    }

    /**
     * Writes a put command and, once it has a transaction id, stores the value locally.
     *
     * @param k key to write
     * @param v value to write
     * @return future completed with {@code null} after the local store has been updated; failed
     *     with {@link InvalidStateStoreException} if not initialized
     */
    @Override
    public CompletableFuture<Void> put(K k, V v) {
        synchronized (this) {
            if (!this.isInitialized) {
                return uninitializedException();
            }
            byte[] keyBytes = this.keyCoder.encode(k);
            byte[] valBytes = this.valCoder.encode(v);
            Command command = Command.newBuilder().setPutReq(KVUtils.newPutRequest(keyBytes, valBytes)).build();
            return writeCommandReturnTxId(command).thenApplyAsync(txId -> {
                ByteBuf serialized = KVUtils.serialize(valBytes, txId.longValue());
                try {
                    this.localStore.put(keyBytes, ByteBufUtil.getBytes(serialized), txId.longValue());
                    return null;
                } finally {
                    // Release exactly once, whether or not the local write succeeded.
                    ReferenceCountUtil.safeRelease(serialized);
                }
            }, this.writeIOScheduler);
        }
    }

    /**
     * Writes a put-if-absent command and, once it has a transaction id, applies it locally.
     *
     * @param k key to write
     * @param v value to write if the local store accepts it
     * @return future holding the decoded bytes returned by the local store's
     *     {@code putIfAbsent}, or {@code null} when it returned {@code null}; failed with
     *     {@link InvalidStateStoreException} if not initialized
     */
    @Override
    public CompletableFuture<V> putIfAbsent(K k, V v) {
        // Same guard as get/put: without it the null coders would throw a synchronous
        // NullPointerException instead of producing a failed future. Only the flag check
        // is locked so the write submission itself stays unlocked, as it was before.
        synchronized (this) {
            if (!this.isInitialized) {
                return uninitializedException();
            }
        }
        byte[] keyBytes = this.keyCoder.encode(k);
        byte[] valBytes = this.valCoder.encode(v);
        Command command = Command.newBuilder().setPutIfAbsentReq(KVUtils.newPutIfAbsentRequest(keyBytes, valBytes)).build();
        return writeCommandReturnTxId(command).thenApplyAsync(txId -> {
            ByteBuf serialized = KVUtils.serialize(valBytes, txId.longValue());
            try {
                byte[] previousBytes = this.localStore.putIfAbsent(keyBytes, ByteBufUtil.getBytes(serialized), txId.longValue());
                if (null == previousBytes) {
                    return null;
                }
                return KVUtils.deserialize(this.valCoder, Unpooled.wrappedBuffer(previousBytes));
            } finally {
                // Single release point; releasing again on the success path would hit an
                // already-released buffer.
                ReferenceCountUtil.safeRelease(serialized);
            }
        }, this.writeIOScheduler);
    }

    /**
     * Writes a delete command and, once it has a transaction id, deletes the key locally.
     *
     * @param k key to delete
     * @return future holding the decoded bytes returned by the local store's {@code delete},
     *     or {@code null} when it returned {@code null}; failed with
     *     {@link InvalidStateStoreException} if not initialized
     */
    @Override
    public CompletableFuture<V> delete(K k) {
        // Same guard as get/put; see putIfAbsent for why only the check is locked.
        synchronized (this) {
            if (!this.isInitialized) {
                return uninitializedException();
            }
        }
        byte[] keyBytes = this.keyCoder.encode(k);
        Command command = Command.newBuilder().setDelReq(KVUtils.newDeleteRequest(keyBytes)).build();
        return writeCommandReturnTxId(command).thenApplyAsync(txId -> {
            byte[] previousBytes = this.localStore.delete(keyBytes, txId.longValue());
            if (null == previousBytes) {
                return null;
            }
            return KVUtils.deserialize(this.valCoder, Unpooled.wrappedBuffer(previousBytes));
        }, this.writeIOScheduler);
    }

    /**
     * Builds a future already failed with an {@link InvalidStateStoreException} naming this store.
     *
     * @param <AnyT> result type expected by the caller; never produced
     * @return the failed future
     */
    private <AnyT> CompletableFuture<AnyT> uninitializedException() {
        return FutureUtils.exception(new InvalidStateStoreException("State store " + name() + " is not initialized yet."));
    }

    /**
     * Returns a buffer wrapping the shared empty catch-up marker array.
     *
     * @return a new wrapper around {@code CATCHUP_MARKER}
     */
    @Override
    protected ByteBuf newCatchupMarker() {
        return Unpooled.wrappedBuffer(CATCHUP_MARKER);
    }

    /**
     * Returns the command processor obtained from {@code KVCommandProcessor.of()}.
     *
     * @return the command processor for this store
     */
    @Override
    protected CommandProcessor<RocksdbKVStore<byte[], byte[]>> newCommandProcessor() {
        return KVCommandProcessor.of();
    }

    /**
     * Serializes a command and hands it to {@code writeCommandBufReturnTxId}.
     *
     * @param command command to write
     * @return the future from {@code writeCommandBufReturnTxId}, or a future failed with the
     *     {@link IOException} raised while building or submitting the command buffer
     */
    private CompletableFuture<Long> writeCommandReturnTxId(Command command) {
        try {
            return writeCommandBufReturnTxId(KVUtils.newCommandBuf(command));
        } catch (IOException e) {
            return FutureUtils.exception(e);
        }
    }
}
