package io.hekate.coordinate.internal;

import io.hekate.cluster.ClusterTopology;
import io.hekate.coordinate.CoordinationFuture;
import io.hekate.coordinate.CoordinationHandler;
import io.hekate.coordinate.CoordinationProcess;
import io.hekate.coordinate.internal.CoordinationProtocol;
import io.hekate.core.HekateSupport;
import io.hekate.messaging.Message;
import io.hekate.messaging.MessagingChannel;
import io.hekate.util.StateGuard;
import io.hekate.util.async.AsyncUtils;
import io.hekate.util.async.Waiting;
import io.hekate.util.format.ToString;
import io.hekate.util.format.ToStringIgnore;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/hekate/coordinate/internal/DefaultCoordinationProcess.class */
public class DefaultCoordinationProcess implements CoordinationProcess {
    private static final Logger log;
    private static final boolean DEBUG;
    private final String name;

    @ToStringIgnore
    private final CoordinationHandler handler;

    @ToStringIgnore
    private final ExecutorService async;

    @ToStringIgnore
    private final MessagingChannel<CoordinationProtocol> channel;

    @ToStringIgnore
    private final StateGuard guard = new StateGuard(DefaultCoordinationProcess.class);

    @ToStringIgnore
    private final CoordinationFuture future = new CoordinationFuture();

    @ToStringIgnore
    private final HekateSupport hekate;

    @ToStringIgnore
    private DefaultCoordinatorContext ctx;
    static final /* synthetic */ boolean $assertionsDisabled;

    public DefaultCoordinationProcess(String str, HekateSupport hekateSupport, CoordinationHandler coordinationHandler, ExecutorService executorService, MessagingChannel<CoordinationProtocol> messagingChannel) {
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("Name is null.");
        }
        if (!$assertionsDisabled && hekateSupport == null) {
            throw new AssertionError("Hekate is null.");
        }
        if (!$assertionsDisabled && coordinationHandler == null) {
            throw new AssertionError("Protocol is null.");
        }
        if (!$assertionsDisabled && executorService == null) {
            throw new AssertionError("Executor service is null.");
        }
        if (!$assertionsDisabled && messagingChannel == null) {
            throw new AssertionError("Messaging channel is null.");
        }
        this.name = str;
        this.hekate = hekateSupport;
        this.handler = coordinationHandler;
        this.async = executorService;
        this.channel = messagingChannel;
    }

    public CompletableFuture<?> initialize() {
        return (CompletableFuture) this.guard.withWriteLock(() -> {
            this.guard.becomeInitialized();
            CompletableFuture completableFuture = new CompletableFuture();
            this.async.execute(() -> {
                try {
                    if (DEBUG) {
                        log.debug("Initializing handler [process={}]", this.name);
                    }
                    this.handler.initialize();
                    completableFuture.complete(null);
                } catch (Error | RuntimeException e) {
                    completableFuture.completeExceptionally(e);
                }
            });
            return completableFuture;
        });
    }

    public Waiting terminate() {
        return (Waiting) this.guard.withWriteLock(() -> {
            if (!this.guard.becomeTerminated()) {
                return Waiting.NO_WAIT;
            }
            cancelCurrentContext();
            this.async.execute(() -> {
                try {
                    if (DEBUG) {
                        log.debug("Terminating handler [process={}]", this.name);
                    }
                    this.handler.terminate();
                } catch (Error | RuntimeException e) {
                    log.error("Got an unexpected runtime error during coordination handler termination [process={}]", this.name, e);
                }
            });
            this.future.cancel(false);
            return AsyncUtils.shutdown(this.async);
        });
    }

    public void processMessage(Message<CoordinationProtocol> message) {
        if (!$assertionsDisabled && message == null) {
            throw new AssertionError("Message is null.");
        }
        this.guard.withReadLock(() -> {
            DefaultCoordinatorContext defaultCoordinatorContext = this.ctx;
            if (this.guard.isInitialized() && defaultCoordinatorContext != null) {
                this.async.execute(() -> {
                    try {
                        defaultCoordinatorContext.processMessage(message);
                    } catch (Error | RuntimeException e) {
                        log.error("Failed to process coordination request [message={}]", message, e);
                        message.reply(CoordinationProtocol.Reject.INSTANCE);
                    }
                });
                return;
            }
            if (DEBUG) {
                log.debug("Rejected coordination request since process is not initialized [message={}]", message.payload());
            }
            message.reply(CoordinationProtocol.Reject.INSTANCE);
        });
    }

    public void processTopologyChange(ClusterTopology clusterTopology) {
        if (!$assertionsDisabled && clusterTopology == null) {
            throw new AssertionError("New topology is null.");
        }
        this.guard.withWriteLockIfInitialized(() -> {
            if (DEBUG) {
                log.debug("Processing topology change [topology={}]", clusterTopology);
            }
            boolean z = true;
            DefaultCoordinatorContext defaultCoordinatorContext = this.ctx;
            if (defaultCoordinatorContext != null) {
                if (defaultCoordinatorContext.topology().equals(clusterTopology)) {
                    z = false;
                } else {
                    cancelCurrentContext();
                }
            }
            if (!z) {
                if (DEBUG) {
                    log.debug("Topology not changed [process={}]", this.name);
                }
            } else {
                DefaultCoordinatorContext defaultCoordinatorContext2 = new DefaultCoordinatorContext(this.name, this.hekate, clusterTopology, this.channel, this.async, this.handler, () -> {
                    this.future.complete(this);
                });
                this.ctx = defaultCoordinatorContext2;
                if (DEBUG) {
                    log.debug("Created new context [context={}]", defaultCoordinatorContext2);
                }
                this.async.execute(() -> {
                    try {
                        defaultCoordinatorContext2.tryCoordinate();
                    } catch (Error | RuntimeException e) {
                        log.error("Got an unexpected runtime error during coordination [process={}]", this.name, e);
                    }
                });
            }
        });
    }

    @Override // io.hekate.coordinate.CoordinationProcess
    public String name() {
        return this.name;
    }

    @Override // io.hekate.coordinate.CoordinationProcess
    public CoordinationFuture future() {
        return this.future.fork();
    }

    @Override // io.hekate.coordinate.CoordinationProcess
    public CoordinationHandler handler() {
        return this.handler;
    }

    private Waiting cancelCurrentContext() {
        if (!$assertionsDisabled && !this.guard.isWriteLocked()) {
            throw new AssertionError("Must hold a write lock.");
        }
        DefaultCoordinatorContext defaultCoordinatorContext = this.ctx;
        if (defaultCoordinatorContext == null) {
            return Waiting.NO_WAIT;
        }
        this.ctx = null;
        defaultCoordinatorContext.cancel();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.async.execute(() -> {
            try {
                try {
                    defaultCoordinatorContext.postCancel();
                    countDownLatch.countDown();
                } catch (Error | RuntimeException e) {
                    log.error("Got an unexpected runtime error during coordination [process={}]", this.name, e);
                    countDownLatch.countDown();
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        });
        countDownLatch.getClass();
        return countDownLatch::await;
    }

    public String toString() {
        return ToString.format(CoordinationProcess.class, this);
    }

    static {
        $assertionsDisabled = !DefaultCoordinationProcess.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(DefaultCoordinationProcess.class);
        DEBUG = log.isDebugEnabled();
    }
}
