package io.scalecube.cluster.gossip;

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/scalecube/cluster/gossip/GossipProtocolImpl.class */
public final class GossipProtocolImpl implements GossipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class);
    public static final String GOSSIP_REQ = "sc/gossip/req";
    private final Member localMember;
    private final Transport transport;
    private final GossipConfig config;
    private long currentPeriod = 0;
    private long gossipCounter = 0;
    private final Map<String, SequenceIdCollector> sequenceIdCollectors = new HashMap();
    private final Map<String, GossipState> gossips = new HashMap();
    private final Map<String, MonoSink<String>> futures = new HashMap();
    private final List<Member> remoteMembers = new ArrayList();
    private int remoteMembersIndex = -1;
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private final Sinks.Many<Message> sink = Sinks.many().multicast().directBestEffort();
    private final Scheduler scheduler;

    public GossipProtocolImpl(Member member, Transport transport, Flux<MembershipEvent> flux, GossipConfig gossipConfig, Scheduler scheduler) {
        this.transport = (Transport) Objects.requireNonNull(transport);
        this.config = (GossipConfig) Objects.requireNonNull(gossipConfig);
        this.localMember = (Member) Objects.requireNonNull(member);
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler);
        this.actionsDisposables.addAll(Arrays.asList(flux.publishOn(scheduler).subscribe(this::onMembershipEvent, th -> {
            LOGGER.error("[{}][onMembershipEvent][error] cause:", member, th);
        }), transport.listen().publishOn(scheduler).filter(this::isGossipRequest).subscribe(this::onGossipRequest, th2 -> {
            LOGGER.error("[{}][onGossipRequest][error] cause:", member, th2);
        })));
    }

    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public void start() {
        this.actionsDisposables.add(this.scheduler.schedulePeriodically(this::doSpreadGossip, this.config.gossipInterval(), this.config.gossipInterval(), TimeUnit.MILLISECONDS));
    }

    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public void stop() {
        this.actionsDisposables.dispose();
        this.sink.emitComplete(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public Mono<String> spread(Message message) {
        return Mono.just(message).subscribeOn(this.scheduler).flatMap(message2 -> {
            return Mono.create(monoSink -> {
                this.futures.put(createAndPutGossip(message2), monoSink);
            });
        });
    }

    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public Flux<Message> listen() {
        return this.sink.asFlux().onBackpressureBuffer();
    }

    /*  JADX ERROR: Failed to decode insn: 0x0005: MOVE_MULTI, method: io.scalecube.cluster.gossip.GossipProtocolImpl.doSpreadGossip():void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    private void doSpreadGossip() {
        /*
            Method dump skipped, instructions count: 288
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.scalecube.cluster.gossip.GossipProtocolImpl.doSpreadGossip():void");
    }

    private String createAndPutGossip(Message message) {
        long j = this.currentPeriod;
        Gossip createGossip = createGossip(message);
        this.gossips.put(createGossip.gossipId(), new GossipState(createGossip, j));
        ensureSequence(this.localMember.id()).add(createGossip.sequenceId());
        return createGossip.gossipId();
    }

    private void onGossipRequest(Message message) {
        long j = this.currentPeriod;
        GossipRequest gossipRequest = (GossipRequest) message.data();
        for (Gossip gossip : gossipRequest.gossips()) {
            GossipState gossipState = this.gossips.get(gossip.gossipId());
            if (ensureSequence(gossip.gossiperId()).add(gossip.sequenceId()) && gossipState == null) {
                gossipState = new GossipState(gossip, j);
                this.gossips.put(gossip.gossipId(), gossipState);
                this.sink.emitNext(gossip.message(), RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
            }
            if (gossipState != null) {
                gossipState.addToInfected(gossipRequest.from());
            }
        }
    }

    private void checkGossipSegmentation() {
        int gossipSegmentationThreshold = this.config.gossipSegmentationThreshold();
        for (Map.Entry<String, SequenceIdCollector> entry : this.sequenceIdCollectors.entrySet()) {
            SequenceIdCollector value = entry.getValue();
            if (value.size() > gossipSegmentationThreshold) {
                LOGGER.warn("[{}][{}] Too many missed gossip messages from original gossiper: '{}', current node({}) was SUSPECTED much for a long time or connection problem", new Object[]{this.localMember, Long.valueOf(this.currentPeriod), entry.getKey(), this.localMember});
                value.clear();
            }
        }
    }

    private void onMembershipEvent(MembershipEvent membershipEvent) {
        Member member = membershipEvent.member();
        if (membershipEvent.isRemoved()) {
            boolean remove = this.remoteMembers.remove(member);
            this.sequenceIdCollectors.remove(member.id());
            if (remove) {
                LOGGER.debug("[{}][{}] Removed {} from remoteMembers list (size={})", new Object[]{this.localMember, Long.valueOf(this.currentPeriod), member, Integer.valueOf(this.remoteMembers.size())});
            }
        }
        if (membershipEvent.isAdded()) {
            this.remoteMembers.add(member);
            LOGGER.debug("[{}][{}] Added {} to remoteMembers list (size={})", new Object[]{this.localMember, Long.valueOf(this.currentPeriod), member, Integer.valueOf(this.remoteMembers.size())});
        }
    }

    private boolean isGossipRequest(Message message) {
        return GOSSIP_REQ.equals(message.qualifier());
    }

    private Gossip createGossip(Message message) {
        String id = this.localMember.id();
        long j = this.gossipCounter;
        this.gossipCounter = j + 1;
        return new Gossip(id, message, j);
    }

    private SequenceIdCollector ensureSequence(String str) {
        return this.sequenceIdCollectors.computeIfAbsent(str, str2 -> {
            return new SequenceIdCollector();
        });
    }

    private void spreadGossipsTo(long j, Member member) {
        List<Gossip> selectGossipsToSend = selectGossipsToSend(j, member);
        if (selectGossipsToSend.isEmpty()) {
            return;
        }
        List addresses = member.addresses();
        selectGossipsToSend.stream().map(this::buildGossipRequestMessage).forEach(message -> {
            TransportWrapper.send(this.transport, addresses, message).subscribe((Consumer) null, th -> {
                LOGGER.debug("[{}][{}] Failed to send GossipReq({}) to {}, cause: {}", new Object[]{this.localMember, Long.valueOf(j), message, addresses, th.toString()});
            });
        });
    }

    private List<Gossip> selectGossipsToSend(long j, Member member) {
        int gossipPeriodsToSpread = ClusterMath.gossipPeriodsToSpread(this.config.gossipRepeatMult(), this.remoteMembers.size() + 1);
        return (List) this.gossips.values().stream().filter(gossipState -> {
            return gossipState.infectionPeriod() + ((long) gossipPeriodsToSpread) >= j;
        }).filter(gossipState2 -> {
            return !gossipState2.isInfected(member.id());
        }).map((v0) -> {
            return v0.gossip();
        }).collect(Collectors.toList());
    }

    private List<Member> selectGossipMembers() {
        int gossipFanout = this.config.gossipFanout();
        if (this.remoteMembers.size() < gossipFanout) {
            return this.remoteMembers;
        }
        if (this.remoteMembersIndex < 0 || this.remoteMembersIndex + gossipFanout > this.remoteMembers.size()) {
            Collections.shuffle(this.remoteMembers);
            this.remoteMembersIndex = 0;
        }
        List<Member> singletonList = gossipFanout == 1 ? Collections.singletonList(this.remoteMembers.get(this.remoteMembersIndex)) : this.remoteMembers.subList(this.remoteMembersIndex, this.remoteMembersIndex + gossipFanout);
        this.remoteMembersIndex += gossipFanout;
        return singletonList;
    }

    private Message buildGossipRequestMessage(Gossip gossip) {
        return Message.withData(new GossipRequest(gossip, this.localMember.id())).qualifier(GOSSIP_REQ).build();
    }

    private Set<String> getGossipsToRemove(long j) {
        int gossipPeriodsToSweep = ClusterMath.gossipPeriodsToSweep(this.config.gossipRepeatMult(), this.remoteMembers.size() + 1);
        return (Set) this.gossips.values().stream().filter(gossipState -> {
            return j > gossipState.infectionPeriod() + ((long) gossipPeriodsToSweep);
        }).map(gossipState2 -> {
            return gossipState2.gossip().gossipId();
        }).collect(Collectors.toSet());
    }

    private Set<String> getGossipsThatMostLikelyDisseminated(long j) {
        int gossipPeriodsToSpread = ClusterMath.gossipPeriodsToSpread(this.config.gossipRepeatMult(), this.remoteMembers.size() + 1);
        return (Set) this.gossips.values().stream().filter(gossipState -> {
            return j > gossipState.infectionPeriod() + ((long) gossipPeriodsToSpread);
        }).map(gossipState2 -> {
            return gossipState2.gossip().gossipId();
        }).collect(Collectors.toSet());
    }

    Transport getTransport() {
        return this.transport;
    }

    Member getMember() {
        return this.localMember;
    }
}
