package io.scalecube.cluster.fdetector;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.IMembershipProtocol;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.transport.ITransport;
import io.scalecube.transport.Message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/* loaded from: input_file:io/scalecube/cluster/fdetector/FailureDetector.class */
/**
 * Ping-based failure detector.
 *
 * <p>Once started, every {@code config.getPingInterval()} milliseconds one member from the local
 * ping list is sent a {@link #PING}. If no {@link #PING_ACK} with the matching correlation id
 * arrives within {@code config.getPingTimeout()} milliseconds, up to
 * {@code config.getPingReqMembers()} other members are asked via {@link #PING_REQ} to ping the
 * target on our behalf. The outcome of each round is published on {@link #listen()} as a
 * {@link FailureDetectorEvent} carrying {@link MemberStatus#ALIVE} or {@link MemberStatus#SUSPECT}.
 *
 * <p>All mutable state ({@code period}, {@code pingMembers}, {@code pingMemberIndex}) is only
 * touched from the periodic task and from Rx callbacks that are observed on {@code scheduler},
 * which is backed by the single-threaded {@code executor}; no additional locking is used.
 */
public final class FailureDetector implements IFailureDetector {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class);

    /** Qualifier of a ping message (direct, or relayed on behalf of another member). */
    public static final String PING = "sc/fdetector/ping";

    /** Qualifier of a request asking the receiver to ping a third member. */
    public static final String PING_REQ = "sc/fdetector/pingReq";

    /** Qualifier of a ping acknowledgement (direct or transit). */
    public static final String PING_ACK = "sc/fdetector/pingAck";

    private final ITransport transport;
    private final IMembershipProtocol membership;
    private final FailureDetectorConfig config;

    // Long-lived subscriptions created in start() and released in stop().
    private Subscriber<Member> onMemberAddedSubscriber;
    private Subscriber<Member> onMemberRemovedSubscriber;
    private Subscriber<Message> onPingRequestSubscriber;
    private Subscriber<Message> onAskToPingRequestSubscriber;
    private Subscriber<Message> onTransitPingAckRequestSubscriber;

    private final ScheduledExecutorService executor;
    private final Scheduler scheduler;
    private ScheduledFuture<?> pingTask;

    /** Number of ping rounds started so far; also used to build correlation ids. */
    private long period = 0;

    /** Members eligible for pinging, visited in list order and reshuffled after each full pass. */
    private final List<Member> pingMembers = new ArrayList<>();

    /** Index into {@link #pingMembers} of the next member to ping. */
    private int pingMemberIndex = 0;

    private final Subject<FailureDetectorEvent, FailureDetectorEvent> subject =
        PublishSubject.<FailureDetectorEvent>create().toSerialized();

    /**
     * Creates a failure detector. Nothing is subscribed or scheduled until {@link #start()}.
     *
     * @param iTransport transport used to send and receive ping messages; must not be null
     * @param iMembershipProtocol source of membership events and of the local member; must not be null
     * @param failureDetectorConfig ping interval, ping timeout and ping-req fan-out; must not be null
     * @throws IllegalArgumentException if any argument is null
     */
    public FailureDetector(ITransport iTransport, IMembershipProtocol iMembershipProtocol, FailureDetectorConfig failureDetectorConfig) {
        Preconditions.checkArgument(iTransport != null);
        Preconditions.checkArgument(iMembershipProtocol != null);
        Preconditions.checkArgument(failureDetectorConfig != null);
        this.transport = iTransport;
        this.membership = iMembershipProtocol;
        this.config = failureDetectorConfig;
        this.executor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                .setNameFormat("sc-fdetector-" + iTransport.address().toString())
                .setDaemon(true)
                .build());
        this.scheduler = Schedulers.from(this.executor);
    }

    /** Returns the transport this detector was constructed with. */
    ITransport getTransport() {
        return this.transport;
    }

    /**
     * Subscribes to membership and transport events and schedules the periodic ping task.
     * All callbacks are observed on the detector's own scheduler.
     */
    @Override
    public void start() {
        // Track membership changes to maintain the list of ping candidates.
        onMemberAddedSubscriber = Subscribers.create(this::onMemberAdded);
        membership.listen().observeOn(scheduler)
            .filter(event -> event.isAdded())
            .map(event -> event.member())
            .subscribe(onMemberAddedSubscriber);

        onMemberRemovedSubscriber = Subscribers.create(this::onMemberRemoved);
        membership.listen().observeOn(scheduler)
            .filter(event -> event.isRemoved())
            .map(event -> event.member())
            .subscribe(onMemberRemovedSubscriber);

        // Serve incoming protocol messages.
        onPingRequestSubscriber = Subscribers.create(this::onPing);
        transport.listen().observeOn(scheduler)
            .filter(this::isPing)
            .subscribe(onPingRequestSubscriber);

        onAskToPingRequestSubscriber = Subscribers.create(this::onPingReq);
        transport.listen().observeOn(scheduler)
            .filter(this::isPingReq)
            .subscribe(onAskToPingRequestSubscriber);

        onTransitPingAckRequestSubscriber = Subscribers.create(this::onTransitPingAck);
        transport.listen().observeOn(scheduler)
            .filter(this::isTransitPingAck)
            .subscribe(onTransitPingAckRequestSubscriber);

        // Start the periodic ping rounds.
        pingTask = executor.scheduleWithFixedDelay(
            this::doPing, config.getPingInterval(), config.getPingInterval(), TimeUnit.MILLISECONDS);
    }

    /**
     * Unsubscribes the long-lived subscriptions created by {@link #start()}, cancels the periodic
     * ping task, shuts the executor down and completes the event stream.
     */
    @Override
    public void stop() {
        if (onMemberAddedSubscriber != null) {
            onMemberAddedSubscriber.unsubscribe();
        }
        if (onMemberRemovedSubscriber != null) {
            onMemberRemovedSubscriber.unsubscribe();
        }
        if (onPingRequestSubscriber != null) {
            onPingRequestSubscriber.unsubscribe();
        }
        if (onAskToPingRequestSubscriber != null) {
            onAskToPingRequestSubscriber.unsubscribe();
        }
        if (onTransitPingAckRequestSubscriber != null) {
            onTransitPingAckRequestSubscriber.unsubscribe();
        }
        if (pingTask != null) {
            pingTask.cancel(true);
        }
        // NOTE(review): per-ping subscriptions created in sendPing/doPingReq are not tracked here,
        // so an in-flight ack wait may presumably still fire after stop() - confirm whether
        // shutdown() (as opposed to shutdownNow()) is intentional.
        executor.shutdown();
        subject.onCompleted();
    }

    /**
     * Returns the stream of ping results.
     *
     * @return a read-only view of the (already serialized) event subject
     */
    @Override
    public Observable<FailureDetectorEvent> listen() {
        // asObservable() hides the Subject so subscribers cannot cast it back and inject events.
        return subject.asObservable();
    }

    /**
     * Runs one ping round: advances the period counter, picks the next member and pings it.
     *
     * <p>This is the body of the task scheduled with {@code scheduleWithFixedDelay}; an exception
     * escaping it would suppress every subsequent execution, so the whole round is guarded.
     */
    private void doPing() {
        final long pingPeriod = ++period;
        Member target = null;
        try {
            target = selectPingMember();
            if (target == null) {
                return;
            }
            sendPing(target, pingPeriod);
        } catch (Exception e) {
            LOGGER.error("Exception on sending Ping[{}] to {}: {}", pingPeriod, target, e.getMessage(), e);
        }
    }

    /**
     * Sends a ping to {@code target} and waits up to {@code config.getPingTimeout()} ms for the
     * matching ack. On ack the target is published as ALIVE; on timeout (or any error while
     * waiting) the round continues with {@link #doPingReq}.
     *
     * @param target member to ping
     * @param pingPeriod period number of this round, captured so asynchronous log lines stay accurate
     */
    private void sendPing(Member target, long pingPeriod) {
        Member localMember = membership.member();
        String correlationId = localMember.id() + "-" + pingPeriod;
        Message pingMessage = Message.withData(new PingData(localMember, target))
            .qualifier(PING)
            .correlationId(correlationId)
            .build();

        LOGGER.trace("Send Ping[{}] to {}", pingPeriod, target);
        // Subscribe for the ack before sending so a fast reply cannot be missed.
        transport.listen().observeOn(scheduler)
            .filter(this::isPingAck)
            .filter(message -> correlationId.equals(message.correlationId()))
            .take(1)
            .timeout(config.getPingTimeout(), TimeUnit.MILLISECONDS, scheduler)
            .subscribe(
                ack -> {
                    LOGGER.trace("Received PingAck[{}] from {}", pingPeriod, target);
                    publishPingResult(target, MemberStatus.ALIVE);
                },
                error -> {
                    LOGGER.trace("Timeout getting PingAck[{}] from {} within {} ms",
                        pingPeriod, target, config.getPingTimeout());
                    doPingReq(target, correlationId, pingPeriod);
                });
        transport.send(target.address(), pingMessage);
    }

    /**
     * Asks other members to ping {@code member} after a direct ping got no ack.
     *
     * <p>The time budget is {@code pingInterval - pingTimeout}. If there is no time left or no
     * other member is available, {@code member} is published as SUSPECT immediately. Otherwise
     * the first matching ack within the budget publishes ALIVE and a timeout publishes SUSPECT.
     *
     * @param member the member that did not acknowledge the direct ping
     * @param correlationId correlation id of the original ping, reused for the ping requests
     * @param pingPeriod period number of the originating round (for logging)
     */
    private void doPingReq(Member member, String correlationId, long pingPeriod) {
        final int timeLeft = config.getPingInterval() - config.getPingTimeout();
        if (timeLeft <= 0) {
            LOGGER.trace("No PingReq[{}] occurred, because no time left (pingInterval={}, pingTimeout={})",
                pingPeriod, config.getPingInterval(), config.getPingTimeout());
            publishPingResult(member, MemberStatus.SUSPECT);
            return;
        }

        final List<Member> helpers = selectPingReqMembers(member);
        if (helpers.isEmpty()) {
            LOGGER.trace("No PingReq[{}] occurred, because member selection is empty", pingPeriod);
            publishPingResult(member, MemberStatus.SUSPECT);
            return;
        }

        Member localMember = membership.member();
        transport.listen().observeOn(scheduler)
            .filter(this::isPingAck)
            .filter(message -> correlationId.equals(message.correlationId()))
            .take(1)
            .timeout(timeLeft, TimeUnit.MILLISECONDS, scheduler)
            .subscribe(
                ack -> {
                    LOGGER.trace("Received transit PingAck[{}] from {} to {}", pingPeriod, ack.sender(), member);
                    publishPingResult(member, MemberStatus.ALIVE);
                },
                error -> {
                    LOGGER.trace("Timeout getting transit PingAck[{}] from {} to {} within {} ms",
                        pingPeriod, helpers, member, timeLeft);
                    publishPingResult(member, MemberStatus.SUSPECT);
                });

        Message pingReqMessage = Message.withData(new PingData(localMember, member))
            .qualifier(PING_REQ)
            .correlationId(correlationId)
            .build();
        LOGGER.trace("Send PingReq[{}] to {} for {}", pingPeriod, helpers, member);
        for (Member helper : helpers) {
            try {
                transport.send(helper.address(), pingReqMessage);
            } catch (Exception e) {
                // One failing send must not prevent asking the remaining helpers; if none of
                // them answers, the timeout above still reports SUSPECT.
                LOGGER.error("Exception on sending PingReq[{}] to {} for {}: {}",
                    pingPeriod, helper, member, e.getMessage(), e);
            }
        }
    }

    /**
     * Inserts a newly added member at a uniformly random position of the ping list.
     *
     * @param member member reported as added by the membership protocol
     */
    private void onMemberAdded(Member member) {
        // size + 1 possible slots, so the new member may also land at the very end.
        int index = ThreadLocalRandom.current().nextInt(pingMembers.size() + 1);
        pingMembers.add(index, member);
        if (index < pingMemberIndex) {
            // Keep the cursor on the same "next" member; otherwise the member pinged last
            // would be pinged again.
            pingMemberIndex++;
        }
    }

    /**
     * Removes a member from the ping list (no-op if it is not present).
     *
     * @param member member reported as removed by the membership protocol
     */
    private void onMemberRemoved(Member member) {
        int index = pingMembers.indexOf(member);
        if (index < 0) {
            return;
        }
        pingMembers.remove(index);
        if (index < pingMemberIndex) {
            // Entries after the removed one shifted left; without this the next member in
            // line would be skipped for the current pass.
            pingMemberIndex--;
        }
    }

    /**
     * Answers a ping addressed to the local member with a {@link #PING_ACK} carrying the same
     * payload and correlation id. Pings addressed to a different member are logged and dropped.
     */
    private void onPing(Message message) {
        LOGGER.trace("Received Ping: {}", message);
        PingData pingData = (PingData) message.data();
        if (!pingData.getTo().equals(membership.member())) {
            LOGGER.warn("Received Ping to {}, but local member is {}", pingData.getTo(), transport.address());
            return;
        }
        Message ackMessage = Message.withData(pingData)
            .qualifier(PING_ACK)
            .correlationId(message.correlationId())
            .build();
        LOGGER.trace("Send PingAck to {}", pingData.getFrom().address());
        transport.send(pingData.getFrom().address(), ackMessage);
    }

    /**
     * Handles a ping request by pinging the requested target on behalf of the requester.
     * The requester is passed as the third {@link PingData} argument (presumably the
     * "original issuer" later read back in {@link #onTransitPingAck}).
     */
    private void onPingReq(Message message) {
        LOGGER.trace("Received PingReq: {}", message);
        PingData pingData = (PingData) message.data();
        Member target = pingData.getTo();
        Member originalIssuer = pingData.getFrom();
        Message transitPing = Message.withData(new PingData(membership.member(), target, originalIssuer))
            .qualifier(PING)
            .correlationId(message.correlationId())
            .build();
        LOGGER.trace("Send transit Ping to {}", target.address());
        transport.send(target.address(), transitPing);
    }

    /**
     * Forwards an ack for a relayed ping to its original issuer, as a plain {@link #PING_ACK}
     * with the same correlation id.
     */
    private void onTransitPingAck(Message message) {
        LOGGER.trace("Received transit PingAck: {}", message);
        PingData pingData = (PingData) message.data();
        Member originalIssuer = pingData.getOriginalIssuer();
        Message ackMessage = Message.withData(new PingData(originalIssuer, pingData.getTo()))
            .qualifier(PING_ACK)
            .correlationId(message.correlationId())
            .build();
        LOGGER.trace("Resend transit PingAck to {}", originalIssuer.address());
        transport.send(originalIssuer.address(), ackMessage);
    }

    /**
     * Picks the next member to ping, reshuffling the list once a full pass is complete.
     *
     * @return the next member, or {@code null} if the ping list is empty
     */
    private Member selectPingMember() {
        if (pingMembers.isEmpty()) {
            return null;
        }
        if (pingMemberIndex >= pingMembers.size()) {
            pingMemberIndex = 0;
            Collections.shuffle(pingMembers);
        }
        return pingMembers.get(pingMemberIndex++);
    }

    /**
     * Randomly selects up to {@code config.getPingReqMembers()} members, excluding {@code member},
     * to relay a ping.
     *
     * @param member the ping target, which must not be asked to ping itself
     * @return the selected members; empty if the fan-out is not positive or nobody else is known
     */
    private List<Member> selectPingReqMembers(Member member) {
        int limit = config.getPingReqMembers();
        if (limit <= 0) {
            return Collections.emptyList();
        }
        List<Member> candidates = new ArrayList<>(pingMembers);
        candidates.remove(member);
        if (candidates.isEmpty()) {
            return Collections.emptyList();
        }
        Collections.shuffle(candidates);
        return candidates.size() <= limit ? candidates : candidates.subList(0, limit);
    }

    /** Logs the detected status and publishes it to {@link #listen()} subscribers. */
    private void publishPingResult(Member member, MemberStatus memberStatus) {
        LOGGER.debug("Member {} detected as {}", member, memberStatus);
        subject.onNext(new FailureDetectorEvent(member, memberStatus));
    }

    /**
     * Returns the message payload if it is a {@link PingData}, otherwise {@code null}.
     * Used by the filter predicates so that a null or foreign payload under one of our
     * qualifiers is dropped instead of raising an exception inside the Rx pipeline.
     */
    private static PingData pingDataOrNull(Message message) {
        Object payload = message.data();
        return payload instanceof PingData ? (PingData) payload : null;
    }

    /** True for {@link #PING} messages carrying a {@link PingData} payload. */
    private boolean isPing(Message message) {
        return PING.equals(message.qualifier()) && pingDataOrNull(message) != null;
    }

    /** True for {@link #PING_REQ} messages carrying a {@link PingData} payload. */
    private boolean isPingReq(Message message) {
        return PING_REQ.equals(message.qualifier()) && pingDataOrNull(message) != null;
    }

    /** True for acks addressed to us directly, i.e. whose payload has no original issuer. */
    private boolean isPingAck(Message message) {
        if (!PING_ACK.equals(message.qualifier())) {
            return false;
        }
        PingData pingData = pingDataOrNull(message);
        return pingData != null && pingData.getOriginalIssuer() == null;
    }

    /** True for acks of relayed pings, i.e. whose payload names an original issuer to forward to. */
    private boolean isTransitPingAck(Message message) {
        if (!PING_ACK.equals(message.qualifier())) {
            return false;
        }
        PingData pingData = pingDataOrNull(message);
        return pingData != null && pingData.getOriginalIssuer() != null;
    }
}
