package io.zeebe.broker.clustering.base.topology;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.Member;
import io.atomix.core.Atomix;
import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.base.partitions.RaftState;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.protocol.impl.encoding.BrokerInfo;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.LogUtil;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import java.util.ArrayList;
import java.util.List;
import org.agrona.collections.IntHashSet;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/base/topology/TopologyManagerImpl.class */
/**
 * Actor that keeps the local {@link Topology} up to date from Atomix cluster membership events
 * and publishes the local node's {@link BrokerInfo} into the properties of the local Atomix member.
 *
 * <p>Topology mutations and listener notifications triggered by membership events, role updates
 * and listener (de)registration are submitted to this actor through {@code actor.call} /
 * {@code actor.run} rather than being executed directly on the calling thread.
 */
public class TopologyManagerImpl extends Actor implements TopologyManager, ClusterMembershipEventListener {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;

    /** Local view of the cluster: members and the partitions they lead or follow. */
    private final Topology topology;
    private final Atomix atomix;
    /** Reusable broker info describing the local node; rewritten on every publish. */
    private final BrokerInfo distributionInfo;
    private final List<TopologyMemberListener> topologyMemberListeners = new ArrayList<>();
    private final List<TopologyPartitionListener> topologyPartitionListeners = new ArrayList<>();

    /**
     * Creates the manager, builds the initial topology from the static cluster configuration and
     * immediately publishes the local node's broker info into the local Atomix member properties.
     *
     * @param atomix the Atomix instance whose membership service is observed and written to
     * @param nodeInfo description of the local node (node id and command API address)
     * @param clusterCfg static cluster configuration (cluster size, partition count, replication factor)
     */
    public TopologyManagerImpl(Atomix atomix, NodeInfo nodeInfo, ClusterCfg clusterCfg) {
        this.atomix = atomix;
        this.topology =
            new Topology(
                nodeInfo,
                clusterCfg.getClusterSize(),
                clusterCfg.getPartitionsCount(),
                clusterCfg.getReplicationFactor());
        this.distributionInfo =
            new BrokerInfo()
                .setNodeId(nodeInfo.getNodeId())
                .setPartitionsCount(this.topology.getPartitionsCount())
                .setClusterSize(this.topology.getClusterSize())
                .setReplicationFactor(this.topology.getReplicationFactor());
        this.distributionInfo.setCommandApiAddress(nodeInfo.getCommandApiAddress().toString());
        publishTopologyChanges();
    }

    /**
     * Registers this manager as membership listener and replays every currently known member as a
     * {@code MEMBER_ADDED} event, so members that joined before the listener was registered are
     * processed as well.
     */
    protected void onActorStarted() {
        this.atomix.getMembershipService().addListener(this);
        this.atomix
            .getMembershipService()
            .getMembers()
            .forEach(member -> event(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, member)));
    }

    /** @return the fixed actor name {@code "topology"} */
    public String getName() {
        return "topology";
    }

    /**
     * Records the given raft state of the local node for a partition and republishes the local
     * broker info. The work is submitted to this actor.
     *
     * @param raftState the new state of the local node for the partition
     * @param partitionId id of the affected partition
     */
    public void updateRole(RaftState raftState, int partitionId) {
        this.actor.call(() -> {
            updatePartition(partitionId, this.topology.getLocal(), raftState);
            publishTopologyChanges();
        });
    }

    /**
     * Updates the topology entry of a partition for the given node and notifies all registered
     * partition listeners.
     *
     * @param partitionId id of the affected partition
     * @param nodeInfo the node whose state for the partition changed
     * @param raftState the node's new state for the partition
     */
    public void updatePartition(int partitionId, NodeInfo nodeInfo, RaftState raftState) {
        this.topology.updatePartition(partitionId, nodeInfo, raftState);
        notifyPartitionUpdated(partitionId, nodeInfo);
    }

    /**
     * Handles an Atomix membership event. Events without readable/valid broker info and events
     * about the local node are ignored; everything else is dispatched on this actor by event type.
     *
     * @param clusterMembershipEvent the membership event to process
     */
    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        final BrokerInfo brokerInfo = readBrokerInfo(clusterMembershipEvent.subject());
        LOG.debug(
            "Member {} received event {} with {}",
            this.topology.getLocal().getNodeId(),
            clusterMembershipEvent,
            brokerInfo);

        final boolean unusable =
            brokerInfo == null || brokerInfo.getNodeId() == this.topology.getLocal().getNodeId();
        if (unusable) {
            return;
        }

        this.actor.call(() -> {
            switch (clusterMembershipEvent.type()) {
                case METADATA_CHANGED:
                    onMetadataChanged(brokerInfo);
                    break;
                case MEMBER_ADDED:
                    // a newly added member may already advertise partitions, so apply them right away
                    onMemberAdded(brokerInfo);
                    onMetadataChanged(brokerInfo);
                    break;
                case MEMBER_REMOVED:
                    onMemberRemoved(brokerInfo);
                    break;
                default:
                    // other event types do not affect the topology
                    break;
            }
        });
    }

    /** Removes the node from the topology, if known, and notifies member listeners. */
    private void onMemberRemoved(BrokerInfo brokerInfo) {
        final NodeInfo removed = this.topology.getMember(brokerInfo.getNodeId());
        if (removed == null) {
            return;
        }
        this.topology.removeMember(removed);
        notifyMemberRemoved(removed);
    }

    /**
     * Adds the node to the topology and notifies member listeners when {@code addMember} reports
     * {@code true}. Broker info without a command API address is ignored with a warning.
     */
    private void onMemberAdded(BrokerInfo brokerInfo) {
        final String commandApiAddress = brokerInfo.getCommandApiAddress();
        if (commandApiAddress == null) {
            LOG.warn(
                "Ignoring broker info from node id {} as no command API address is present",
                brokerInfo.getNodeId());
            return;
        }

        final NodeInfo added = new NodeInfo(brokerInfo.getNodeId(), SocketAddress.from(commandApiAddress));
        if (this.topology.addMember(added)) {
            notifyMemberAdded(added);
        }
    }

    /**
     * Applies the leader and follower partitions advertised by a remote node to the topology and
     * notifies partition listeners for each of them.
     *
     * <p>If the node is not (yet) a member of the topology - e.g. because it was skipped in
     * {@link #onMemberAdded(BrokerInfo)} for lacking a command API address - the metadata is
     * ignored instead of passing a {@code null} member on to the topology and the listeners.
     */
    private void onMetadataChanged(BrokerInfo brokerInfo) {
        final NodeInfo member = this.topology.getMember(brokerInfo.getNodeId());
        if (member == null) {
            LOG.debug(
                "Ignoring partition metadata of node id {} as it is not a known member of the topology",
                brokerInfo.getNodeId());
            return;
        }

        brokerInfo.consumePartitions(
            leaderPartitionId -> updatePartition(leaderPartitionId, member, RaftState.LEADER),
            followerPartitionId -> updatePartition(followerPartitionId, member, RaftState.FOLLOWER));
    }

    /**
     * Reads the broker info stored in the member's properties.
     *
     * @return the broker info, or {@code null} if none could be read or if its static
     *     configuration does not match the local one (the mismatch is logged as error)
     */
    private BrokerInfo readBrokerInfo(Member member) {
        final BrokerInfo brokerInfo = BrokerInfo.fromProperties(member.properties());
        if (brokerInfo != null && !isStaticConfigValid(brokerInfo)) {
            LOG.error(
                "Static configuration of node {} differs from local node {}",
                member.id(),
                this.atomix.getMembershipService().getLocalMember().id());
            return null;
        }
        return brokerInfo;
    }

    /**
     * Checks that the remote node id lies within {@code [0, clusterSize)} and that cluster size,
     * partition count and replication factor equal the local topology's values.
     */
    private boolean isStaticConfigValid(BrokerInfo brokerInfo) {
        final int nodeId = brokerInfo.getNodeId();
        final boolean nodeIdInRange = nodeId >= 0 && nodeId < this.topology.getClusterSize();
        return nodeIdInRange
            && this.topology.getClusterSize() == brokerInfo.getClusterSize()
            && this.topology.getPartitionsCount() == brokerInfo.getPartitionsCount()
            && this.topology.getReplicationFactor() == brokerInfo.getReplicationFactor();
    }

    /** Writes the current local broker info into the properties of the local Atomix member. */
    private void publishTopologyChanges() {
        createLocalNodeBrokerInfo()
            .writeIntoProperties(this.atomix.getMembershipService().getLocalMember().properties());
    }

    /**
     * Rebuilds the partition section of the shared {@link #distributionInfo} from the local node's
     * current leader and follower partitions.
     *
     * @return the updated, shared {@link #distributionInfo} instance (not a copy)
     */
    private BrokerInfo createLocalNodeBrokerInfo() {
        final NodeInfo local = this.topology.getLocal();
        this.distributionInfo.clearPartitions();
        for (final int partitionId : local.getLeaders()) {
            this.distributionInfo.setLeaderForPartition(partitionId);
        }
        for (final int partitionId : local.getFollowers()) {
            this.distributionInfo.setFollowerForPartition(partitionId);
        }
        return this.distributionInfo;
    }

    /**
     * Requests this actor to close.
     *
     * <p>NOTE(review): the membership listener registered in {@link #onActorStarted()} is not
     * removed anywhere in this class - confirm whether it should be deregistered on close.
     *
     * @return the future returned by {@code actor.close()}
     */
    public ActorFuture<Void> close() {
        return this.actor.close();
    }

    /**
     * Registers the listener and immediately replays all currently known members to it. Exceptions
     * thrown by the listener are handed to {@link LogUtil#catchAndLog}.
     */
    @Override // io.zeebe.broker.clustering.base.topology.TopologyManager
    public void addTopologyMemberListener(TopologyMemberListener topologyMemberListener) {
        this.actor.run(() -> {
            this.topologyMemberListeners.add(topologyMemberListener);
            this.topology
                .getMembers()
                .forEach(knownMember ->
                    LogUtil.catchAndLog(LOG, () -> {
                        topologyMemberListener.onMemberAdded(knownMember, this.topology);
                    }));
        });
    }

    /** Deregisters the given member listener. */
    @Override // io.zeebe.broker.clustering.base.topology.TopologyManager
    public void removeTopologyMemberListener(TopologyMemberListener topologyMemberListener) {
        this.actor.run(() -> {
            this.topologyMemberListeners.remove(topologyMemberListener);
        });
    }

    /**
     * Registers the listener and immediately replays the current state of every known partition
     * to it: first the leader (if any), then each follower.
     */
    @Override // io.zeebe.broker.clustering.base.topology.TopologyManager
    public void addTopologyPartitionListener(TopologyPartitionListener topologyPartitionListener) {
        this.actor.run(() -> {
            this.topologyPartitionListeners.add(topologyPartitionListener);
            this.topology
                .getPartitions()
                .forEach(partitionId ->
                    LogUtil.catchAndLog(LOG, () -> {
                        final NodeInfo leader = this.topology.getLeader(partitionId);
                        if (leader != null) {
                            topologyPartitionListener.onPartitionUpdated(partitionId, leader);
                        }

                        final List<NodeInfo> followers = this.topology.getFollowers(partitionId);
                        if (followers != null) {
                            followers.forEach(follower ->
                                topologyPartitionListener.onPartitionUpdated(partitionId, follower));
                        }
                    }));
        });
    }

    /** Deregisters the given partition listener. */
    @Override // io.zeebe.broker.clustering.base.topology.TopologyManager
    public void removeTopologyPartitionListener(TopologyPartitionListener topologyPartitionListener) {
        this.actor.run(() -> {
            this.topologyPartitionListeners.remove(topologyPartitionListener);
        });
    }

    /** Tells every member listener that a node was added; listener failures are caught and logged. */
    private void notifyMemberAdded(NodeInfo nodeInfo) {
        for (final TopologyMemberListener listener : this.topologyMemberListeners) {
            LogUtil.catchAndLog(LOG, () -> {
                listener.onMemberAdded(nodeInfo, this.topology);
            });
        }
    }

    /** Tells every member listener that a node was removed; listener failures are caught and logged. */
    private void notifyMemberRemoved(NodeInfo nodeInfo) {
        for (final TopologyMemberListener listener : this.topologyMemberListeners) {
            LogUtil.catchAndLog(LOG, () -> {
                listener.onMemberRemoved(nodeInfo, this.topology);
            });
        }
    }

    /** Tells every partition listener about a partition change; listener failures are caught and logged. */
    private void notifyPartitionUpdated(int partitionId, NodeInfo nodeInfo) {
        for (final TopologyPartitionListener listener : this.topologyPartitionListeners) {
            LogUtil.catchAndLog(LOG, () -> {
                listener.onPartitionUpdated(partitionId, nodeInfo);
            });
        }
    }
}
