/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster.fdetector;

import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.fdetector.PingData;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

/**
 * {@link FailureDetector} implementation that periodically pings one known member and publishes
 * the observed {@link MemberStatus} as a {@link FailureDetectorEvent}.
 *
 * <p>Each period one member is taken from the internal ping list and sent a {@link #PING}. If no
 * acknowledgement arrives within {@code config.pingTimeout()} milliseconds, up to
 * {@code config.pingReqMembers()} other members are asked, via {@link #PING_REQ}, to ping the
 * target on behalf of the local member. The result is emitted to subscribers of {@link #listen()}.
 *
 * <p>The membership stream, the transport stream, the periodic ping task and all request callbacks
 * are dispatched through the {@link Scheduler} passed to the constructor. The mutable state of this
 * class ({@code currentPeriod}, {@code pingMemberIndex}, {@code pingMembers}) is not otherwise
 * synchronized. NOTE(review): this presumably relies on the scheduler being single-threaded;
 * confirm against the caller.
 */
public final class FailureDetectorImpl implements FailureDetector {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class);

    /** Qualifier of a ping message (direct, or transit on behalf of another member). */
    public static final String PING = "sc/fdetector/ping";

    /** Qualifier of a request asking another member to ping a target on the sender's behalf. */
    public static final String PING_REQ = "sc/fdetector/pingReq";

    /** Qualifier of the acknowledgement sent in reply to a ping. */
    public static final String PING_ACK = "sc/fdetector/pingAck";

    private final Member localMember;
    private final Transport transport;
    private final FailureDetectorConfig config;
    private final CorrelationIdGenerator cidGenerator;

    /** Counter incremented on every {@link #doPing()} run; used to tag log lines. */
    private long currentPeriod = 0L;

    /** Index into {@link #pingMembers} of the next member to ping. */
    private int pingMemberIndex = 0;

    /** Members eligible for pinging, maintained from membership events. */
    private final List<Member> pingMembers = new ArrayList<>();

    /** Subscriptions and the periodic task, disposed together in {@link #stop()}. */
    private final Disposable.Composite actionsDisposables = Disposables.composite();

    private final Sinks.Many<FailureDetectorEvent> sink =
        Sinks.many().multicast().directBestEffort();
    private final Scheduler scheduler;

    /**
     * Creates the failure detector and immediately subscribes to membership events and incoming
     * transport messages. Pinging itself does not begin until {@link #start()} is called.
     *
     * @param localMember the local cluster member; must not be {@code null}
     * @param transport transport used to send and receive messages; must not be {@code null}
     * @param membershipProcessor stream of membership events used to maintain the ping list
     * @param config failure detector settings; must not be {@code null}
     * @param scheduler scheduler on which all callbacks and the ping task run; must not be
     *     {@code null}
     * @param cidGenerator generator of correlation ids for outgoing pings; must not be
     *     {@code null}
     * @throws NullPointerException if any of the arguments documented as non-null is {@code null}
     */
    public FailureDetectorImpl(
            Member localMember,
            Transport transport,
            Flux<MembershipEvent> membershipProcessor,
            FailureDetectorConfig config,
            Scheduler scheduler,
            CorrelationIdGenerator cidGenerator) {
        this.localMember = Objects.requireNonNull(localMember);
        this.transport = Objects.requireNonNull(transport);
        this.config = Objects.requireNonNull(config);
        this.scheduler = Objects.requireNonNull(scheduler);
        this.cidGenerator = Objects.requireNonNull(cidGenerator);

        Disposable membershipSubscription =
            membershipProcessor.publishOn(scheduler).subscribe(this::onMemberEvent, this::onError);
        Disposable transportSubscription =
            transport.listen().publishOn(scheduler).subscribe(this::onMessage, this::onError);
        actionsDisposables.addAll(Arrays.asList(membershipSubscription, transportSubscription));
    }

    /**
     * Schedules {@link #doPing()} to run every {@code config.pingInterval()} milliseconds, with the
     * first run delayed by the same interval.
     */
    @Override
    public void start() {
        long intervalMillis = config.pingInterval();
        actionsDisposables.add(
            scheduler.schedulePeriodically(
                this::doPing, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS));
    }

    /** Disposes all subscriptions and the periodic ping task, then completes the event stream. */
    @Override
    public void stop() {
        actionsDisposables.dispose();
        sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
    }

    /**
     * Returns the stream of failure detector events.
     *
     * @return flux of events published by this detector, with {@code onBackpressureBuffer} applied
     */
    @Override
    public Flux<FailureDetectorEvent> listen() {
        return sink.asFlux().onBackpressureBuffer();
    }

    /**
     * Runs one ping period: selects a member, sends it a {@link #PING}, and either publishes the
     * status derived from the acknowledgement or, on failure/timeout, falls back to
     * {@link #doPingReq} (or publishes {@link MemberStatus#SUSPECT} if no fallback is possible).
     */
    private void doPing() {
        // Captured before the increment so every log line of this round carries the same period.
        final long period = currentPeriod++;

        final Member pingMember = selectPingMember();
        if (pingMember == null) {
            return;
        }

        final String cid = cidGenerator.nextCid();
        PingData pingData = new PingData(localMember, pingMember);
        Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build();

        LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
        Address address = pingMember.address();
        transport
            .requestResponse(address, pingMsg)
            .timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
            .publishOn(scheduler)
            .subscribe(
                message -> {
                    LOGGER.debug(
                        "[{}][{}] Received PingAck from {}", localMember, period, message.sender());
                    publishPingResult(period, pingMember, computeMemberStatus(message, period));
                },
                ex -> {
                    LOGGER.debug(
                        "[{}][{}] Failed to get PingAck from {} within {} ms",
                        localMember,
                        period,
                        pingMember,
                        config.pingTimeout());

                    int timeLeft = config.pingInterval() - config.pingTimeout();
                    List<Member> pingReqMembers = selectPingReqMembers(pingMember);

                    if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
                        LOGGER.debug("[{}][{}] No PingReq occurred", localMember, period);
                        publishPingResult(period, pingMember, MemberStatus.SUSPECT);
                    } else {
                        // Pass the captured period, not currentPeriod: the counter has already
                        // been incremented, which would tag the PingReq logs with another round.
                        doPingReq(period, pingMember, pingReqMembers, cid);
                    }
                });
    }

    /**
     * Asks each of {@code pingReqMembers} to ping {@code pingMember} on behalf of the local member
     * and publishes a result for every reply or failure.
     *
     * @param period period number of the originating ping, used for logging
     * @param pingMember member whose liveness is being checked
     * @param pingReqMembers members asked to relay the ping
     * @param cid correlation id of the originating ping, reused for the requests
     */
    private void doPingReq(
            long period, Member pingMember, List<Member> pingReqMembers, String cid) {
        Message pingReqMsg =
            Message.withData(new PingData(localMember, pingMember))
                .qualifier(PING_REQ)
                .correlationId(cid)
                .build();
        LOGGER.debug(
            "[{}][{}] Send PingReq to {} for {}", localMember, period, pingReqMembers, pingMember);

        // Whatever remains of the ping interval after the direct ping timed out.
        Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout());
        pingReqMembers.forEach(
            member ->
                transport
                    .requestResponse(member.address(), pingReqMsg)
                    .timeout(timeout, scheduler)
                    .publishOn(scheduler)
                    .subscribe(
                        message -> {
                            LOGGER.debug(
                                "[{}][{}] Received transit PingAck from {} to {}",
                                localMember,
                                period,
                                message.sender(),
                                pingMember);
                            publishPingResult(
                                period, pingMember, computeMemberStatus(message, period));
                        },
                        throwable -> {
                            LOGGER.debug(
                                "[{}][{}] Timeout getting transit PingAck from {} to {} within {} ms",
                                localMember,
                                period,
                                member,
                                pingMember,
                                timeout.toMillis());
                            publishPingResult(period, pingMember, MemberStatus.SUSPECT);
                        }));
    }

    /**
     * Dispatches an incoming transport message by qualifier. Messages that are neither a ping, a
     * ping request nor a transit ping acknowledgement are ignored here.
     */
    private void onMessage(Message message) {
        if (isPing(message)) {
            onPing(message);
        } else if (isPingReq(message)) {
            onPingReq(message);
        } else if (isTransitPingAck(message)) {
            onTransitPingAck(message);
        }
    }

    /**
     * Replies to a ping with a {@link #PING_ACK} carrying {@code DEST_OK}, or {@code DEST_GONE}
     * when the ping was addressed to a member id other than the local one. The acknowledgement is
     * sent to the address of {@code data.getFrom()} with the ping's correlation id.
     */
    private void onPing(Message message) {
        final long period = currentPeriod;
        Address sender = message.sender();
        LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender);

        PingData data = (PingData) message.data();
        data = data.withAckType(PingData.AckType.DEST_OK);
        if (!data.getTo().id().equals(localMember.id())) {
            LOGGER.debug(
                "[{}][{}] Received Ping from {} to {}, but local member is {}",
                localMember,
                period,
                sender,
                data.getTo(),
                localMember);
            data = data.withAckType(PingData.AckType.DEST_GONE);
        }

        String correlationId = message.correlationId();
        Message ackMessage =
            Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
        final Address address = data.getFrom().address();
        LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address);
        transport
            .send(address, ackMessage)
            .subscribe(
                null,
                ex ->
                    LOGGER.debug(
                        "[{}][{}] Failed to send PingAck to {}, cause: {}",
                        localMember,
                        period,
                        address,
                        ex.toString()));
    }

    /**
     * Handles a ping request by sending a transit {@link #PING} to the requested target. The
     * requester is recorded in the ping data as the original issuer so that the acknowledgement
     * can later be routed back to it by {@link #onTransitPingAck(Message)}.
     */
    private void onPingReq(Message message) {
        final long period = currentPeriod;
        LOGGER.debug("[{}][{}] Received PingReq from {}", localMember, period, message.sender());

        PingData data = (PingData) message.data();
        Member target = data.getTo();
        Member originalIssuer = data.getFrom();
        String correlationId = message.correlationId();

        PingData pingReqData = new PingData(localMember, target, originalIssuer);
        Message pingMessage =
            Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
        final Address address = target.address();
        LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address);
        transport
            .send(address, pingMessage)
            .subscribe(
                null,
                ex ->
                    LOGGER.debug(
                        "[{}][{}] Failed to send transit Ping to {}, cause: {}",
                        localMember,
                        period,
                        address,
                        ex.toString()));
    }

    /**
     * Forwards the acknowledgement of a transit ping to its original issuer. The forwarded
     * {@link #PING_ACK} carries the same ack type and correlation id but no original issuer.
     */
    private void onTransitPingAck(Message message) {
        final long period = currentPeriod;
        LOGGER.debug(
            "[{}][{}] Received transit PingAck from {}", localMember, period, message.sender());

        PingData data = (PingData) message.data();
        PingData.AckType ackType = data.getAckType();
        Member target = data.getOriginalIssuer();
        String correlationId = message.correlationId();

        PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType);
        Message originalAckMessage =
            Message.withData(originalAckData)
                .qualifier(PING_ACK)
                .correlationId(correlationId)
                .build();
        final Address address = target.address();
        LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address);
        transport
            .send(address, originalAckMessage)
            .subscribe(
                null,
                ex ->
                    LOGGER.debug(
                        "[{}][{}] Failed to resend transit PingAck to {}, cause: {}",
                        localMember,
                        period,
                        address,
                        ex.toString()));
    }

    /** Logs an error signalled by the membership or transport subscription. */
    private void onError(Throwable throwable) {
        // A trailing Throwable argument is logged by SLF4J as the exception, not a placeholder.
        LOGGER.error("[{}][{}] Received unexpected error:", localMember, currentPeriod, throwable);
    }

    /**
     * Keeps the ping list in sync with cluster membership: removed members are dropped, added
     * members are inserted at a random index of the current list.
     */
    private void onMemberEvent(MembershipEvent event) {
        Member member = event.member();

        if (event.isRemoved() && pingMembers.remove(member)) {
            LOGGER.debug(
                "[{}][{}] Removed {} from pingMembers list (size={})",
                localMember,
                currentPeriod,
                member,
                pingMembers.size());
        }

        if (event.isAdded()) {
            int size = pingMembers.size();
            // nextInt(0) would throw, so an empty list always inserts at index 0.
            int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
            pingMembers.add(index, member);
            LOGGER.debug(
                "[{}][{}] Added {} to pingMembers list (size={})",
                localMember,
                currentPeriod,
                member,
                pingMembers.size());
        }
    }

    /**
     * Picks the next member to ping, walking the ping list in order and reshuffling it each time
     * the end is reached.
     *
     * @return the member to ping, or {@code null} if the ping list is empty
     */
    private Member selectPingMember() {
        if (pingMembers.isEmpty()) {
            return null;
        }
        if (pingMemberIndex >= pingMembers.size()) {
            pingMemberIndex = 0;
            Collections.shuffle(pingMembers);
        }
        return pingMembers.get(pingMemberIndex++);
    }

    /**
     * Randomly selects at most {@code config.pingReqMembers()} members, excluding
     * {@code pingMember}, to relay a ping.
     *
     * @param pingMember the member being checked; never part of the result
     * @return the selected members, or an empty list if relaying is disabled
     *     ({@code pingReqMembers <= 0}) or there are no other candidates
     */
    private List<Member> selectPingReqMembers(Member pingMember) {
        int limit = config.pingReqMembers();
        if (limit <= 0) {
            return Collections.emptyList();
        }
        List<Member> candidates = new ArrayList<>(pingMembers);
        candidates.remove(pingMember);
        if (candidates.isEmpty()) {
            return Collections.emptyList();
        }
        Collections.shuffle(candidates);
        return candidates.size() < limit ? candidates : candidates.subList(0, limit);
    }

    /** Logs the detected status and emits it as a {@link FailureDetectorEvent}. */
    private void publishPingResult(long period, Member member, MemberStatus status) {
        LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status);
        sink.emitNext(new FailureDetectorEvent(member, status), RetryEmitFailureHandler.INSTANCE);
    }

    /**
     * Maps the ack type carried by an acknowledgement to a member status.
     *
     * @param message acknowledgement whose data is a {@link PingData}
     * @param period period number, used for logging only
     * @return {@code ALIVE} for a missing ack type or {@code DEST_OK}, {@code DEAD} for
     *     {@code DEST_GONE}, and {@code SUSPECT} for any other ack type
     */
    private MemberStatus computeMemberStatus(Message message, long period) {
        PingData data = (PingData) message.data();
        PingData.AckType ackType = data.getAckType();

        if (ackType == null) {
            return MemberStatus.ALIVE;
        }

        switch (ackType) {
            case DEST_OK:
                return MemberStatus.ALIVE;
            case DEST_GONE:
                return MemberStatus.DEAD;
            default:
                LOGGER.warn(
                    "[{}][{}] Unknown PingData.AckType received '{}'",
                    localMember,
                    period,
                    ackType);
                return MemberStatus.SUSPECT;
        }
    }

    private boolean isPing(Message message) {
        return PING.equals(message.qualifier());
    }

    private boolean isPingReq(Message message) {
        return PING_REQ.equals(message.qualifier());
    }

    /** A ping acknowledgement is "transit" when its data names an original issuer. */
    private boolean isTransitPingAck(Message message) {
        return PING_ACK.equals(message.qualifier())
            && ((PingData) message.data()).getOriginalIssuer() != null;
    }

    /**
     * Returns the transport used by this failure detector (package-private accessor).
     *
     * @return the transport passed to the constructor
     */
    Transport getTransport() {
        return transport;
    }

    /**
     * Emit failure handler that requests a retry only when the emission failed with
     * {@link Sinks.EmitResult#FAIL_NON_SERIALIZED}; every other failure is not retried.
     */
    private static class RetryEmitFailureHandler implements Sinks.EmitFailureHandler {
        private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

        private RetryEmitFailureHandler() {
        }

        @Override
        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
        }
    }
}

