/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance.state;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.options.Options;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;

/**
 * {@link DefaultStateStore} implementation that delegates every operation to a
 * BookKeeper key/value {@link Table} keyed and valued by {@link ByteBuf}.
 *
 * <p>String keys are encoded as UTF-8 before being handed to the table. Each
 * synchronous method blocks on its asynchronous counterpart and rethrows any
 * failure as a {@link RuntimeException} that carries the original cause.
 */
public class BKStateStoreImpl implements DefaultStateStore {
    private final String tenant;
    private final String namespace;
    private final String name;
    private final String fqsn;
    private final Table<ByteBuf, ByteBuf> table;

    /**
     * Creates a state store over the given table.
     *
     * @param tenant    tenant this store belongs to
     * @param namespace namespace this store belongs to
     * @param name      name of this store
     * @param table     table that all reads and writes are delegated to
     */
    public BKStateStoreImpl(String tenant, String namespace, String name, Table<ByteBuf, ByteBuf> table) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.name = name;
        this.table = table;
        this.fqsn = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
    }

    /** Encodes {@code key} as UTF-8 and wraps the resulting bytes in a {@link ByteBuf}. */
    private static ByteBuf toKeyBuf(String key) {
        return Unpooled.wrappedBuffer(key.getBytes(StandardCharsets.UTF_8));
    }

    /** @return the tenant given at construction */
    @Override
    public String tenant() {
        return this.tenant;
    }

    /** @return the namespace given at construction */
    @Override
    public String namespace() {
        return this.namespace;
    }

    /** @return the store name given at construction */
    @Override
    public String name() {
        return this.name;
    }

    /** @return the fully qualified name computed from tenant, namespace and name at construction */
    @Override
    public String fqsn() {
        return this.fqsn;
    }

    /** No initialization is required for this store; the context is ignored. */
    @Override
    public void init(StateStoreContext ctx) {
    }

    /** Closes the underlying table. */
    @Override
    public void close() {
        this.table.close();
    }

    /**
     * Asynchronously increments the counter stored under {@code key}.
     *
     * @param key    counter key
     * @param amount amount to add to the counter
     * @return future completed when the table has processed the increment
     */
    @Override
    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        return this.table.increment(toKeyBuf(key), amount);
    }

    /**
     * Blocking variant of {@link #incrCounterAsync(String, long)}.
     *
     * @throws RuntimeException if the increment fails; the original failure is the cause
     */
    @Override
    public void incrCounter(String key, long amount) {
        try {
            FutureUtils.result(this.incrCounterAsync(key, amount));
        } catch (Exception e) {
            throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e);
        }
    }

    /**
     * Asynchronously reads the counter stored under {@code key}.
     *
     * @param key counter key
     * @return future holding the numeric value reported by the table
     */
    @Override
    public CompletableFuture<Long> getCounterAsync(String key) {
        return this.table.getNumber(toKeyBuf(key));
    }

    /**
     * Blocking variant of {@link #getCounterAsync(String)}.
     *
     * @throws RuntimeException if the read fails; the original failure is the cause
     */
    @Override
    public long getCounter(String key) {
        try {
            return FutureUtils.result(this.getCounterAsync(key));
        } catch (Exception e) {
            // Chain the cause so the underlying failure is not lost.
            throw new RuntimeException("Failed to retrieve counter from key '" + key + "'", e);
        }
    }

    /**
     * Asynchronously stores {@code value} under {@code key}.
     *
     * <p>A non-null {@code value} has its position reset to 0 before it is
     * wrapped, so the write starts at the beginning of the buffer regardless of
     * where the caller left the position. This mutates the caller's buffer. A
     * {@code null} value is forwarded to the table unchanged; how the table
     * treats it is not defined here.
     *
     * @param key   state key
     * @param value value to store, or {@code null}
     * @return future completed when the table has processed the put
     */
    @Override
    public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
        if (value == null) {
            return this.table.put(toKeyBuf(key), null);
        }
        // Rewind first: a buffer that was just filled would otherwise have nothing remaining to wrap.
        value.position(0);
        return this.table.put(toKeyBuf(key), Unpooled.wrappedBuffer(value));
    }

    /**
     * Blocking variant of {@link #putAsync(String, ByteBuffer)}.
     *
     * @throws RuntimeException if the update fails; the original failure is the cause
     */
    @Override
    public void put(String key, ByteBuffer value) {
        try {
            FutureUtils.result(this.putAsync(key, value));
        } catch (Exception e) {
            // Chain the cause so the underlying failure is not lost.
            throw new RuntimeException("Failed to update the state value for key '" + key + "'", e);
        }
    }

    /**
     * Asynchronously deletes the value stored under {@code key}.
     *
     * @param key state key
     * @return future completed (with {@code null}) when the table has processed the delete
     */
    @Override
    public CompletableFuture<Void> deleteAsync(String key) {
        // Pass the delete option itself; it must not be cast to ByteBuf.
        return this.table.delete(toKeyBuf(key), Options.delete()).thenApply(ignored -> null);
    }

    /**
     * Blocking variant of {@link #deleteAsync(String)}.
     *
     * @throws RuntimeException if the delete fails; the original failure is the cause
     */
    @Override
    public void delete(String key) {
        try {
            FutureUtils.result(this.deleteAsync(key));
        } catch (Exception e) {
            // Chain the cause so the underlying failure is not lost.
            throw new RuntimeException("Failed to delete the state value for key '" + key + "'", e);
        }
    }

    /**
     * Asynchronously reads the value stored under {@code key}.
     *
     * <p>The readable bytes of the table's result are copied into a newly
     * allocated {@link ByteBuffer} positioned at 0, and the table's buffer is
     * released afterwards.
     *
     * @param key state key
     * @return future holding the copied value, or {@code null} when the table returned {@code null}
     */
    @Override
    public CompletableFuture<ByteBuffer> getAsync(String key) {
        return this.table.get(toKeyBuf(key)).thenApply(data -> {
            if (data == null) {
                return null;
            }
            try {
                ByteBuffer result = ByteBuffer.allocate(data.readableBytes());
                data.readBytes(result);
                // readBytes advanced the position to the end; reset it so callers can read from the start.
                result.position(0);
                return result;
            } finally {
                // Release the table's buffer even if the copy fails.
                ReferenceCountUtil.safeRelease(data);
            }
        });
    }

    /**
     * Blocking variant of {@link #getAsync(String)}.
     *
     * @throws RuntimeException if the read fails; the original failure is the cause
     */
    @Override
    public ByteBuffer get(String key) {
        try {
            return FutureUtils.result(this.getAsync(key));
        } catch (Exception e) {
            throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
        }
    }
}

