/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.StorageClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.exceptions.ApiException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.PTable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.kv.ByteBufTableImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.kv.PByteBufTableImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.ClientResources;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.ExceptionUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.SharedResourceManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageClientImpl
extends AbstractAutoAsyncCloseable
implements StorageClient {
    private static final Logger log = LoggerFactory.getLogger(StorageClientImpl.class);
    private static final String COMPONENT_NAME = StorageClientImpl.class.getSimpleName();
    private final String defaultNamespace;
    private final StorageClientSettings settings;
    private final ClientResources resources;
    private final OrderedScheduler scheduler;
    private final StorageServerClientManager serverManager;
    private final boolean ownServerManager;

    StorageClientImpl(String namespaceName, StorageClientSettings settings, ClientResources resources) {
        this(namespaceName, settings, resources, new StorageServerClientManagerImpl(settings, resources.scheduler()), true);
    }

    public StorageClientImpl(String namespaceName, StorageClientSettings settings, ClientResources resources, StorageServerClientManager serverManager, boolean ownServerManager) {
        this.defaultNamespace = namespaceName;
        this.settings = settings;
        this.resources = resources;
        this.serverManager = serverManager;
        this.ownServerManager = ownServerManager;
        this.scheduler = SharedResourceManager.shared().get(resources.scheduler());
    }

    CompletableFuture<StreamProperties> getStreamProperties(String namespaceName, String streamName) {
        return this.serverManager.getRootRangeClient().getStream(namespaceName, streamName);
    }

    @Override
    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String tableName) {
        return this.openPTable(this.defaultNamespace, tableName);
    }

    @Override
    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespaceName, String tableName) {
        return ExceptionUtils.callAndHandleClosedAsync(COMPONENT_NAME, this.isClosed(), future -> this.openTableImpl(namespaceName, tableName, future));
    }

    @Override
    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
        return this.openTable(this.defaultNamespace, table);
    }

    @Override
    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespaceName, String table) {
        return this.openPTable(namespaceName, table).thenApply(pTable -> new ByteBufTableImpl((PTable<ByteBuf, ByteBuf>)pTable));
    }

    private void openTableImpl(String namespaceName, String tableName, CompletableFuture<PTable<ByteBuf, ByteBuf>> future) {
        FutureUtils.proxyTo(this.getStreamProperties(namespaceName, tableName).thenComposeAsync(props -> {
            if (log.isInfoEnabled()) {
                log.info("Retrieved table properties for table {}/{} : {}", new Object[]{namespaceName, tableName, props});
            }
            if (StorageType.TABLE != props.getStreamConf().getStorageType()) {
                return FutureUtils.exception(new ApiException("Can't open a non-table storage entity : " + props.getStreamConf().getStorageType()));
            }
            return new PByteBufTableImpl(tableName, (StreamProperties)props, this.serverManager, this.scheduler.chooseThread(props.getStreamId()), this.settings.backoffPolicy()).initialize();
        }), future);
    }

    @Override
    protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
        this.scheduler.submit(() -> {
            if (this.ownServerManager) {
                this.serverManager.close();
            }
            closeFuture.complete(null);
            SharedResourceManager.shared().release(this.resources.scheduler(), this.scheduler);
        });
    }

    @Override
    public void close() {
        try {
            super.close(1L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.warn("Encountered exceptions on closing the storage client", (Throwable)e);
        }
        this.scheduler.forceShutdown(100L, TimeUnit.MILLISECONDS);
    }
}

