/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.container;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.javax.annotation.concurrent.GuardedBy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.StorageContainerException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.channel.StorageServerChannelManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.container.StorageContainerInfo;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.LocationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.exceptions.ObjectClosedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.Revisioned;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves and caches the {@link StorageServerChannel} used to reach one storage container.
 *
 * <p>The first call to {@link #getStorageContainerChannelFuture()} creates a pending future and
 * starts a location lookup through the {@link LocationClient}. When the lookup succeeds, the
 * read-write endpoint is turned into a channel by the {@link StorageServerChannelManager}, the
 * channel is intercepted with this container's id, and the pending future is completed with it.
 * Failed lookups are retried on the supplied executor after a fixed delay.
 *
 * <p>The mutable fields {@code scInfo} and {@code rsChannelFuture} are guarded by this object's
 * monitor.
 */
public class StorageContainerChannel {
    private static final Logger log = LoggerFactory.getLogger(StorageContainerChannel.class);

    /** Delay, in milliseconds, before a failed location lookup is retried. */
    private static final long FETCH_RETRY_DELAY_MS = 200L;

    private final long scId;
    private final StorageServerChannelManager channelManager;
    private final LocationClient locationClient;
    private final ScheduledExecutorService executor;
    @GuardedBy(value="this")
    private StorageContainerInfo scInfo = null;
    @GuardedBy(value="this")
    private CompletableFuture<StorageServerChannel> rsChannelFuture = null;

    /**
     * Creates a channel holder for the given storage container.
     *
     * @param scId id of the storage container
     * @param channelManager manager used to obtain the server channel for the located endpoint
     * @param locationClient client used to look up the container's endpoints
     * @param executor executor on which lookup results are handled and retries are scheduled
     */
    public StorageContainerChannel(long scId, StorageServerChannelManager channelManager, LocationClient locationClient, ScheduledExecutorService executor) {
        this.scId = scId;
        this.channelManager = channelManager;
        this.locationClient = locationClient;
        this.executor = executor;
    }

    /**
     * @return the id of the storage container this object was created for
     */
    public long getStorageContainerId() {
        return scId;
    }

    /**
     * @return the most recently recorded container info, or {@code null} if no lookup has succeeded yet
     */
    public synchronized StorageContainerInfo getStorageContainerInfo() {
        return scInfo;
    }

    /**
     * @return the current channel future, or {@code null} if none has been created or it was reset
     */
    public synchronized CompletableFuture<StorageServerChannel> getStorageServerChannelFuture() {
        return rsChannelFuture;
    }

    /**
     * Unconditionally clears the current channel future.
     */
    public synchronized void resetStorageServerChannelFuture() {
        rsChannelFuture = null;
    }

    /**
     * Clears the current channel future if it is still the one the caller observed.
     *
     * @param oldFuture the future the caller expects to be current; {@code null} forces the reset
     * @return {@code true} if the future was cleared, {@code false} if {@code oldFuture} is
     *         non-null and is not the current future (identity comparison)
     */
    public synchronized boolean resetStorageServerChannelFuture(CompletableFuture<StorageServerChannel> oldFuture) {
        if (oldFuture != null && rsChannelFuture != oldFuture) {
            return false;
        }
        rsChannelFuture = null;
        return true;
    }

    /**
     * Replaces the current channel future.
     *
     * @param rsChannelFuture the future to install (may be {@code null})
     */
    @VisibleForTesting
    public synchronized void setStorageServerChannelFuture(CompletableFuture<StorageServerChannel> rsChannelFuture) {
        this.rsChannelFuture = rsChannelFuture;
    }

    /**
     * Returns the channel future, creating it and starting a location lookup if none exists.
     *
     * @return the existing future if one is set; otherwise a newly created future that is
     *         completed once a lookup succeeds and a channel has been obtained
     */
    public CompletableFuture<StorageServerChannel> getStorageContainerChannelFuture() {
        final CompletableFuture<StorageServerChannel> channelFuture;
        synchronized (this) {
            if (null != rsChannelFuture) {
                return rsChannelFuture;
            }
            rsChannelFuture = FutureUtils.createFuture();
            channelFuture = rsChannelFuture;
        }
        // The lookup is started outside the lock; only the caller that created the future gets here.
        fetchStorageContainerInfo();
        return channelFuture;
    }

    /**
     * Asks the location client for this container's endpoints, passing the currently known
     * revision ({@code -1} when nothing is known yet), and dispatches the outcome on the executor.
     */
    private void fetchStorageContainerInfo() {
        final long knownRevision;
        synchronized (this) {
            knownRevision = null == scInfo ? -1L : scInfo.getRevision();
        }
        Revisioned<Long> groupId = Revisioned.of(scId, knownRevision);
        locationClient.locateStorageContainers(Lists.newArrayList(groupId)).whenCompleteAsync((scEndpoints, cause) -> {
            if (null != cause) {
                handleFetchStorageContainerInfoFailure(cause);
                return;
            }
            handleFetchStorageContainerInfoSuccess(scEndpoints);
        }, executor);
    }

    /**
     * Logs the failure and schedules another lookup after {@link #FETCH_RETRY_DELAY_MS}.
     *
     * @param cause the reason the lookup (or the validation of its response) failed
     */
    private void handleFetchStorageContainerInfoFailure(Throwable cause) {
        log.info("Failed to fetch info of storage container ({}) - '{}'. Retry in {} ms ...", scId, cause.getMessage(), FETCH_RETRY_DELAY_MS);
        executor.schedule(this::fetchStorageContainerInfo, FETCH_RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Validates a lookup response, records the new container info, and completes the pending
     * channel future.
     *
     * <p>The response is treated as a failure (and retried) when it does not contain exactly one
     * entry, when its status is not {@code SUCCESS}, or when its revision is not newer than the
     * recorded one.
     *
     * @param storageContainerEndpoints the endpoints returned by the location client
     */
    private void handleFetchStorageContainerInfoSuccess(List<OneStorageContainerEndpointResponse> storageContainerEndpoints) {
        if (storageContainerEndpoints.size() != 1) {
            handleFetchStorageContainerInfoFailure(new Exception("Expected only one storage container endpoint. But found " + storageContainerEndpoints.size() + " storage container endpoints."));
            return;
        }
        OneStorageContainerEndpointResponse response = storageContainerEndpoints.get(0);
        if (StatusCode.SUCCESS != response.getStatusCode()) {
            handleFetchStorageContainerInfoFailure(new StorageContainerException(response.getStatusCode(), "fail to fetch location for storage container (" + scId + ")"));
            return;
        }
        StorageContainerEndpoint endpoint = response.getEndpoint();

        // scInfo is guarded by "this": do the stale check and the update atomically under the lock.
        StorageContainerException staleInfo = null;
        synchronized (this) {
            if (null != scInfo && scInfo.getRevision() >= endpoint.getRevision()) {
                staleInfo = new StorageContainerException(StatusCode.STALE_GROUP_INFO, "Fetched a stale storage container info : current = " + scInfo.getRevision() + ", fetched = " + endpoint.getRevision());
            } else {
                // The read endpoints are the read-write endpoint followed by all read-only endpoints.
                List<Endpoint> readEndpoints = Lists.newArrayListWithExpectedSize(1 + endpoint.getRoEndpointCount());
                readEndpoints.add(endpoint.getRwEndpoint());
                readEndpoints.addAll(endpoint.getRoEndpointList());
                scInfo = StorageContainerInfo.of(scId, endpoint.getRevision(), endpoint.getRwEndpoint(), readEndpoints);
            }
        }
        if (null != staleInfo) {
            // Reported outside the lock so logging and retry scheduling do not hold the monitor.
            handleFetchStorageContainerInfoFailure(staleInfo);
            return;
        }

        StorageServerChannel serverChannel = channelManager.getOrCreateChannel(endpoint.getRwEndpoint());

        // Snapshot the pending future under the lock. It can be null if one of the
        // resetStorageServerChannelFuture methods ran while this lookup was in flight.
        final CompletableFuture<StorageServerChannel> pendingFuture;
        synchronized (this) {
            pendingFuture = rsChannelFuture;
        }

        if (null == serverChannel) {
            log.info("No channel found/created for range server {}. The channel manager must be shutting down. Stop the process of fetching storage container ({}).", endpoint.getRwEndpoint(), scId);
            if (null != pendingFuture) {
                pendingFuture.completeExceptionally(new ObjectClosedException("StorageServerChannelManager is closed"));
            }
            return;
        }
        if (null == pendingFuture) {
            log.debug("Channel future of storage container ({}) was reset while its location was being fetched; dropping the result.", scId);
            return;
        }
        // Completed outside the lock so callbacks registered on the future do not run while
        // this object's monitor is held.
        pendingFuture.complete(serverChannel.intercept(scId));
    }
}

