/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.phi;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import io.atomix.event.AbstractListenerManager;
import io.atomix.event.Event;
import io.atomix.protocols.phi.FailureDetectionEvent;
import io.atomix.protocols.phi.FailureDetectionEventListener;
import io.atomix.protocols.phi.FailureDetectionService;
import io.atomix.protocols.phi.PhiAccrualFailureDetector;
import io.atomix.protocols.phi.protocol.FailureDetectionProtocol;
import io.atomix.protocols.phi.protocol.HeartbeatMessage;
import io.atomix.utils.Identifier;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Failure detection service that exchanges heartbeats with peers and tracks a
 * {@link PhiAccrualFailureDetector} per peer.
 *
 * <p>The constructor schedules a periodic heartbeat task on the supplied executor and registers a
 * heartbeat listener with the protocol. Each heartbeat round sends a heartbeat to every peer
 * returned by the peer provider (the local node is filtered out) and compares the peer's current
 * phi value with the configured threshold. Every incoming heartbeat is reported to the sender's
 * detector and the state carried by the message is recorded for the sender. Recorded state
 * changes are posted to listeners as {@code STATE_CHANGE} events.
 *
 * <p>NOTE(review): the state sent in outgoing heartbeats is whatever is recorded for the local
 * node in the state map; this class only records states through heartbeat handling and phi
 * evaluation of peers, so that value may be {@code null} - confirm the intended source of the
 * local state.
 *
 * @param <T> the node identifier type
 */
public class PhiAccrualFailureDetectionService<T extends Identifier>
        extends AbstractListenerManager<FailureDetectionEvent<T>, FailureDetectionEventListener<T>>
        implements FailureDetectionService<T> {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final T localNode;
    private final FailureDetectionProtocol<T> protocol;
    private final Supplier<Collection<T>> peerProvider;
    private final ScheduledFuture<?> heartbeatFuture;
    private final int phiFailureThreshold;
    private final int minSamples;
    private final double phiFactor;
    // Both maps are keyed by the node identifier itself.
    private final Map<T, PhiAccrualFailureDetector> nodes = Maps.newConcurrentMap();
    private final Map<T, FailureDetectionEvent.State> nodeStates = Maps.newConcurrentMap();

    /**
     * Returns a new phi accrual failure detection service builder.
     *
     * @param <T> the node identifier type
     * @return a new builder
     */
    public static <T extends Identifier> Builder<T> builder() {
        return new Builder<>();
    }

    /**
     * Creates the service, schedules the periodic heartbeat task and registers the heartbeat
     * listener with the protocol.
     *
     * @param protocol the protocol used to send and receive heartbeats
     * @param localNode the local node identifier
     * @param peerProvider supplier of the current set of nodes; the local node is filtered out
     * @param heartbeatExecutor the executor on which the heartbeat task is scheduled
     * @param heartbeatInterval the initial delay and period of the heartbeat task
     * @param phiFailureThreshold the phi value at or above which an ACTIVE peer is marked INACTIVE
     * @param minSamples passed to each {@link PhiAccrualFailureDetector}
     * @param phiFactor passed to each {@link PhiAccrualFailureDetector}
     * @throws IllegalArgumentException if {@code phiFailureThreshold} is not positive
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public PhiAccrualFailureDetectionService(FailureDetectionProtocol<T> protocol, T localNode, Supplier<Collection<T>> peerProvider, ScheduledExecutorService heartbeatExecutor, Duration heartbeatInterval, int phiFailureThreshold, int minSamples, double phiFactor) {
        Preconditions.checkArgument(phiFailureThreshold > 0, "phiFailureThreshold must be positive");
        this.localNode = Preconditions.checkNotNull(localNode, "localNode cannot be null");
        this.protocol = Preconditions.checkNotNull(protocol, "protocol cannot be null");
        this.peerProvider = Preconditions.checkNotNull(peerProvider, "peerProvider cannot be null");
        // Validate the scheduling arguments up front so a missing one fails with a clear message
        // before any side effect happens.
        Preconditions.checkNotNull(heartbeatExecutor, "heartbeatExecutor cannot be null");
        Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval cannot be null");
        this.phiFailureThreshold = phiFailureThreshold;
        this.minSamples = minSamples;
        this.phiFactor = phiFactor;
        long intervalMillis = heartbeatInterval.toMillis();
        this.heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(this::heartbeat, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        protocol.registerHeartbeatListener(new HeartbeatMessageHandler());
    }

    /**
     * Records {@code newState} for {@code peer} and posts a {@code STATE_CHANGE} event if it
     * differs from the currently recorded state.
     *
     * @param peer the node whose state changed
     * @param newState the new state; {@code null} is ignored
     */
    private void updateState(T peer, FailureDetectionEvent.State newState) {
        if (newState == null) {
            // The state map is a concurrent map and cannot store null values; a heartbeat
            // without a state therefore leaves the recorded state untouched instead of throwing.
            return;
        }
        FailureDetectionEvent.State currentState = this.nodeStates.get(peer);
        if (!Objects.equals(currentState, newState)) {
            this.nodeStates.put(peer, newState);
            this.post(new FailureDetectionEvent<T>(FailureDetectionEvent.Type.STATE_CHANGE, peer, currentState, newState));
        }
    }

    /**
     * Returns the failure detector for {@code node}, creating it on first use.
     */
    private PhiAccrualFailureDetector detectorFor(T node) {
        return this.nodes.computeIfAbsent(node, n -> new PhiAccrualFailureDetector(this.minSamples, this.phiFactor));
    }

    /**
     * Runs one heartbeat round: sends a heartbeat to every peer and re-evaluates each peer's
     * state. Any exception is logged and swallowed so that it does not propagate to the
     * scheduling executor.
     */
    private void heartbeat() {
        try {
            Set<T> peers = this.peerProvider.get().stream()
                    .filter(peer -> !peer.equals(this.localNode))
                    .collect(Collectors.toSet());
            FailureDetectionEvent.State localState = this.nodeStates.get(this.localNode);
            HeartbeatMessage<T> heartbeat = new HeartbeatMessage<T>(this.localNode, localState);
            peers.forEach(peer -> {
                this.heartbeatToPeer(heartbeat, peer);
                this.evaluatePeer(peer);
            });
        } catch (Exception e) {
            this.log.debug("Failed to send heartbeat", e);
        }
    }

    /**
     * Compares the peer's current phi value with the failure threshold and flips its recorded
     * state between ACTIVE and INACTIVE accordingly. Peers with no recorded state, or with any
     * other state, are left unchanged.
     */
    private void evaluatePeer(T peer) {
        // Look the state up by the peer itself: that is the key updateState() writes under.
        FailureDetectionEvent.State currentState = this.nodeStates.get(peer);
        double phi = this.detectorFor(peer).phi();
        if (phi >= this.phiFailureThreshold) {
            if (currentState == FailureDetectionEvent.State.ACTIVE) {
                this.updateState(peer, FailureDetectionEvent.State.INACTIVE);
            }
        } else if (currentState == FailureDetectionEvent.State.INACTIVE) {
            this.updateState(peer, FailureDetectionEvent.State.ACTIVE);
        }
    }

    /**
     * Sends {@code heartbeat} to {@code peer} through the protocol; a failed send is only logged
     * at trace level.
     */
    private void heartbeatToPeer(HeartbeatMessage<T> heartbeat, T peer) {
        this.protocol.heartbeat(peer, heartbeat).whenComplete((result, error) -> {
            if (error != null) {
                this.log.trace("Sending heartbeat to {} failed", peer, error);
            }
        });
    }

    /**
     * Unregisters the heartbeat listener and cancels the periodic heartbeat task without
     * interrupting a round that is already running.
     */
    @Override
    public void close() {
        this.protocol.unregisterHeartbeatListener();
        this.heartbeatFuture.cancel(false);
    }

    /**
     * Builder for {@link PhiAccrualFailureDetectionService}.
     *
     * <p>Defaults: heartbeat interval 100 ms, phi failure threshold 10, minimum samples 25 and
     * phi factor {@code 1 / ln(10)}. The protocol, local node, peer provider and heartbeat
     * executor have no defaults.
     *
     * @param <T> the node identifier type
     */
    public static class Builder<T extends Identifier>
            implements FailureDetectionService.Builder<T> {
        private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(100L);
        private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
        private static final int DEFAULT_MIN_SAMPLES = 25;
        private static final double DEFAULT_PHI_FACTOR = 1.0 / Math.log(10.0);
        private FailureDetectionProtocol<T> protocol;
        private T localNode;
        private Supplier<Collection<T>> peerProvider;
        private ScheduledExecutorService heartbeatExecutor;
        private Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
        private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
        private int minSamples = DEFAULT_MIN_SAMPLES;
        private double phiFactor = DEFAULT_PHI_FACTOR;

        /**
         * Sets the failure detection protocol.
         *
         * @param protocol the protocol, must not be {@code null}
         * @return this builder
         */
        public Builder<T> withProtocol(FailureDetectionProtocol<T> protocol) {
            this.protocol = Preconditions.checkNotNull(protocol, "protocol cannot be null");
            return this;
        }

        /**
         * Sets the local node identifier. The value is not checked here; the service constructor
         * rejects {@code null}.
         *
         * @param identifier the local node identifier
         * @return this builder
         */
        public Builder<T> withLocalNode(T identifier) {
            this.localNode = identifier;
            return this;
        }

        /**
         * Sets the supplier of nodes to send heartbeats to.
         *
         * @param peerProvider the peer supplier, must not be {@code null}
         * @return this builder
         */
        public Builder<T> withPeerProvider(Supplier<Collection<T>> peerProvider) {
            this.peerProvider = Preconditions.checkNotNull(peerProvider, "peerProvider cannot be null");
            return this;
        }

        /**
         * Sets the executor on which the heartbeat task is scheduled.
         *
         * @param executor the executor, must not be {@code null}
         * @return this builder
         */
        public Builder<T> withHeartbeatExecutor(ScheduledExecutorService executor) {
            this.heartbeatExecutor = Preconditions.checkNotNull(executor, "executor cannot be null");
            return this;
        }

        /**
         * Sets the heartbeat interval.
         *
         * @param interval the interval, must not be {@code null}
         * @return this builder
         */
        public Builder<T> withHeartbeatInterval(Duration interval) {
            this.heartbeatInterval = Preconditions.checkNotNull(interval, "interval cannot be null");
            return this;
        }

        /**
         * Sets the phi failure threshold.
         *
         * @param failureThreshold the threshold, must be positive
         * @return this builder
         * @throws IllegalArgumentException if {@code failureThreshold} is not positive
         */
        public Builder<T> withPhiFailureThreshold(int failureThreshold) {
            Preconditions.checkArgument(failureThreshold > 0, "failureThreshold must be positive");
            this.phiFailureThreshold = failureThreshold;
            return this;
        }

        /**
         * Sets the minimum sample count passed to each failure detector.
         *
         * @param minSamples the minimum sample count, must be positive
         * @return this builder
         * @throws IllegalArgumentException if {@code minSamples} is not positive
         */
        public Builder<T> withMinSamples(int minSamples) {
            Preconditions.checkArgument(minSamples > 0, "minSamples must be positive");
            this.minSamples = minSamples;
            return this;
        }

        /**
         * Sets the phi factor passed to each failure detector.
         *
         * @param phiFactor the phi factor, must be positive
         * @return this builder
         * @throws IllegalArgumentException if {@code phiFactor} is not positive
         */
        public Builder<T> withPhiFactor(double phiFactor) {
            Preconditions.checkArgument(phiFactor > 0.0, "phiFactor must be positive");
            this.phiFactor = phiFactor;
            return this;
        }

        /**
         * Builds the service. Constructing it schedules the heartbeat task and registers the
         * heartbeat listener.
         *
         * @return a new {@link PhiAccrualFailureDetectionService}
         */
        public FailureDetectionService<T> build() {
            return new PhiAccrualFailureDetectionService<T>(this.protocol, this.localNode, this.peerProvider, this.heartbeatExecutor, this.heartbeatInterval, this.phiFailureThreshold, this.minSamples, this.phiFactor);
        }
    }

    /**
     * Handles incoming heartbeats: reports the heartbeat to the sender's failure detector and
     * records the state carried by the message for the sender.
     */
    private class HeartbeatMessageHandler
            implements Consumer<HeartbeatMessage<T>> {
        @Override
        public void accept(HeartbeatMessage<T> heartbeat) {
            T source = heartbeat.source();
            PhiAccrualFailureDetectionService.this.detectorFor(source).report();
            PhiAccrualFailureDetectionService.this.updateState(source, heartbeat.state());
        }
    }
}

