/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.metadata;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.RelationType;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.metadata.stream.MetaRangeImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetaRangeStoreImpl
implements MetaRangeStore {
    private static final Logger log = LoggerFactory.getLogger(MetaRangeStoreImpl.class);
    private final MVCCAsyncStore<byte[], byte[]> store;
    private final ScheduledExecutorService executor;
    private final StorageContainerPlacementPolicy rangePlacementPolicy;
    private final Map<Long, MetaRangeImpl> streams;
    private final StorageServerClientManager clientManager;

    public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store, StorageContainerPlacementPolicy rangePlacementPolicy, ScheduledExecutorService executor, StorageServerClientManager clientManager) {
        this.store = store;
        this.executor = executor;
        this.rangePlacementPolicy = rangePlacementPolicy;
        this.streams = Maps.newHashMap();
        this.clientManager = clientManager;
    }

    private CompletableFuture<GetActiveRangesResponse> createStreamIfMissing(long streamId, MetaRangeImpl metaRange, StreamProperties streamProps) {
        if (null == streamProps) {
            return FutureUtils.value(GetActiveRangesResponse.newBuilder().setCode(StatusCode.STREAM_NOT_FOUND).build());
        }
        return metaRange.create(streamProps).thenCompose(created -> {
            if (created.booleanValue()) {
                Map<Long, MetaRangeImpl> map = this.streams;
                synchronized (map) {
                    this.streams.put(streamId, metaRange);
                }
                return this.getActiveRanges(metaRange);
            }
            return FutureUtils.value(GetActiveRangesResponse.newBuilder().setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
        });
    }

    @Override
    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
        long streamId = request.getStreamId();
        MetaRangeImpl metaRange = this.streams.get(streamId);
        if (null == metaRange) {
            MetaRangeImpl metaRangeImpl = new MetaRangeImpl(this.store, this.executor, this.rangePlacementPolicy);
            return metaRangeImpl.load(streamId).thenCompose(mr -> {
                if (null == mr) {
                    return this.clientManager.getStreamProperties(streamId).thenCompose(streamProperties -> this.createStreamIfMissing(streamId, metaRangeImpl, (StreamProperties)streamProperties));
                }
                Map<Long, MetaRangeImpl> map = this.streams;
                synchronized (map) {
                    this.streams.put(streamId, (MetaRangeImpl)mr);
                }
                return this.getActiveRanges((MetaRange)mr);
            });
        }
        return this.getActiveRanges(metaRange);
    }

    private CompletableFuture<GetActiveRangesResponse> getActiveRanges(MetaRange metaRange) {
        GetActiveRangesResponse.Builder respBuilder = GetActiveRangesResponse.newBuilder();
        return ((CompletableFuture)metaRange.getActiveRanges().thenApplyAsync(ranges -> {
            for (RangeMetadata range : ranges) {
                RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder().setProps(range.getProps()).setType(RelationType.PARENTS).addAllRelatedRanges(range.getParentsList());
                respBuilder.addRanges(rrBuilder);
            }
            return respBuilder.setCode(StatusCode.SUCCESS).build();
        }, (Executor)this.executor)).exceptionally(cause -> respBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
    }
}

