package io.scalecube.cluster.membership;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.TransportWrapper;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl.class */
public final class MembershipProtocolImpl implements MembershipProtocol {
    /** Class logger; registered under the {@link MembershipProtocol} interface. */
    private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class);

    /** Qualifier of a membership sync request message. */
    public static final String SYNC = "sc/membership/sync";

    /** Qualifier of the acknowledgement sent in reply to a sync request. */
    public static final String SYNC_ACK = "sc/membership/syncAck";

    /** Qualifier of a membership record spread through the gossip protocol. */
    public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";

    /** The member this protocol instance represents. */
    private final Member localMember;
    private final Transport transport;
    private final MembershipConfig membershipConfig;
    private final FailureDetectorConfig failureDetectorConfig;

    /** Configured seed addresses with the local node's own addresses filtered out. */
    private final List<Address> seedMembers;
    private final FailureDetector failureDetector;
    private final GossipProtocol gossipProtocol;
    private final MetadataStore metadataStore;
    private final TransportWrapper transportWrapper;
    private final Scheduler scheduler;

    /** Latest known membership record per member id (includes the local member). */
    private final Map<String, MembershipRecord> membershipTable = new HashMap<>();

    /** Members currently considered part of the cluster, keyed by member id. */
    private final Map<String, Member> members = new HashMap<>();

    /** Ids of members for which an "added" event has already been published. */
    private final Set<String> aliveEmittedSet = new HashSet<>();

    /** Sink through which membership events are published to {@link #listen()} subscribers. */
    private final Sinks.Many<MembershipEvent> sink = Sinks.many().multicast().directBestEffort();

    /** Subscriptions to transport, failure detector and gossip streams created in the constructor. */
    private final Disposable.Composite actionsDisposables = Disposables.composite();

    /** Holder of the currently scheduled periodic sync task. */
    private final Disposable.Swap disposable = Disposables.swap();

    /** Pending suspicion timeout tasks, keyed by member id. */
    private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl$MembershipUpdateReason.class */
    /**
     * Identifies which code path triggered a call to {@code updateMembership}; the value is
     * used in log messages and to decide whether an update should be re-gossiped.
     */
    public enum MembershipUpdateReason {
        /** Update derived from a failure detector status change. */
        FAILURE_DETECTOR_EVENT,
        /** Update received as a membership gossip message. */
        MEMBERSHIP_GOSSIP,
        /** Update received in a regular (non-initial) sync or sync-ack message. */
        SYNC,
        /** Update received in a sync-ack answering the initial sync to seed members. */
        INITIAL_SYNC,
        /** Update produced locally when a suspicion timeout task fires. */
        SUSPICION_TIMEOUT
    }

    public MembershipProtocolImpl(Member member, Transport transport, FailureDetector failureDetector, GossipProtocol gossipProtocol, MetadataStore metadataStore, ClusterConfig clusterConfig, Scheduler scheduler) {
        this.transport = (Transport) Objects.requireNonNull(transport);
        this.failureDetector = (FailureDetector) Objects.requireNonNull(failureDetector);
        this.gossipProtocol = (GossipProtocol) Objects.requireNonNull(gossipProtocol);
        this.metadataStore = (MetadataStore) Objects.requireNonNull(metadataStore);
        this.localMember = (Member) Objects.requireNonNull(member);
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler);
        this.membershipConfig = ((ClusterConfig) Objects.requireNonNull(clusterConfig)).membershipConfig();
        this.failureDetectorConfig = ((ClusterConfig) Objects.requireNonNull(clusterConfig)).failureDetectorConfig();
        this.transportWrapper = new TransportWrapper(transport);
        this.seedMembers = cleanUpSeedMembers(this.membershipConfig.seedMembers());
        this.membershipTable.put(member.id(), new MembershipRecord(member, MemberStatus.ALIVE, 0));
        this.members.put(member.id(), member);
        this.actionsDisposables.addAll(Arrays.asList(transport.listen().publishOn(scheduler).subscribe(this::onMessage, th -> {
            LOGGER.error("[{}][onMessage][error] cause:", member, th);
        }), failureDetector.listen().publishOn(scheduler).subscribe(this::onFailureDetectorEvent, th2 -> {
            LOGGER.error("[{}][onFailureDetectorEvent][error] cause:", member, th2);
        }), gossipProtocol.listen().publishOn(scheduler).subscribe(this::onMembershipGossip, th3 -> {
            LOGGER.error("[{}][onMembershipGossip][error] cause:", member, th3);
        })));
    }

    /**
     * Returns the given seed addresses with duplicates removed (first occurrence order kept) and
     * with every address that refers to the local node filtered out.
     *
     * <p>An address is treated as local when it equals one of the local member's addresses, the
     * transport address, or the local host address / host name combined with any of those ports.
     * Each filtered address is logged by {@code checkAddressesNotEqual}.
     *
     * @param collection configured seed addresses
     * @return the seed addresses that do not point to the local node
     */
    private List<Address> cleanUpSeedMembers(Collection<Address> collection) {
        InetAddress localIpAddress = Address.getLocalIpAddress();
        String hostAddress = localIpAddress.getHostAddress();
        String hostName = localIpAddress.getHostName();
        Address transportAddress = this.transport.address();
        Address byHostAddress = Address.create(hostAddress, transportAddress.port());
        Address byHostName = Address.create(hostName, transportAddress.port());
        // LinkedHashSet de-duplicates while preserving the configured order.
        return new LinkedHashSet<>(collection).stream()
                .filter(seed -> checkAddressesNotEqual(seed, this.localMember, hostAddress, hostName))
                .filter(seed -> checkAddressesNotEqual(seed, transportAddress))
                .filter(seed -> checkAddressesNotEqual(seed, byHostAddress))
                .filter(seed -> checkAddressesNotEqual(seed, byHostName))
                .collect(Collectors.toList());
    }

    /**
     * Checks that the candidate address differs from every address of the given member, and from
     * the two host strings combined with each of that member's ports.
     *
     * @param address candidate seed address
     * @param member member whose addresses are compared against
     * @param str first host string to pair with each member port
     * @param str2 second host string to pair with each member port
     * @return {@code true} if no comparison matched; {@code false} on the first match
     */
    private boolean checkAddressesNotEqual(Address address, Member member, String str, String str2) {
        for (Address own : member.addresses()) {
            if (!checkAddressesNotEqual(address, own)) {
                return false;
            }
            if (!checkAddressesNotEqual(address, Address.create(str, own.port()))) {
                return false;
            }
            if (!checkAddressesNotEqual(address, Address.create(str2, own.port()))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Compares two addresses and logs a warning when they are equal.
     *
     * @param address candidate seed address
     * @param address2 address to compare against
     * @return {@code true} if the addresses differ, {@code false} if they are equal
     */
    private boolean checkAddressesNotEqual(Address address, Address address2) {
        final boolean same = address.equals(address2);
        if (same) {
            LOGGER.warn("[{}] Filtering out seed address: {}", this.localMember, address);
        }
        return !same;
    }

    /**
     * Returns the stream of membership events published by this protocol, with a backpressure
     * buffer applied for each subscriber.
     */
    @Override
    public Flux<MembershipEvent> listen() {
        final Flux<MembershipEvent> events = this.sink.asFlux();
        return events.onBackpressureBuffer();
    }

    /**
     * On subscription, replaces the local member's record with an ALIVE record whose incarnation
     * is one higher than the current one, and spreads it via gossip.
     *
     * @return a Mono completing when the gossip spread completes
     */
    public Mono<Void> updateIncarnation() {
        return Mono.defer(() -> {
            final MembershipRecord current = this.membershipTable.get(this.localMember.id());
            final MembershipRecord bumped = new MembershipRecord(this.localMember, MemberStatus.ALIVE, current.incarnation() + 1);
            this.membershipTable.put(this.localMember.id(), bumped);
            return spreadMembershipGossip(bumped);
        });
    }

    /**
     * On subscription, replaces the local member's record with a LEAVING record whose incarnation
     * is one higher than the current one, and spreads it via gossip.
     *
     * @return a Mono completing when the gossip spread completes
     */
    public Mono<Void> leaveCluster() {
        return Mono.defer(() -> {
            final MembershipRecord current = this.membershipTable.get(this.localMember.id());
            final MembershipRecord leaving = new MembershipRecord(this.localMember, MemberStatus.LEAVING, current.incarnation() + 1);
            this.membershipTable.put(this.localMember.id(), leaving);
            return spreadMembershipGossip(leaving);
        });
    }

    /**
     * Starts the protocol: performs the initial sync (see {@code start0}) and completes once it
     * has finished.
     */
    @Override
    public Mono<Void> start() {
        final Mono<Object> startup = Mono.create(this::start0);
        return startup.then();
    }

    /**
     * Performs the initial sync and then signals the sink.
     *
     * <p>Without seed members, periodic sync is scheduled and the sink succeeds immediately.
     * Otherwise a Sync request is sent to every seed member; request failures are logged and
     * ignored. Received SyncAcks are applied until all have arrived or the configured sync
     * timeout elapses. Whatever the outcome, periodic sync is then scheduled and the sink succeeds.
     *
     * @param monoSink sink to signal once start-up has finished
     */
    private void start0(MonoSink<Object> monoSink) {
        if (this.seedMembers.isEmpty()) {
            schedulePeriodicSync();
            monoSink.success();
            return;
        }

        LOGGER.info("[{}] Making initial Sync to all seed members: {}", this.localMember, this.seedMembers);

        // Generic array creation is not expressible in Java; every element is a Mono<Message>.
        @SuppressWarnings("unchecked")
        Mono<Message>[] syncs = this.seedMembers.stream()
                .map(address -> this.transport
                        .requestResponse(address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString()))
                        .doOnError(ex -> LOGGER.warn("[{}] Exception on initial Sync, cause: {}", this.localMember, ex.toString()))
                        // A failed seed must not fail the whole start-up.
                        .onErrorResume(Exception.class, ex -> Mono.empty()))
                .toArray(Mono[]::new);

        Flux.mergeDelayError(syncs.length, syncs)
                .take(syncs.length)
                .timeout(Duration.ofMillis(this.membershipConfig.syncTimeout()), this.scheduler)
                .publishOn(this.scheduler)
                .flatMap(message -> onSyncAck(message, true))
                // Runs on completion, error (including timeout) and cancellation alike.
                .doFinally(signalType -> {
                    schedulePeriodicSync();
                    monoSink.success();
                })
                .subscribe(null, ex -> LOGGER.warn("[{}] Exception on initial SyncAck, cause: {}", this.localMember, ex.toString()));
    }

    /**
     * Stops the protocol: disposes the stream subscriptions, the periodic sync task and all
     * pending suspicion timeout tasks, then completes the event sink.
     */
    @Override
    public void stop() {
        this.actionsDisposables.dispose();
        this.disposable.dispose();

        for (Disposable task : this.suspicionTimeoutTasks.values()) {
            if (task != null && !task.isDisposed()) {
                task.dispose();
            }
        }
        this.suspicionTimeoutTasks.clear();

        this.sink.emitComplete(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    /**
     * Returns a snapshot copy of all currently known members, including the local member.
     */
    @Override
    public Collection<Member> members() {
        return new ArrayList<>(this.members.values());
    }

    /**
     * Returns a snapshot of all currently known members except the local member.
     */
    @Override
    public Collection<Member> otherMembers() {
        // Copy first so the filter runs over a snapshot, as the original did.
        return new ArrayList<>(this.members.values()).stream()
                .filter(candidate -> !candidate.equals(this.localMember))
                .collect(Collectors.toList());
    }

    /**
     * Returns the local member.
     */
    @Override
    public Member member() {
        return localMember;
    }

    /**
     * Looks up a known member by id.
     *
     * @param str member id
     * @return the member, or an empty Optional if no member with that id is known
     */
    @Override
    public Optional<Member> member(String str) {
        final Member found = this.members.get(str);
        return Optional.ofNullable(found);
    }

    /**
     * Looks up a known member by one of its addresses.
     *
     * @param address address to search for
     * @return the first known member having an address equal to the given one, or an empty
     *     Optional if there is none
     */
    @Override
    public Optional<Member> member(Address address) {
        return this.members.values().stream()
                // address::equals is bound per candidate, matching the compiled form of the original.
                .filter(candidate -> candidate.addresses().stream().anyMatch(address::equals))
                .findFirst();
    }

    /**
     * Sends one Sync message to the target picked by {@code selectSyncAddress}; does nothing when
     * no target is available. Send failures are only logged at debug level.
     */
    private void doSync() {
        final List<Address> targets = selectSyncAddress();
        if (!targets.isEmpty()) {
            final Message syncMessage = prepareSyncDataMsg(SYNC, null);
            LOGGER.debug("[{}][doSync] Send Sync to {}", this.localMember, targets);
            send(this.transport, targets, syncMessage)
                    .subscribe(null, ex -> LOGGER.debug("[{}][doSync] Failed to send Sync to {}, cause: {}", this.localMember, targets, ex.toString()));
        }
    }

    /**
     * Dispatches an incoming transport message: Sync messages go to {@code onSync}; SyncAck
     * messages without a correlation id go to {@code onSyncAck}. Everything else is ignored.
     *
     * @param message incoming transport message
     */
    private void onMessage(Message message) {
        if (isSync(message)) {
            onSync(message)
                    .subscribe(null, ex -> LOGGER.error("[{}][onSync][error] cause:", this.localMember, ex));
            return;
        }
        // SyncAcks carrying a correlation id are not handled here.
        if (isSyncAck(message) && message.correlationId() == null) {
            onSyncAck(message, false)
                    .subscribe(null, ex -> LOGGER.error("[{}][onSyncAck][error] cause:", this.localMember, ex));
        }
    }

    /** Returns {@code true} if the message's qualifier is {@link #SYNC}. */
    private boolean isSync(Message message) {
        final String qualifier = message.qualifier();
        return SYNC.equals(qualifier);
    }

    /** Returns {@code true} if the message's qualifier is {@link #SYNC_ACK}. */
    private boolean isSyncAck(Message message) {
        final String qualifier = message.qualifier();
        return SYNC_ACK.equals(qualifier);
    }

    /**
     * Applies the membership data carried by a SyncAck message.
     *
     * @param message the SyncAck message; its data is read as {@code SyncData}
     * @param z {@code true} when this ack answers the initial sync
     * @return a Mono completing when the carried records have been processed
     */
    private Mono<Void> onSyncAck(Message message, boolean z) {
        return Mono.defer(() -> {
            LOGGER.debug("[{}] Received SyncAck from {}", this.localMember, message.sender());
            final SyncData syncData = (SyncData) message.data();
            return syncMembership(syncData, z);
        });
    }

    /**
     * Applies the membership data carried by a Sync message and, once that succeeds, replies to
     * the sender with a SyncAck that echoes the request's correlation id. A failure to send the
     * reply is only logged at debug level.
     *
     * @param message the Sync message; its data is read as {@code SyncData}
     * @return a Mono completing when the carried records have been processed
     */
    private Mono<Void> onSync(Message message) {
        return Mono.defer(() -> {
            final Member sender = (Member) message.sender();
            LOGGER.debug("[{}] Received Sync from {}", this.localMember, sender);
            final SyncData syncData = (SyncData) message.data();
            return syncMembership(syncData, false).doOnSuccess(ignored -> {
                final Message ack = prepareSyncDataMsg(SYNC_ACK, message.correlationId());
                this.transportWrapper.send(sender, ack)
                        .subscribe(null, ex -> LOGGER.debug("[{}] Failed to send SyncAck to {}, cause: {}", this.localMember, sender, ex.toString()));
            });
        });
    }

    /**
     * Reacts to a failure detector status change.
     *
     * <p>Events for unknown members, or events that repeat the status already recorded, are
     * ignored. A non-ALIVE status is applied through {@code updateMembership}, keeping the
     * recorded incarnation. An ALIVE status is not applied directly; a Sync is sent to that
     * member instead.
     *
     * @param failureDetectorEvent the status change reported by the failure detector
     */
    private void onFailureDetectorEvent(FailureDetectorEvent failureDetectorEvent) {
        final Member member = failureDetectorEvent.member();
        final List<Address> addresses = member.addresses();
        final MembershipRecord known = this.membershipTable.get(member.id());
        if (known == null || known.status() == failureDetectorEvent.status()) {
            return;
        }

        LOGGER.debug("[{}][onFailureDetectorEvent] Received status change: {}", this.localMember, failureDetectorEvent);

        if (failureDetectorEvent.status() == MemberStatus.ALIVE) {
            final Message syncMessage = prepareSyncDataMsg(SYNC, null);
            this.transportWrapper.send(member, syncMessage)
                    .subscribe(null, ex -> LOGGER.debug("[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}", this.localMember, addresses, ex.toString()));
        } else {
            final MembershipRecord changed = new MembershipRecord(known.member(), failureDetectorEvent.status(), known.incarnation());
            updateMembership(changed, MembershipUpdateReason.FAILURE_DETECTOR_EVENT)
                    .subscribe(null, ex -> LOGGER.error("[{}][onFailureDetectorEvent][updateMembership][error] cause:", this.localMember, ex));
        }
    }

    /**
     * Handles a gossip message: when its qualifier is {@link #MEMBERSHIP_GOSSIP}, the carried
     * membership record is applied through {@code updateMembership}; other gossips are ignored.
     *
     * @param message incoming gossip message
     */
    private void onMembershipGossip(Message message) {
        if (!MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
            return;
        }
        final MembershipRecord received = (MembershipRecord) message.data();
        LOGGER.debug("[{}] Received membership gossip: {}", this.localMember, received);
        updateMembership(received, MembershipUpdateReason.MEMBERSHIP_GOSSIP)
                .subscribe(null, ex -> LOGGER.error("[{}][onMembershipGossip][updateMembership][error] cause:", this.localMember, ex));
    }

    /**
     * Picks the target of the next periodic Sync uniformly at random among the seed addresses
     * and the other known members.
     *
     * @return a single-element list holding the chosen seed address, or all addresses of the
     *     chosen member, or an empty list when there are neither seeds nor other members
     */
    private List<Address> selectSyncAddress() {
        final Collection<Member> others = otherMembers();
        if (this.seedMembers.isEmpty() && others.isEmpty()) {
            return Collections.emptyList();
        }
        final int seedCount = this.seedMembers.size();
        final int pick = ThreadLocalRandom.current().nextInt(seedCount + others.size());
        if (pick < seedCount) {
            return Collections.singletonList(this.seedMembers.get(pick));
        }
        // Indices past the seed range select one of the other members.
        final List<Member> indexed = new ArrayList<>(others);
        return indexed.get(pick - seedCount).addresses();
    }

    /**
     * Schedules {@code doSync} to run periodically, using the configured sync interval (in
     * milliseconds) as both initial delay and period, and stores the task in the swap
     * disposable.
     */
    private void schedulePeriodicSync() {
        final int intervalMillis = this.membershipConfig.syncInterval();
        final Disposable periodicTask = this.scheduler.schedulePeriodically(this::doSync, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        this.disposable.update(periodicTask);
    }

    /**
     * Builds a message from the local member carrying a snapshot of the whole membership table.
     *
     * @param str message qualifier
     * @param str2 correlation id; callers pass {@code null} for uncorrelated messages
     * @return the built message
     */
    private Message prepareSyncDataMsg(String str, String str2) {
        // Copy the records so the payload is not backed by the live table.
        final List<MembershipRecord> snapshot = new ArrayList<>(this.membershipTable.values());
        return Message.builder()
                .sender(this.localMember)
                .data(new SyncData(snapshot))
                .qualifier(str)
                .correlationId(str2)
                .build();
    }

    /**
     * Applies every membership record contained in the given sync data.
     *
     * <p>Each record goes through {@code updateMembership}; a failure on one record is logged
     * and suppressed so the remaining records are still processed.
     *
     * @param syncData sync payload holding the remote membership records
     * @param z {@code true} to tag the updates as INITIAL_SYNC, {@code false} for SYNC
     * @return a Mono completing when all records have been processed
     */
    private Mono<Void> syncMembership(SyncData syncData, boolean z) {
        return Mono.defer(() -> {
            final MembershipUpdateReason reason = z ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;

            // Generic array creation is not expressible in Java; every element is a Mono<Void>.
            @SuppressWarnings("unchecked")
            Mono<Void>[] updates = syncData.getMembership().stream()
                    .map(rec -> updateMembership(rec, reason)
                            .doOnError(ex -> LOGGER.error("[{}][syncMembership][{}][error] cause: {}", this.localMember, reason, ex))
                            .onErrorResume(ex -> Mono.empty()))
                    .toArray(Mono[]::new);

            return Flux.mergeDelayError(updates.length, updates).then();
        });
    }

    /**
     * Tells whether two namespaces, interpreted as file-system style paths, are related.
     *
     * <p>Two namespaces are related when they are equal, or when they have a different number
     * of name elements and the shorter one is an element-wise prefix of the longer one. Distinct
     * namespaces with the same number of elements are not related.
     *
     * @param str first namespace
     * @param str2 second namespace
     * @return {@code true} if the namespaces are related
     */
    private static boolean areNamespacesRelated(String str, String str2) {
        final Path first = Paths.get(str);
        final Path second = Paths.get(str2);
        if (first.compareTo(second) == 0) {
            return true;
        }

        final int firstCount = first.getNameCount();
        final int secondCount = second.getNameCount();
        if (firstCount == secondCount) {
            return false;
        }

        final boolean firstIsShorter = firstCount < secondCount;
        final Path shorter = firstIsShorter ? first : second;
        final Path longer = firstIsShorter ? second : first;

        // Every element of the shorter path must match the same position in the longer one.
        for (int i = 0; i < shorter.getNameCount(); i++) {
            if (!shorter.getName(i).equals(longer.getName(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Applies a received membership record to the local membership state.
     *
     * <p>All work is deferred until subscription. The record is ignored when its namespace is
     * not related to the local one, or when it does not override the locally stored record.
     * Otherwise it is dispatched by status (leaving, dead, suspect, alive) to the matching
     * handler. Records that carry the local member's addresses are handled separately.
     *
     * @param membershipRecord the received record; must not be {@code null}
     * @param membershipUpdateReason what triggered this update (used for logging and to decide
     *     whether to re-gossip)
     * @return a Mono completing when the update has been processed
     */
    private Mono<Void> updateMembership(MembershipRecord membershipRecord, MembershipUpdateReason membershipUpdateReason) {
        return Mono.defer(() -> {
            Objects.requireNonNull(membershipRecord, "Membership record can't be null");
            String namespace = this.membershipConfig.namespace();
            String namespace2 = membershipRecord.member().namespace();
            // Records from unrelated namespaces never enter the local table.
            if (!areNamespacesRelated(namespace, namespace2)) {
                LOGGER.debug("[{}][updateMembership][{}] Skipping update, namespace not matched, local: {}, inbound: {}", new Object[]{this.localMember, membershipUpdateReason, namespace, namespace2});
                return Mono.empty();
            }
            // membershipRecord2 is the locally stored record for the same member id (may be null).
            MembershipRecord membershipRecord2 = this.membershipTable.get(membershipRecord.member().id());
            // The override check is bypassed when the stored record is LEAVING.
            if ((membershipRecord2 == null || !membershipRecord2.isLeaving()) && !membershipRecord.isOverrides(membershipRecord2)) {
                LOGGER.debug("[{}][updateMembership][{}] Skipping update, can't override r0: {} with received r1: {}", new Object[]{this.localMember, membershipUpdateReason, membershipRecord2, membershipRecord});
                return Mono.empty();
            }
            // Record with the local member's addresses: handle as self if the id matches, otherwise ignore it.
            if (membershipRecord.member().addresses().equals(this.localMember.addresses())) {
                return membershipRecord.member().id().equals(this.localMember.id()) ? onSelfMemberDetected(membershipRecord2, membershipRecord, membershipUpdateReason) : Mono.empty();
            }
            if (membershipRecord.isLeaving()) {
                return onLeavingDetected(membershipRecord2, membershipRecord);
            }
            if (membershipRecord.isDead()) {
                return onDeadMemberDetected(membershipRecord);
            }
            if (membershipRecord.isSuspect()) {
                // A stored LEAVING record is not replaced by a SUSPECT one.
                if (membershipRecord2 == null || !membershipRecord2.isLeaving()) {
                    this.membershipTable.put(membershipRecord.member().id(), membershipRecord);
                }
                scheduleSuspicionTimeoutTask(membershipRecord);
                spreadMembershipGossipUnlessGossiped(membershipRecord, membershipUpdateReason);
            }
            if (membershipRecord.isAlive()) {
                if (membershipRecord2 != null && membershipRecord2.isLeaving()) {
                    return onAliveAfterLeaving(membershipRecord);
                }
                // New member or newer incarnation: metadata must be fetched before the member is added/updated.
                if (membershipRecord2 == null || membershipRecord2.incarnation() < membershipRecord.incarnation()) {
                    return this.metadataStore.fetchMetadata(membershipRecord.member()).doOnError(th -> {
                        LOGGER.warn("[{}][updateMembership][{}] Skipping to add/update member: {}, due to failed fetchMetadata call (cause: {})", new Object[]{this.localMember, membershipUpdateReason, membershipRecord, th.toString()});
                    }).doOnSuccess(byteBuffer -> {
                        cancelSuspicionTimeoutTask(membershipRecord.member().id());
                        spreadMembershipGossipUnlessGossiped(membershipRecord, membershipUpdateReason);
                        // updateMetadata's return value is passed on as the previous metadata.
                        onAliveMemberDetected(membershipRecord, this.metadataStore.updateMetadata(membershipRecord.member(), byteBuffer), byteBuffer);
                    }).onErrorResume(Exception.class, exc -> {
                        // Failure was already logged above; the update is simply skipped.
                        return Mono.empty();
                    }).then();
                }
            }
            return Mono.empty();
        });
    }

    /**
     * Handles an ALIVE record for a member whose stored record is LEAVING.
     *
     * <p>The member is put into the members map. If no "added" event was emitted for it yet, an
     * "added" and then a "leaving" event are published (both without metadata and with the same
     * timestamp). Note that this work runs eagerly when the method is called, not on
     * subscription.
     *
     * @param membershipRecord the received ALIVE record
     * @return an empty Mono
     */
    private Mono<Void> onAliveAfterLeaving(MembershipRecord membershipRecord) {
        final Member member = membershipRecord.member();
        final String id = member.id();
        this.members.put(id, member);

        if (!this.aliveEmittedSet.add(id)) {
            return Mono.empty();
        }

        final long timestamp = System.currentTimeMillis();
        publishEvent(MembershipEvent.createAdded(member, (ByteBuffer) null, timestamp));
        publishEvent(MembershipEvent.createLeaving(member, (ByteBuffer) null, timestamp));
        return Mono.empty();
    }

    /**
     * Handles a received record that describes the local member.
     *
     * <p>Stores a new local record that keeps the status of the local record and uses an
     * incarnation one above the larger of the two incarnations, then spreads it via gossip.
     *
     * @param membershipRecord the locally stored record of the local member
     * @param membershipRecord2 the received record
     * @param membershipUpdateReason what triggered the update (logging only)
     * @return a Mono that performs the update on subscription
     */
    private Mono<Void> onSelfMemberDetected(MembershipRecord membershipRecord, MembershipRecord membershipRecord2, MembershipUpdateReason membershipUpdateReason) {
        return Mono.fromRunnable(() -> {
            final MembershipRecord refreshed = new MembershipRecord(
                    this.localMember,
                    membershipRecord.status(),
                    Math.max(membershipRecord.incarnation(), membershipRecord2.incarnation()) + 1);
            this.membershipTable.put(this.localMember.id(), refreshed);

            LOGGER.debug("[{}][updateMembership][{}] Updating incarnation, local record r0: {} to received r1: {}, spreading with increased incarnation r2: {}", this.localMember, membershipUpdateReason, membershipRecord, membershipRecord2, refreshed);

            // Errors are intentionally ignored here; spreadMembershipGossip already logs them.
            spreadMembershipGossip(refreshed).subscribe(null, ignored -> { });
        });
    }

    /**
     * Handles a received LEAVING record.
     *
     * <p>The record is stored. A "leaving" event is published when the previous record was ALIVE,
     * or was SUSPECT for a member whose "added" event had been emitted. Unless the previous record
     * was already LEAVING, a suspicion timeout is scheduled and the record is gossiped.
     *
     * @param membershipRecord the previously stored record; may be {@code null}
     * @param membershipRecord2 the received LEAVING record
     * @return a Mono that performs the update on subscription
     */
    private Mono<Void> onLeavingDetected(MembershipRecord membershipRecord, MembershipRecord membershipRecord2) {
        return Mono.defer(() -> {
            final Member member = membershipRecord2.member();
            final String id = member.id();
            this.membershipTable.put(id, membershipRecord2);

            if (membershipRecord != null) {
                final boolean wasAnnounced = membershipRecord.isAlive()
                        || (membershipRecord.isSuspect() && this.aliveEmittedSet.contains(id));
                if (wasAnnounced) {
                    final ByteBuffer metadata = (ByteBuffer) this.metadataStore.metadata(member).orElse(null);
                    publishEvent(MembershipEvent.createLeaving(member, metadata, System.currentTimeMillis()));
                }
                // Already known as leaving: nothing more to schedule or spread.
                if (membershipRecord.isLeaving()) {
                    return Mono.empty();
                }
            }

            scheduleSuspicionTimeoutTask(membershipRecord2);
            return spreadMembershipGossip(membershipRecord2);
        });
    }

    /**
     * Logs the event at info level and emits it to the event sink.
     *
     * @param membershipEvent event to publish
     */
    private void publishEvent(MembershipEvent membershipEvent) {
        LOGGER.info("[{}][publishEvent] {}", this.localMember, membershipEvent);
        this.sink.emitNext(
                membershipEvent,
                RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    /**
     * Handles a received DEAD record.
     *
     * <p>Cancels any pending suspicion timeout for the member. If the member is currently known,
     * it is removed from the members map, the membership table, the metadata store and the
     * alive-emitted set, and a "removed" event carrying the removed metadata is published.
     *
     * @param membershipRecord the received DEAD record
     * @return a Mono that performs the removal on subscription
     */
    private Mono<Void> onDeadMemberDetected(MembershipRecord membershipRecord) {
        return Mono.fromRunnable(() -> {
            final Member member = membershipRecord.member();
            final String id = member.id();
            cancelSuspicionTimeoutTask(id);

            if (!this.members.containsKey(id)) {
                return;
            }

            this.members.remove(id);
            final MembershipRecord removedRecord = this.membershipTable.remove(id);
            final ByteBuffer removedMetadata = this.metadataStore.removeMetadata(member);
            this.aliveEmittedSet.remove(id);

            if (removedRecord.isLeaving()) {
                LOGGER.info("[{}] Member left gracefully: {}", this.localMember, member);
            } else {
                LOGGER.info("[{}] Member left without notification: {}", this.localMember, member);
            }

            publishEvent(MembershipEvent.createRemoved(member, removedMetadata, System.currentTimeMillis()));
        });
    }

    /**
     * Records an ALIVE member and publishes the matching event.
     *
     * <p>An "added" event is published for a member not yet in the members map; an "updated"
     * event is published for a known member whose new metadata differs from the old one. In both
     * cases and otherwise, the members map and membership table are updated.
     *
     * @param membershipRecord the received ALIVE record
     * @param byteBuffer the previous metadata (passed as "old" to the updated event)
     * @param byteBuffer2 the new metadata
     */
    private void onAliveMemberDetected(MembershipRecord membershipRecord, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        final Member member = membershipRecord.member();
        final boolean alreadyKnown = this.members.containsKey(member.id());
        final long timestamp = System.currentTimeMillis();

        final MembershipEvent event;
        if (!alreadyKnown) {
            event = MembershipEvent.createAdded(member, byteBuffer2, timestamp);
        } else if (!byteBuffer2.equals(byteBuffer)) {
            event = MembershipEvent.createUpdated(member, byteBuffer, byteBuffer2, timestamp);
        } else {
            event = null;
        }

        this.members.put(member.id(), member);
        this.membershipTable.put(member.id(), membershipRecord);

        if (event == null) {
            return;
        }
        publishEvent(event);
        if (event.isAdded()) {
            // Remember that the member was announced, so later events can depend on it.
            this.aliveEmittedSet.add(member.id());
        }
    }

    /**
     * Removes the suspicion timeout task registered for the given member id and disposes it if
     * it is still active.
     *
     * @param str member id
     */
    private void cancelSuspicionTimeoutTask(String str) {
        final Disposable task = this.suspicionTimeoutTasks.remove(str);
        if (task != null && !task.isDisposed()) {
            LOGGER.debug("[{}] Cancelled SuspicionTimeoutTask for {}", this.localMember, str);
            task.dispose();
        }
    }

    /**
     * Schedules a suspicion timeout task for the record's member unless one is already
     * registered for that member id.
     *
     * <p>The delay comes from {@code ClusterMath.suspicionTimeout}, fed with the configured
     * suspicion multiplier, the current membership table size and the failure detector ping
     * interval.
     *
     * @param membershipRecord record of the member to watch
     */
    private void scheduleSuspicionTimeoutTask(MembershipRecord membershipRecord) {
        final long timeoutMillis = ClusterMath.suspicionTimeout(
                this.membershipConfig.suspicionMult(),
                this.membershipTable.size(),
                this.failureDetectorConfig.pingInterval());
        final String memberId = membershipRecord.member().id();

        this.suspicionTimeoutTasks.computeIfAbsent(memberId, id -> {
            LOGGER.debug("[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout: {}", this.localMember, id, timeoutMillis);
            return this.scheduler.schedule(() -> onSuspicionTimeout(id), timeoutMillis, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Fired when a suspicion timeout elapses: unregisters the task and, if the member still has
     * a record, applies a DEAD record with the same incarnation through {@code updateMembership}.
     *
     * @param str member id
     */
    private void onSuspicionTimeout(String str) {
        this.suspicionTimeoutTasks.remove(str);

        final MembershipRecord current = this.membershipTable.get(str);
        if (current == null) {
            return;
        }

        LOGGER.debug("[{}] Declare SUSPECTED member {} as DEAD by timeout", this.localMember, current);
        final MembershipRecord dead = new MembershipRecord(current.member(), MemberStatus.DEAD, current.incarnation());
        updateMembership(dead, MembershipUpdateReason.SUSPICION_TIMEOUT)
                .subscribe(null, ex -> LOGGER.error("[{}][onSuspicionTimeout][updateMembership][error] cause:", this.localMember, ex));
    }

    /**
     * Spreads the record via gossip unless the update was triggered by a membership gossip or
     * by the initial sync.
     *
     * @param membershipRecord record to spread
     * @param membershipUpdateReason what triggered the update
     */
    private void spreadMembershipGossipUnlessGossiped(MembershipRecord membershipRecord, MembershipUpdateReason membershipUpdateReason) {
        final boolean skipSpreading = membershipUpdateReason == MembershipUpdateReason.MEMBERSHIP_GOSSIP
                || membershipUpdateReason == MembershipUpdateReason.INITIAL_SYNC;
        if (!skipSpreading) {
            // Errors are intentionally ignored here; spreadMembershipGossip already logs them.
            spreadMembershipGossip(membershipRecord).subscribe(null, ignored -> { });
        }
    }

    /**
     * Wraps the record in a {@link #MEMBERSHIP_GOSSIP} message from the local member and hands
     * it to the gossip protocol. Failures are logged at debug level and propagated.
     *
     * @param membershipRecord record to spread
     * @return a Mono completing when the gossip protocol reports the spread as done
     */
    private Mono<Void> spreadMembershipGossip(MembershipRecord membershipRecord) {
        return Mono.defer(() -> {
            final Message gossip = Message.builder()
                    .sender(this.localMember)
                    .data(membershipRecord)
                    .qualifier(MEMBERSHIP_GOSSIP)
                    .build();
            return this.gossipProtocol.spread(gossip)
                    .doOnSubscribe(subscription -> LOGGER.debug("[{}] Send membership with gossip", this.localMember))
                    .doOnError(ex -> LOGGER.debug("[{}] Failed to send membership with gossip, cause: {}", this.localMember, ex.toString()))
                    .then();
        });
    }

    /**
     * Sends the message to the first address in the list and, on failure, retries with the next
     * address, trying each address at most once.
     *
     * @param transport transport used for sending
     * @param list candidate addresses, tried in order; callers pass a non-empty list
     * @param message message to send
     * @return a Mono completing on the first successful send, or failing with the error of the
     *     last attempt
     */
    private static Mono<Void> send(Transport transport, List<Address> list, Message message) {
        // Index of the address to use for the current attempt; advanced on every failure.
        final AtomicInteger cursor = new AtomicInteger();
        final Mono<Void> attempt = Mono.defer(() -> transport.send(list.get(cursor.get()), message));
        return attempt
                .doOnError(ex -> cursor.incrementAndGet())
                .retry(list.size() - 1);
    }
}
