/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.coordination.impl;

import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LeaderElection} implementation that competes for leadership by writing an
 * ephemeral value at {@code path} in a {@link MetadataStoreExtended}.
 *
 * <p>All mutable state ({@code leaderElectionState}, {@code internalState},
 * {@code proposedValue}) is read and written while holding this instance's monitor.
 * The state-change listener is invoked while that monitor is held.
 *
 * @param <T> type of the value published by the leader
 */
class LeaderElectionImpl<T>
implements LeaderElection<T> {
    private static final Logger log = LoggerFactory.getLogger(LeaderElectionImpl.class);

    /** Delay before a failed re-election (triggered by a lost leader) is retried. */
    private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;

    private final String path;
    private final MetadataSerde<T> serde;
    private final MetadataStoreExtended store;
    private final MetadataCache<T> cache;
    private final Consumer<LeaderElectionState> stateChangesListener;
    private final ScheduledExecutorService executor;

    /** Externally visible state, reported by {@link #getState()} and to the listener. */
    private LeaderElectionState leaderElectionState;

    // Never reassigned in this class, so asyncClose() always passes an empty expected version.
    // NOTE(review): presumably an empty version means "delete without a version check" - confirm
    // against MetadataStore.delete.
    private Optional<Long> version = Optional.empty();

    /** Value this instance wants to publish; empty until {@link #elect(Object)} is called. */
    private Optional<T> proposedValue = Optional.empty();

    /** Lifecycle state used to decide which notifications and callbacks are still relevant. */
    private InternalState internalState;

    /**
     * Creates the election and registers it for path and session notifications on {@code store}.
     *
     * @param store                metadata store holding the leader value
     * @param clazz                class of the leader value, used for JSON (de)serialization and caching
     * @param path                 key under which the leader value is stored
     * @param stateChangesListener callback invoked whenever the election state changes
     */
    LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path, Consumer<LeaderElectionState> stateChangesListener) {
        this.path = path;
        this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
        this.store = store;
        this.cache = store.getMetadataCache(clazz);
        this.leaderElectionState = LeaderElectionState.NoLeader;
        this.internalState = InternalState.Init;
        this.stateChangesListener = stateChangesListener;
        // NOTE(review): the ScheduledThreadPoolExecutor javadoc advises against a core pool size of 0.
        // Left unchanged because a larger core size would keep a thread alive until asyncClose() - confirm.
        this.executor = Executors.newScheduledThreadPool(0, new DefaultThreadFactory("leader-election-executor"));
        store.registerListener(this::handlePathNotification);
        store.registerSessionListener(this::handleSessionNotification);
    }

    /**
     * Starts an election proposing {@code proposedValue} as the leader value.
     *
     * <p>If a leader is already known (state is not {@code NoLeader}) the current state is
     * returned immediately and the proposal is not recorded.
     *
     * @param proposedValue value to publish if this instance becomes leader; must not be null
     * @return future completed with the resulting election state
     */
    @Override
    public synchronized CompletableFuture<LeaderElectionState> elect(T proposedValue) {
        if (this.leaderElectionState != LeaderElectionState.NoLeader) {
            return CompletableFuture.completedFuture(this.leaderElectionState);
        }
        this.proposedValue = Optional.of(proposedValue);
        return this.elect();
    }

    /**
     * Runs one election round: reads the current value at {@code path}, then either evaluates the
     * existing leader value or tries to create our own. Before completing, the cache entry for
     * {@code path} is loaded.
     *
     * @return future with the resulting state; fails with {@code AlreadyClosedException} once closed
     */
    private synchronized CompletableFuture<LeaderElectionState> elect() {
        // Never move a closed election back to ElectionInProgress: that would let a late session
        // event or retry re-acquire leadership on an instance that was already closed.
        if (this.internalState == InternalState.Closed) {
            return FutureUtils.exception(
                    new MetadataStoreException.AlreadyClosedException("The leader election was already closed"));
        }
        this.internalState = InternalState.ElectionInProgress;
        return this.store.get(this.path)
                .thenCompose(optLock -> {
                    if (optLock.isPresent()) {
                        return this.handleExistingLeaderValue(optLock.get());
                    }
                    return this.tryToBecomeLeader();
                })
                .thenCompose(state -> this.cache.get(this.path).thenApply(__ -> state));
    }

    /**
     * Decides what to do when a leader value already exists at {@code path}.
     *
     * @param res the value and stat currently stored at {@code path}
     * @return future with the resulting state, or a failed future if the value cannot be deserialized
     */
    private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
        T existingValue;
        try {
            existingValue = this.serde.deserialize(res.getValue());
        }
        catch (Throwable t) {
            return FutureUtils.exception(t);
        }

        boolean sameValue = existingValue.equals(this.proposedValue.orElse(null));
        boolean createdBySelf = res.getStat().isCreatedBySelf();

        if (sameValue && createdBySelf) {
            // Our own value, written by the current session: we are (still) the leader.
            this.changeState(LeaderElectionState.Leading);
            return CompletableFuture.completedFuture(LeaderElectionState.Leading);
        }
        if (sameValue || createdBySelf) {
            // Either our value written by a different session, or a different value written by this
            // session: remove the stored entry and compete again.
            return this.store.delete(this.path, Optional.of(res.getStat().getVersion()))
                    .thenCompose(__ -> this.tryToBecomeLeader());
        }
        // A different value written by someone else: another participant is the leader.
        this.changeState(LeaderElectionState.Following);
        return CompletableFuture.completedFuture(LeaderElectionState.Following);
    }

    /**
     * Marks a leader as present and, if the visible state actually changed, notifies the listener.
     * Exceptions thrown by the listener are logged and otherwise ignored.
     *
     * @param les the new election state
     */
    private synchronized void changeState(LeaderElectionState les) {
        this.internalState = InternalState.LeaderIsPresent;
        if (this.leaderElectionState != les) {
            this.leaderElectionState = les;
            try {
                this.stateChangesListener.accept(this.leaderElectionState);
            }
            catch (Throwable t) {
                log.warn("Exception in state change listener", t);
            }
        }
    }

    /**
     * Attempts to create the ephemeral leader entry with the proposed value.
     *
     * <p>On a {@code BadVersionException} from the put, the cache entry is invalidated and a full
     * election round is run again. If the election was closed while the put was in flight, or the
     * cache cannot be loaded afterwards, the freshly written entry is deleted again.
     *
     * @return future with the resulting state, or a failed future on error
     */
    private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader() {
        byte[] payload;
        try {
            payload = this.serde.serialize(this.proposedValue.get());
        }
        catch (Throwable t) {
            // Also covers a missing proposal (Optional.get() on an empty Optional).
            return FutureUtils.exception(t);
        }
        CompletableFuture<LeaderElectionState> result = new CompletableFuture<>();
        // NOTE(review): expected version -1 presumably means "key must not exist" - confirm against MetadataStore.put.
        this.store.put(this.path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
                .thenAccept(stat -> {
                    synchronized (this) {
                        if (this.internalState == InternalState.ElectionInProgress) {
                            // Load the cache before announcing leadership.
                            this.cache.get(this.path).thenRun(() -> {
                                synchronized (this) {
                                    log.info("Acquired leadership on {}", this.path);
                                    this.changeState(LeaderElectionState.Leading);
                                    result.complete(this.leaderElectionState);
                                }
                            }).exceptionally(ex -> {
                                // Could not load the cache: give the entry back and report the failure.
                                this.store.delete(this.path, Optional.of(stat.getVersion()))
                                        .thenRun(() -> result.completeExceptionally(ex))
                                        .exceptionally(ex2 -> {
                                            result.completeExceptionally(ex2);
                                            return null;
                                        });
                                return null;
                            });
                        } else {
                            // State changed while the put was in flight: undo the write.
                            this.store.delete(this.path, Optional.of(stat.getVersion()))
                                    .thenRun(() -> result.completeExceptionally(
                                            new MetadataStoreException.AlreadyClosedException("The leader election was already closed")))
                                    .exceptionally(ex -> {
                                        result.completeExceptionally(ex);
                                        return null;
                                    });
                        }
                    }
                })
                .exceptionally(ex -> {
                    // Fall back to the exception itself when it carries no cause, so that result is
                    // always completed instead of failing on completeExceptionally(null).
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    if (cause instanceof MetadataStoreException.BadVersionException) {
                        this.cache.invalidate(this.path);
                        this.elect()
                                .thenAccept(state -> result.complete(state))
                                .exceptionally(ex2 -> {
                                    result.completeExceptionally(ex2);
                                    return null;
                                });
                    } else {
                        result.completeExceptionally(cause);
                    }
                    return null;
                });
        return result;
    }

    /**
     * Blocking variant of {@link #asyncClose()}.
     *
     * @throws Exception the failure of the asynchronous close, as converted by
     *                   {@code MetadataStoreException.unwrap}
     */
    @Override
    public void close() throws Exception {
        try {
            this.asyncClose().join();
        }
        catch (CompletionException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }

    /**
     * Closes the election, shuts down the retry executor and, if this instance is currently
     * leading, deletes the leader entry. Calling it again after the first close is a no-op.
     *
     * @return future completed when the close (including the delete, if any) has finished
     */
    @Override
    public synchronized CompletableFuture<Void> asyncClose() {
        if (this.internalState == InternalState.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.internalState = InternalState.Closed;
        this.executor.shutdownNow();
        if (this.leaderElectionState != LeaderElectionState.Leading) {
            return CompletableFuture.completedFuture(null);
        }
        return this.store.delete(this.path, this.version);
    }

    /** @return the current election state as seen by this instance */
    @Override
    public synchronized LeaderElectionState getState() {
        return this.leaderElectionState;
    }

    /** @return future with the leader value read through the metadata cache */
    @Override
    public CompletableFuture<Optional<T>> getLeaderValue() {
        return this.cache.get(this.path);
    }

    /** @return the leader value if it is already in the metadata cache */
    @Override
    public Optional<T> getLeaderValueIfPresent() {
        return this.cache.getIfCached(this.path);
    }

    /**
     * Re-runs the election after the store session has been re-established.
     *
     * <p>Nothing is done when the election is closed or when no value has been proposed yet,
     * since an election round cannot succeed in either case.
     *
     * @param event the session event reported by the store
     */
    private synchronized void handleSessionNotification(SessionEvent event) {
        if (event != SessionEvent.SessionReestablished) {
            return;
        }
        if (this.internalState == InternalState.Closed || !this.proposedValue.isPresent()) {
            return;
        }
        if (this.leaderElectionState == LeaderElectionState.Leading) {
            log.info("Revalidating leadership for {}", this.path);
        }
        this.elect()
                .thenAccept(les -> log.info("Resynced leadership for {} - State: {}", this.path, les))
                .exceptionally(ex -> {
                    log.warn("Failed to resync leadership for {}", this.path, ex);
                    return null;
                });
    }

    /**
     * Reacts to store notifications for {@code path}. When the leader entry is deleted while a
     * leader was known, the state is reset to {@code NoLeader} and, if a value has been proposed,
     * a new election round is started. If that round fails, the listener is notified with the
     * current state and a single retry is scheduled unless the election has been closed.
     *
     * @param notification the change reported by the store
     */
    private void handlePathNotification(Notification notification) {
        if (!this.path.equals(notification.getPath())) {
            return;
        }
        synchronized (this) {
            if (this.internalState != InternalState.LeaderIsPresent) {
                return;
            }
            if (notification.getType() != NotificationType.Deleted) {
                return;
            }
            if (this.leaderElectionState == LeaderElectionState.Leading) {
                log.warn("Leadership released for {}", this.path);
            }
            this.leaderElectionState = LeaderElectionState.NoLeader;
            if (!this.proposedValue.isPresent()) {
                return;
            }
            this.elect().exceptionally(ex -> {
                log.warn("Leader election for path {} has failed", this.path, ex);
                synchronized (this) {
                    try {
                        this.stateChangesListener.accept(this.leaderElectionState);
                    }
                    catch (Throwable t) {
                        log.warn("Exception in state change listener", t);
                    }
                    if (this.internalState != InternalState.Closed) {
                        this.executor.schedule(() -> {
                            log.info("Retrying Leader election for path {}", this.path);
                            this.elect().exceptionally(retryEx -> {
                                log.warn("Retry of leader election for path {} has failed", this.path, retryEx);
                                return null;
                            });
                        }, LEADER_ELECTION_RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
                    }
                }
                return null;
            });
        }
    }

    /** Internal lifecycle of the election, independent of the externally visible state. */
    private enum InternalState {
        Init,
        ElectionInProgress,
        LeaderIsPresent,
        Closed
    }
}

