/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.CallOptions;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.Channel;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.ManagedChannel;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.StorageClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.exceptions.ApiException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.PTable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.SimpleClientBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.kv.ByteBufTableImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.kv.PByteBufSimpleTableImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.GrpcUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.ExceptionUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.ListenableFutures;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.SharedResourceManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleStorageClientImpl
extends SimpleClientBase
implements StorageClient {
    private static final Logger log = LoggerFactory.getLogger(SimpleStorageClientImpl.class);
    private static final String COMPONENT_NAME = SimpleStorageClientImpl.class.getSimpleName();
    private final String defaultNamespace;
    private final RootRangeServiceGrpc.RootRangeServiceFutureStub rootRangeService;

    public SimpleStorageClientImpl(String namespaceName, StorageClientSettings settings) {
        super(settings);
        this.defaultNamespace = namespaceName;
        this.rootRangeService = GrpcUtils.configureGrpcStub(RootRangeServiceGrpc.newFutureStub(this.channel), Optional.empty());
    }

    public SimpleStorageClientImpl(String namespaceName, StorageClientSettings settings, SharedResourceManager.Resource<OrderedScheduler> schedulerResource, ManagedChannel channel) {
        super(settings, schedulerResource, channel, false);
        this.defaultNamespace = namespaceName;
        this.rootRangeService = GrpcUtils.configureGrpcStub(RootRangeServiceGrpc.newFutureStub((Channel)channel), Optional.empty());
    }

    @Override
    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String tableName) {
        return this.openPTable(this.defaultNamespace, tableName);
    }

    @Override
    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespaceName, String tableName) {
        return ExceptionUtils.callAndHandleClosedAsync(COMPONENT_NAME, this.isClosed(), future -> this.openTableImpl(namespaceName, tableName, future));
    }

    @Override
    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
        return this.openTable(this.defaultNamespace, table);
    }

    @Override
    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespaceName, String table) {
        return this.openPTable(namespaceName, table).thenApply(pTable -> new ByteBufTableImpl((PTable<ByteBuf, ByteBuf>)pTable));
    }

    private void openTableImpl(String namespaceName, String streamName, CompletableFuture<PTable<ByteBuf, ByteBuf>> future) {
        CompletionStage getStreamFuture = this.retryUtils.execute(() -> ListenableFutures.fromListenableFuture(this.rootRangeService.getStream(ProtoUtils.createGetStreamRequest(namespaceName, streamName)))).thenCompose(resp -> {
            if (StatusCode.SUCCESS == resp.getCode()) {
                StreamProperties streamProps = resp.getStreamProps();
                log.info("Retrieved table properties for table {}/{} : {}", new Object[]{namespaceName, streamName, streamProps});
                if (StorageType.TABLE != streamProps.getStreamConf().getStorageType()) {
                    return FutureUtils.exception(new ApiException("Can't open a non-table storage entity : " + streamProps.getStreamConf().getStorageType()));
                }
                return FutureUtils.value(streamProps);
            }
            return FutureUtils.exception(ProtocolInternalUtils.createRootRangeException(namespaceName, resp.getCode()));
        });
        FutureUtils.proxyTo(((CompletableFuture)getStreamFuture).thenApply(streamProps -> new PByteBufSimpleTableImpl((StreamProperties)streamProps, (Channel)this.managedChannel, CallOptions.DEFAULT, this.retryUtils)), future);
    }
}

