package io.scalecube.cluster.membership;

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.transport.Address;
import io.scalecube.transport.Message;
import io.scalecube.transport.Transport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl.class */
public final class MembershipProtocolImpl implements MembershipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);
    public static final String SYNC = "sc/membership/sync";
    public static final String SYNC_ACK = "sc/membership/syncAck";
    public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";
    private final AtomicReference<Member> memberRef;
    private final Transport transport;
    private final MembershipConfig config;
    private final List<Address> seedMembers;
    private FailureDetector failureDetector;
    private GossipProtocol gossipProtocol;
    private final Scheduler scheduler;
    private Disposable syncTask;
    private final Map<String, MembershipRecord> membershipTable = new HashMap();
    private final FluxProcessor<MembershipEvent, MembershipEvent> subject = DirectProcessor.create().serialize();
    private final FluxSink<MembershipEvent> sink = this.subject.sink();
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl$MembershipUpdateReason.class */
    public enum MembershipUpdateReason {
        FAILURE_DETECTOR_EVENT,
        MEMBERSHIP_GOSSIP,
        SYNC,
        INITIAL_SYNC,
        SUSPICION_TIMEOUT
    }

    public MembershipProtocolImpl(Transport transport, MembershipConfig membershipConfig) {
        this.transport = transport;
        this.config = membershipConfig;
        Address memberAddress = memberAddress(transport, membershipConfig);
        this.memberRef = new AtomicReference<>(new Member(IdGenerator.generateId(), memberAddress, membershipConfig.getMetadata()));
        this.scheduler = Schedulers.newSingle("sc-membership-" + Integer.toString(memberAddress.port()), true);
        this.seedMembers = cleanUpSeedMembers(membershipConfig.getSeedMembers());
    }

    protected static Address memberAddress(Transport transport, MembershipConfig membershipConfig) {
        Address address = transport.address();
        if (membershipConfig.getMemberHost() != null) {
            address = Address.create(membershipConfig.getMemberHost(), membershipConfig.getMemberPort() != null ? membershipConfig.getMemberPort().intValue() : address.port());
        }
        return address;
    }

    private List<Address> cleanUpSeedMembers(Collection<Address> collection) {
        HashSet hashSet = new HashSet(collection);
        hashSet.remove(member().address());
        return Collections.unmodifiableList(new ArrayList(hashSet));
    }

    public void setFailureDetector(FailureDetector failureDetector) {
        this.failureDetector = failureDetector;
    }

    public void setGossipProtocol(GossipProtocol gossipProtocol) {
        this.gossipProtocol = gossipProtocol;
    }

    FailureDetector getFailureDetector() {
        return this.failureDetector;
    }

    GossipProtocol getGossipProtocol() {
        return this.gossipProtocol;
    }

    Transport getTransport() {
        return this.transport;
    }

    List<MembershipRecord> getMembershipRecords() {
        return Collections.unmodifiableList(new ArrayList(this.membershipTable.values()));
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Flux<MembershipEvent> listen() {
        return Flux.from(this.subject);
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Member member() {
        return this.memberRef.get();
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public void updateMetadata(Map<String, String> map) {
        this.scheduler.schedule(() -> {
            onUpdateMetadata(map);
        });
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public void updateMetadataProperty(String str, String str2) {
        this.scheduler.schedule(() -> {
            onUpdateMetadataProperty(str, str2);
        });
    }

    public CompletableFuture<String> leave() {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        this.scheduler.schedule(() -> {
            onLeave().whenComplete((str, th) -> {
                completableFuture.complete(str);
            });
        });
        return completableFuture;
    }

    public CompletableFuture<Void> start() {
        Member member = this.memberRef.get();
        this.membershipTable.put(member.id(), new MembershipRecord(member, MemberStatus.ALIVE, 0));
        this.actionsDisposables.addAll(Arrays.asList(this.transport.listen().publishOn(this.scheduler).filter(message -> {
            return SYNC.equals(message.qualifier());
        }).filter(this::checkSyncGroup).subscribe(this::onSync, this::onError), this.transport.listen().publishOn(this.scheduler).filter(message2 -> {
            return SYNC_ACK.equals(message2.qualifier());
        }).filter(message3 -> {
            return message3.correlationId() == null;
        }).filter(this::checkSyncGroup).subscribe(this::onSyncAck, this::onError), this.failureDetector.listen().publishOn(this.scheduler).subscribe(this::onFailureDetectorEvent, this::onError), this.gossipProtocol.listen().publishOn(this.scheduler).filter(message4 -> {
            return MEMBERSHIP_GOSSIP.equals(message4.qualifier());
        }).subscribe(this::onMembershipGossip, this::onError)));
        return doInitialSync();
    }

    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    public void stop() {
        this.actionsDisposables.dispose();
        if (this.syncTask != null && !this.syncTask.isDisposed()) {
            this.syncTask.dispose();
        }
        Iterator<String> it = this.suspicionTimeoutTasks.keySet().iterator();
        while (it.hasNext()) {
            Disposable disposable = this.suspicionTimeoutTasks.get(it.next());
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }
        this.suspicionTimeoutTasks.clear();
        this.scheduler.dispose();
        this.sink.complete();
    }

    private CompletableFuture<Void> doInitialSync() {
        LOGGER.debug("Making initial Sync to all seed members: {}", this.seedMembers);
        if (this.seedMembers.isEmpty()) {
            schedulePeriodicSync();
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String id = this.memberRef.get().id();
        this.transport.listen().publishOn(this.scheduler).filter(message -> {
            return SYNC_ACK.equals(message.qualifier());
        }).filter(message2 -> {
            return id.equals(message2.correlationId());
        }).filter(this::checkSyncGroup).take(1L).timeout(Duration.ofMillis(this.config.getSyncTimeout()), this.scheduler).subscribe(message3 -> {
            SyncData syncData = (SyncData) message3.data();
            LOGGER.info("Joined cluster '{}': {}", syncData.getSyncGroup(), syncData.getMembership());
            onSyncAck(message3, true);
            schedulePeriodicSync();
            completableFuture.complete(null);
        }, th -> {
            LOGGER.info("Timeout getting initial SyncAck from seed members: {}", this.seedMembers);
            schedulePeriodicSync();
            completableFuture.complete(null);
        });
        Message prepareSyncDataMsg = prepareSyncDataMsg(SYNC, id);
        this.seedMembers.forEach(address -> {
            this.transport.send(address, prepareSyncDataMsg);
        });
        return completableFuture;
    }

    private void doSync() {
        try {
            Address selectSyncAddress = selectSyncAddress();
            if (selectSyncAddress == null) {
                return;
            }
            Message prepareSyncDataMsg = prepareSyncDataMsg(SYNC, null);
            this.transport.send(selectSyncAddress, prepareSyncDataMsg);
            LOGGER.debug("Send Sync to {}: {}", selectSyncAddress, prepareSyncDataMsg);
        } catch (Exception e) {
            LOGGER.error("Unhandled exception: {}", e, e);
        }
    }

    private void onUpdateMetadataProperty(String str, String str2) {
        HashMap hashMap = new HashMap(this.memberRef.get().metadata());
        hashMap.put(str, str2);
        onUpdateMetadata(hashMap);
    }

    private void onUpdateMetadata(Map<String, String> map) {
        Member member = this.memberRef.get();
        String id = member.id();
        Member member2 = new Member(id, member.address(), map);
        this.memberRef.set(member2);
        MembershipRecord membershipRecord = new MembershipRecord(member2, MemberStatus.ALIVE, this.membershipTable.get(id).incarnation() + 1);
        this.membershipTable.put(id, membershipRecord);
        this.sink.next(MembershipEvent.createUpdated(member, member2));
        spreadMembershipGossip(membershipRecord);
    }

    private void onSyncAck(Message message) {
        onSyncAck(message, false);
    }

    private void onSyncAck(Message message, boolean z) {
        LOGGER.debug("Received SyncAck: {}", message);
        syncMembership((SyncData) message.data(), z);
    }

    private void onSync(Message message) {
        LOGGER.debug("Received Sync: {}", message);
        syncMembership((SyncData) message.data(), false);
        this.transport.send(message.sender(), prepareSyncDataMsg(SYNC_ACK, message.correlationId()));
    }

    private void onFailureDetectorEvent(FailureDetectorEvent failureDetectorEvent) {
        MembershipRecord membershipRecord = this.membershipTable.get(failureDetectorEvent.member().id());
        if (membershipRecord == null || membershipRecord.status() == failureDetectorEvent.status()) {
            return;
        }
        LOGGER.debug("Received status change on failure detector event: {}", failureDetectorEvent);
        if (failureDetectorEvent.status() != MemberStatus.ALIVE) {
            updateMembership(new MembershipRecord(membershipRecord.member(), failureDetectorEvent.status(), membershipRecord.incarnation()), MembershipUpdateReason.FAILURE_DETECTOR_EVENT);
        } else {
            this.transport.send(failureDetectorEvent.member().address(), prepareSyncDataMsg(SYNC, null));
        }
    }

    private void onMembershipGossip(Message message) {
        MembershipRecord membershipRecord = (MembershipRecord) message.data();
        LOGGER.debug("Received membership gossip: {}", membershipRecord);
        updateMembership(membershipRecord, MembershipUpdateReason.MEMBERSHIP_GOSSIP);
    }

    private Address selectSyncAddress() {
        if (this.seedMembers.isEmpty()) {
            return null;
        }
        return this.seedMembers.get(ThreadLocalRandom.current().nextInt(this.seedMembers.size()));
    }

    private boolean checkSyncGroup(Message message) {
        return this.config.getSyncGroup().equals(((SyncData) message.data()).getSyncGroup());
    }

    private void schedulePeriodicSync() {
        int syncInterval = this.config.getSyncInterval();
        this.syncTask = this.scheduler.schedulePeriodically(this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
    }

    private Message prepareSyncDataMsg(String str, String str2) {
        return Message.withData(new SyncData(new ArrayList(this.membershipTable.values()), this.config.getSyncGroup())).qualifier(str).correlationId(str2).build();
    }

    private void syncMembership(SyncData syncData, boolean z) {
        for (MembershipRecord membershipRecord : syncData.getMembership()) {
            if (!membershipRecord.equals(this.membershipTable.get(membershipRecord.id()))) {
                updateMembership(membershipRecord, z ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC);
            }
        }
    }

    private void updateMembership(MembershipRecord membershipRecord, MembershipUpdateReason membershipUpdateReason) {
        Objects.requireNonNull(membershipRecord, "Membership record can't be null");
        MembershipRecord membershipRecord2 = this.membershipTable.get(membershipRecord.id());
        if (membershipRecord.isOverrides(membershipRecord2)) {
            Member member = this.memberRef.get();
            if (membershipRecord.member().id().equals(member.id())) {
                MembershipRecord membershipRecord3 = new MembershipRecord(member, membershipRecord2.status(), Math.max(membershipRecord2.incarnation(), membershipRecord.incarnation()) + 1);
                this.membershipTable.put(member.id(), membershipRecord3);
                LOGGER.debug("Local membership record r0={}, but received r1={}, spread r2={}", new Object[]{membershipRecord2, membershipRecord, membershipRecord3});
                spreadMembershipGossip(membershipRecord3);
                return;
            }
            if (membershipRecord.isDead()) {
                this.membershipTable.remove(membershipRecord.id());
            } else {
                this.membershipTable.put(membershipRecord.id(), membershipRecord);
            }
            if (membershipRecord.isSuspect()) {
                scheduleSuspicionTimeoutTask(membershipRecord);
            } else {
                cancelSuspicionTimeoutTask(membershipRecord.id());
            }
            if (membershipRecord.isDead()) {
                this.sink.next(MembershipEvent.createRemoved(membershipRecord.member()));
            } else if (membershipRecord2 == null && membershipRecord.isAlive()) {
                this.sink.next(MembershipEvent.createAdded(membershipRecord.member()));
            } else if (membershipRecord2 != null && !membershipRecord2.member().equals(membershipRecord.member())) {
                this.sink.next(MembershipEvent.createUpdated(membershipRecord2.member(), membershipRecord.member()));
            }
            if (membershipUpdateReason == MembershipUpdateReason.MEMBERSHIP_GOSSIP || membershipUpdateReason == MembershipUpdateReason.INITIAL_SYNC) {
                return;
            }
            spreadMembershipGossip(membershipRecord);
        }
    }

    private void cancelSuspicionTimeoutTask(String str) {
        Disposable remove = this.suspicionTimeoutTasks.remove(str);
        if (remove == null || remove.isDisposed()) {
            return;
        }
        remove.dispose();
    }

    private void scheduleSuspicionTimeoutTask(MembershipRecord membershipRecord) {
        long suspicionTimeout = ClusterMath.suspicionTimeout(this.config.getSuspicionMult(), this.membershipTable.size(), this.config.getPingInterval());
        this.suspicionTimeoutTasks.computeIfAbsent(membershipRecord.id(), str -> {
            return this.scheduler.schedule(() -> {
                onSuspicionTimeout(str);
            }, suspicionTimeout, TimeUnit.MILLISECONDS);
        });
    }

    private void onSuspicionTimeout(String str) {
        this.suspicionTimeoutTasks.remove(str);
        MembershipRecord membershipRecord = this.membershipTable.get(str);
        if (membershipRecord != null) {
            LOGGER.debug("Declare SUSPECTED member as DEAD by timeout: {}", membershipRecord);
            updateMembership(new MembershipRecord(membershipRecord.member(), MemberStatus.DEAD, membershipRecord.incarnation()), MembershipUpdateReason.SUSPICION_TIMEOUT);
        }
    }

    private CompletableFuture<String> onLeave() {
        String id = this.memberRef.get().id();
        MembershipRecord membershipRecord = new MembershipRecord(member(), MemberStatus.DEAD, this.membershipTable.get(id).incarnation() + 1);
        this.membershipTable.put(id, membershipRecord);
        return spreadMembershipGossip(membershipRecord);
    }

    private CompletableFuture<String> spreadMembershipGossip(MembershipRecord membershipRecord) {
        return this.gossipProtocol.spread(Message.withData(membershipRecord).qualifier(MEMBERSHIP_GOSSIP).build());
    }
}
