/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.routing;

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.routing.RangeRouter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.router.BytesHashRouter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RangeRoutingTable} implementation that caches one {@link RangeRouter} per stream id.
 *
 * <p>Lookups never block: when no router is cached for a stream, {@link #getRange(long, byte[])}
 * starts an asynchronous fetch of the stream's active ranges and returns {@code null}. Once the
 * fetch succeeds the router is cached and subsequent lookups are answered from it.
 */
public class RangeRoutingTableImpl implements RangeRoutingTable {
    private static final Logger log = LoggerFactory.getLogger(RangeRoutingTableImpl.class);

    /** Client manager used to open the meta range client of a stream. */
    private final StorageServerClientManager manager;
    /** Cached routers, keyed by stream id. */
    private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges;
    /** In-flight fetches, keyed by stream id; at most one fetch per stream is outstanding. */
    private final ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> outstandingUpdates;

    /**
     * Creates an empty routing table.
     *
     * @param manager client manager used to open meta range clients when a stream's ranges
     *                have to be fetched
     */
    public RangeRoutingTableImpl(StorageServerClientManager manager) {
        this.manager = manager;
        this.ranges = new ConcurrentLongHashMap<>();
        this.outstandingUpdates = new ConcurrentLongHashMap<>();
    }

    /**
     * Returns the cached router of a stream.
     *
     * @param streamId stream id
     * @return the cached router, or {@code null} if none is cached
     */
    @VisibleForTesting
    RangeRouter<byte[]> getRangeRouter(long streamId) {
        return this.ranges.get(streamId);
    }

    /**
     * Resolves the range that a routing key of a stream maps to.
     *
     * @param streamId   stream id
     * @param routingKey routing key to resolve
     * @return the range properties reported by the cached router, or {@code null} when no router
     *         is cached for the stream yet; in that case an asynchronous fetch is triggered
     */
    @Override
    public RangeProperties getRange(long streamId, byte[] routingKey) {
        RangeRouter<byte[]> router = this.ranges.get(streamId);
        if (null == router) {
            this.fetchStreamRanges(streamId);
            return null;
        }
        return router.getRangeProperties(routingKey);
    }

    /**
     * Returns the in-flight fetch of a stream.
     *
     * @param streamId stream id
     * @return the future of the outstanding fetch, or {@code null} if none is in flight
     */
    @VisibleForTesting
    CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long streamId) {
        return this.outstandingUpdates.get(streamId);
    }

    /**
     * Starts an asynchronous fetch of the active ranges of a stream unless one is already in
     * flight. On success the resulting router is cached; in either case the in-flight entry is
     * removed when the fetch completes.
     *
     * @param streamId stream id
     */
    private void fetchStreamRanges(long streamId) {
        // Fast path: skip allocating a future when a fetch is already in flight.
        if (null != this.outstandingUpdates.get(streamId)) {
            return;
        }
        CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new CompletableFuture<>();
        // Register atomically. An unconditional put() would replace the future registered by a
        // concurrent winner with one that is never completed and never removed, which would
        // block every later fetch for this stream.
        if (null != this.outstandingUpdates.putIfAbsent(streamId, newFetchFuture)) {
            return;
        }
        CompletableFuture<RangeRouter<byte[]>> fetch = this.manager.openMetaRangeClient(streamId)
            .thenCompose(metaRangeClient -> metaRangeClient.getActiveDataRanges())
            .thenApply(hashStreamRanges -> {
                RangeRouter<byte[]> router = new RangeRouter<>(BytesHashRouter.of());
                router.setRanges(hashStreamRanges);
                return router;
            })
            .whenComplete((router, cause) -> {
                if (null == cause) {
                    this.ranges.put(streamId, router);
                } else {
                    log.warn("Failed to fetch the active ranges of stream {}", streamId, cause);
                }
                // Conditional remove: only clear the entry that belongs to this fetch.
                this.outstandingUpdates.remove(streamId, newFetchFuture);
            });
        FutureUtils.proxyTo(fetch, newFetchFuture);
    }
}

