/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.metadata.stream;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.ImmutableList;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.pulsar.functions.runtime.shaded.javax.annotation.concurrent.GuardedBy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.Op;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.PutOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.Bytes;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.RangeState;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.exceptions.DataRangeNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetaRangeImpl
implements MetaRange {
    private static final Logger log = LoggerFactory.getLogger(MetaRangeImpl.class);
    private static final byte METADATA_SEP = 1;
    private static final byte RANGE_SEP = 2;
    private static final byte END_SEP = -1;
    private final MVCCAsyncStore<byte[], byte[]> store;
    private final ExecutorService executor;
    private final StorageContainerPlacementPolicy placementPolicy;
    @GuardedBy(value="this")
    private long streamId;
    @GuardedBy(value="this")
    private StreamProperties streamProps;
    private StreamMetadata.LifecycleState lifecycleState = StreamMetadata.LifecycleState.UNINIT;
    private StreamMetadata.ServingState servingState = StreamMetadata.ServingState.WRITABLE;
    private long cTime = 0L;
    private long mTime = 0L;
    private long nextRangeId = 1024L;
    private final NavigableMap<Long, RangeMetadata> ranges;
    private final List<Long> currentRanges;
    private long revision;

    public static final byte[] getStreamMetadataKey(long streamId) {
        byte[] metadataKey = new byte[9];
        Bytes.toBytes(streamId, metadataKey, 0);
        metadataKey[8] = 1;
        return metadataKey;
    }

    public static final byte[] getStreamRangeKey(long streamId, long rangeId) {
        byte[] rangeKey = new byte[17];
        Bytes.toBytes(streamId, rangeKey, 0);
        rangeKey[8] = 2;
        Bytes.toBytes(rangeId, rangeKey, 9);
        return rangeKey;
    }

    public static final byte[] getStreamMetadataEndKey(long streamId) {
        byte[] metadataKey = new byte[9];
        Bytes.toBytes(streamId, metadataKey, 0);
        metadataKey[8] = -1;
        return metadataKey;
    }

    public static final boolean isMetadataKey(byte[] key) {
        return key.length == 9 && key[8] == 1;
    }

    static final boolean isStreamRangeKey(byte[] key) {
        return key.length == 17 && key[8] == 2;
    }

    public MetaRangeImpl(MVCCAsyncStore<byte[], byte[]> store, ExecutorService executor, StorageContainerPlacementPolicy placementPolicy) {
        this(store, executor, placementPolicy, Maps.newTreeMap(), Lists.newArrayList(), StreamMetadata.newBuilder().setLifecycleState(StreamMetadata.LifecycleState.UNINIT).setServingState(StreamMetadata.ServingState.WRITABLE).setNextRangeId(1024L).build(), 0L, 0L);
    }

    private MetaRangeImpl(MVCCAsyncStore<byte[], byte[]> store, ExecutorService executor, StorageContainerPlacementPolicy placementPolicy, NavigableMap<Long, RangeMetadata> ranges, List<Long> currentRanges, StreamMetadata meta, long cTime, long mTime) {
        this.store = store;
        this.executor = executor;
        this.placementPolicy = placementPolicy;
        this.ranges = ranges;
        this.currentRanges = currentRanges;
        this.cTime = cTime;
        this.mTime = mTime;
        this.streamProps = meta.getProps();
        this.streamId = this.streamProps.getStreamId();
        this.lifecycleState = meta.getLifecycleState();
        this.servingState = meta.getServingState();
        this.nextRangeId = meta.getNextRangeId();
    }

    @VisibleForTesting
    public long unsafeGetCreationTime() {
        return this.cTime;
    }

    @VisibleForTesting
    public long unsafeGetModificationTime() {
        return this.mTime;
    }

    @VisibleForTesting
    public StreamMetadata.LifecycleState unsafeGetLifecycleState() {
        return this.lifecycleState;
    }

    @VisibleForTesting
    public synchronized StreamProperties unsafeGetStreamProperties() {
        return this.streamProps;
    }

    @VisibleForTesting
    public synchronized long unsafeGetStreamId() {
        return this.streamId;
    }

    @VisibleForTesting
    public NavigableMap<Long, RangeMetadata> unsafeGetRanges() {
        return this.ranges;
    }

    @VisibleForTesting
    private synchronized StreamMetadata toStreamMetadata(StreamMetadata.LifecycleState state) {
        StreamMetadata.Builder builder = StreamMetadata.newBuilder().setProps(this.streamProps).setLifecycleState(state).setServingState(this.servingState).setNextRangeId(this.nextRangeId).setCTime(this.cTime).setMTime(this.mTime).addAllCurrentRanges(this.currentRanges);
        return builder.build();
    }

    private synchronized StreamMetadata toStreamMetadata(StreamMetadata.ServingState state, long mTime) {
        StreamMetadata.Builder builder = StreamMetadata.newBuilder().setProps(this.streamProps).setLifecycleState(this.lifecycleState).setServingState(state).setNextRangeId(this.nextRangeId).setCTime(this.cTime).setMTime(mTime).addAllCurrentRanges(this.currentRanges);
        return builder.build();
    }

    @VisibleForTesting
    public List<Long> unsafeGetCurrentRanges() {
        return this.currentRanges;
    }

    private <T> CompletableFuture<T> checkStreamCreated(Supplier<CompletableFuture<T>> supplier) {
        CompletableFuture future = FutureUtils.createFuture();
        this.executor.submit(() -> {
            try {
                if (!ProtoUtils.isStreamCreated(this.lifecycleState)) {
                    throw new IllegalStateException("Stream isn't created yet.");
                }
                ((CompletableFuture)((CompletableFuture)supplier.get()).thenApplyAsync(value -> future.complete(value), (Executor)this.executor)).exceptionally(cause -> future.completeExceptionally((Throwable)cause));
            }
            catch (Throwable cause2) {
                future.completeExceptionally(cause2);
            }
        });
        return future;
    }

    private void checkLifecycleState(StreamMetadata.LifecycleState expected) {
        Preconditions.checkState(expected == this.lifecycleState, "Unexpected state " + this.lifecycleState + ", expected to be " + expected);
    }

    @Override
    public synchronized String getName() {
        Preconditions.checkState(null != this.streamProps);
        return this.streamProps.getStreamName();
    }

    private <T> CompletableFuture<T> executeTask(Consumer<CompletableFuture<T>> consumer) {
        CompletableFuture executeFuture = FutureUtils.createFuture();
        this.executor.submit(() -> {
            try {
                consumer.accept(executeFuture);
            }
            catch (Throwable cause) {
                executeFuture.completeExceptionally(cause);
            }
        });
        return executeFuture;
    }

    @Override
    public CompletableFuture<Boolean> create(StreamProperties streamProps) {
        return this.executeTask(future -> this.unsafeCreate((CompletableFuture<Boolean>)future, streamProps));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unsafeCreate(CompletableFuture<Boolean> createFuture, StreamProperties streamProps) {
        this.checkLifecycleState(StreamMetadata.LifecycleState.UNINIT);
        this.lifecycleState = StreamMetadata.LifecycleState.CREATING;
        MetaRangeImpl metaRangeImpl = this;
        synchronized (metaRangeImpl) {
            this.streamProps = streamProps;
        }
        this.streamId = streamProps.getStreamId();
        this.cTime = this.mTime = System.currentTimeMillis();
        List<RangeProperties> propertiesList = ProtoUtils.split(this.streamId, streamProps.getStreamConf().getInitialNumRanges(), this.nextRangeId, this.placementPolicy);
        ArrayList<PutOp<byte[], byte[]>> successOps = Lists.newArrayListWithExpectedSize(propertiesList.size() + 1);
        for (RangeProperties props : propertiesList) {
            RangeMetadata meta = RangeMetadata.newBuilder().setProps(props).setCreateTime(this.cTime).setFenceTime(Long.MAX_VALUE).setState(RangeState.RANGE_ACTIVE).addAllParents(Lists.newArrayList()).build();
            this.ranges.put(props.getRangeId(), meta);
            this.currentRanges.add(props.getRangeId());
            successOps.add(this.store.newPut(MetaRangeImpl.getStreamRangeKey(this.streamId, props.getRangeId()), meta.toByteArray()));
        }
        this.nextRangeId += (long)propertiesList.size();
        byte[] streamMetadataKey = MetaRangeImpl.getStreamMetadataKey(this.streamId);
        successOps.add(this.store.newPut(streamMetadataKey, this.toStreamMetadata(StreamMetadata.LifecycleState.CREATED).toByteArray()));
        TxnOp<byte[], byte[]> txn = this.store.newTxn().If(this.store.newCompareValue(CompareResult.EQUAL, streamMetadataKey, null)).Then(successOps.toArray(new Op[successOps.size()])).build();
        if (log.isTraceEnabled()) {
            log.trace("Execute create stream metadata range txn {}", (Object)streamProps);
        }
        ((CompletableFuture)this.store.txn(txn).thenApplyAsync(txnResult -> {
            try {
                if (log.isTraceEnabled()) {
                    log.trace("Create stream metadata range txn result = {}", (Object)txnResult.isSuccess());
                }
                if (txnResult.isSuccess()) {
                    List results = txnResult.results();
                    this.revision = results.get(results.size() - 1).revision();
                    this.lifecycleState = StreamMetadata.LifecycleState.CREATED;
                    createFuture.complete(true);
                } else {
                    createFuture.complete(false);
                }
                Object var3_3 = null;
                return var3_3;
            }
            finally {
                txnResult.close();
            }
        }, (Executor)this.executor)).exceptionally(cause -> {
            createFuture.completeExceptionally((Throwable)cause);
            return null;
        });
    }

    @Override
    public CompletableFuture<MetaRange> load(long streamId) {
        byte[] streamMetadataKey = MetaRangeImpl.getStreamMetadataKey(streamId);
        byte[] streamMetadataEndKey = MetaRangeImpl.getStreamMetadataEndKey(streamId);
        return this.store.range(streamMetadataKey, streamMetadataEndKey).thenApplyAsync(kvs -> {
            if (kvs.isEmpty()) {
                return null;
            }
            this.loadMetadata((List<KeyValue<byte[], byte[]>>)kvs);
            return this;
        }, (Executor)this.executor);
    }

    private void loadMetadata(List<KeyValue<byte[], byte[]>> kvs) {
        for (KeyValue<byte[], byte[]> kv : kvs) {
            long streamId;
            if (MetaRangeImpl.isMetadataKey(kv.key())) {
                this.revision = kv.modifiedRevision();
                streamId = Bytes.toLong(kv.key(), 0);
                this.loadStreamMetadata(streamId, kv.value());
                continue;
            }
            if (!MetaRangeImpl.isStreamRangeKey(kv.key())) continue;
            streamId = Bytes.toLong(kv.key(), 0);
            long rangeId = Bytes.toLong(kv.key(), 9);
            this.loadRangeMetadata(streamId, rangeId, kv.value());
        }
    }

    private synchronized void loadStreamMetadata(long streamId, byte[] streamMetadataBytes) {
        StreamMetadata metadata;
        this.streamId = streamId;
        try {
            metadata = StreamMetadata.parseFrom(streamMetadataBytes);
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Invalid stream metadata of stream " + streamId, e);
        }
        this.streamProps = metadata.getProps();
        this.lifecycleState = metadata.getLifecycleState();
        this.servingState = metadata.getServingState();
        this.currentRanges.clear();
        this.currentRanges.addAll(metadata.getCurrentRangesList());
        this.nextRangeId = metadata.getNextRangeId();
        this.cTime = metadata.getCTime();
        this.mTime = metadata.getMTime();
    }

    private void loadRangeMetadata(long streamId, long rangeId, byte[] rangeMetadataBytes) {
        RangeMetadata metadata;
        Preconditions.checkArgument(this.streamId == streamId);
        Preconditions.checkArgument(rangeId >= 0L);
        try {
            metadata = RangeMetadata.parseFrom(rangeMetadataBytes);
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Invalid range metadata of range (" + streamId + ", " + rangeId + ")", e);
        }
        this.ranges.put(rangeId, metadata);
    }

    @Override
    public CompletableFuture<Boolean> delete(long streamId) {
        byte[] streamMetadataKey = MetaRangeImpl.getStreamMetadataKey(streamId);
        byte[] streamEndKey = MetaRangeImpl.getStreamMetadataEndKey(streamId);
        return this.store.deleteRange(streamMetadataKey, streamEndKey).thenApplyAsync(kvs -> !kvs.isEmpty(), (Executor)this.executor);
    }

    @Override
    public CompletableFuture<StreamMetadata.ServingState> getServingState() {
        return this.checkStreamCreated(() -> FutureUtils.value(this.servingState));
    }

    @Override
    public CompletableFuture<StreamMetadata.ServingState> updateServingState(StreamMetadata.ServingState state) {
        return this.checkStreamCreated(() -> {
            long mTime = System.currentTimeMillis();
            byte[] streamMetadataKey = MetaRangeImpl.getStreamMetadataKey(this.streamId);
            byte[] streamMetadata = this.toStreamMetadata(state, mTime).toByteArray();
            return this.store.rPut(streamMetadataKey, streamMetadata, this.revision).thenApplyAsync(newRev -> {
                this.servingState = state;
                this.mTime = mTime;
                this.revision = newRev;
                return state;
            }, (Executor)this.executor);
        });
    }

    @Override
    public CompletableFuture<StreamConfiguration> getConfiguration() {
        return this.checkStreamCreated(() -> FutureUtils.value(this.unsafeGetStreamProperties().getStreamConf()));
    }

    private RangeMetadata unsafeGetDataRange(long rangeId) throws DataRangeNotFoundException {
        RangeMetadata rangeMeta = (RangeMetadata)this.ranges.get(rangeId);
        if (null == rangeMeta) {
            throw new DataRangeNotFoundException(this.streamId, rangeId);
        }
        return rangeMeta;
    }

    @Override
    public CompletableFuture<List<RangeMetadata>> getActiveRanges() {
        return this.checkStreamCreated(() -> {
            ImmutableList<Long> rangesIds = ImmutableList.copyOf(this.currentRanges);
            ArrayList<RangeMetadata> properties = Lists.newArrayListWithExpectedSize(rangesIds.size());
            Iterator iterator = rangesIds.iterator();
            while (iterator.hasNext()) {
                long rangeId = (Long)iterator.next();
                properties.add(this.unsafeGetDataRange(rangeId));
            }
            return FutureUtils.value(properties);
        });
    }
}

