package io.camunda.zeebe.broker.partitioning.topology;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.util.LogUtil;
import io.camunda.zeebe.util.health.HealthStatus;
import java.util.ArrayList;
import java.util.List;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/partitioning/topology/TopologyManagerImpl.class */
/**
 * Keeps a per-partition view of the current leader broker and publishes the local broker's own
 * partition roles and health into the local cluster member's properties.
 *
 * <p>Updates arrive from two directions: local role/health transitions (via the {@link
 * PartitionListener} callbacks and {@link #onHealthChanged}) and membership events of remote
 * members (via {@link #event}). All reads and writes of {@code partitionLeaders} and {@code
 * topologyPartitionListeners} are submitted through {@code this.actor}; they are plain,
 * unsynchronized collections.
 */
public final class TopologyManagerImpl extends Actor implements TopologyManager, ClusterMembershipEventListener, PartitionListener {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    private static final String ACTOR_NAME = "TopologyManager";

    private final ClusterMembershipService membershipService;
    private final BrokerInfo localBroker;
    /** Partition id to the broker currently recorded as leader of that partition. */
    private final Int2ObjectHashMap<BrokerInfo> partitionLeaders = new Int2ObjectHashMap<>();
    private final List<TopologyPartitionListener> topologyPartitionListeners = new ArrayList<>();

    /**
     * @param clusterMembershipService source of membership events and owner of the local member
     *     properties that topology changes are written into
     * @param brokerInfo the local broker's info; mutated in place on role and health changes
     */
    public TopologyManagerImpl(ClusterMembershipService clusterMembershipService, BrokerInfo brokerInfo) {
        this.membershipService = clusterMembershipService;
        this.localBroker = brokerInfo;
    }

    /** Delegates to {@link #setFollower(int)}; the term {@code j} is not used. */
    @Override // io.camunda.zeebe.broker.PartitionListener
    public ActorFuture<Void> onBecomingFollower(int i, long j) {
        return setFollower(i);
    }

    /**
     * Delegates to {@link #setLeader(long, int)} with the term {@code j} and partition id {@code
     * i}; the log stream and query service are not used.
     */
    @Override // io.camunda.zeebe.broker.PartitionListener
    public ActorFuture<Void> onBecomingLeader(int i, long j, LogStream logStream, QueryService queryService) {
        return setLeader(j, i);
    }

    /** Delegates to {@link #setInactive(int)}; the term {@code j} is not used. */
    @Override // io.camunda.zeebe.broker.PartitionListener
    public ActorFuture<Void> onBecomingInactive(int i, long j) {
        return setInactive(i);
    }

    /** @return the fixed actor name {@code "TopologyManager"} */
    @Override
    public String getName() {
        return ACTOR_NAME;
    }

    /**
     * Publishes the local broker's state first, then registers for membership events and replays
     * every currently known member as a {@code MEMBER_ADDED} event so their metadata is processed.
     */
    @Override
    protected void onActorStarted() {
        publishTopologyChanges();
        this.membershipService.addListener(this);
        this.membershipService.getMembers().forEach(
            member -> event(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, member)));
    }

    /**
     * Records the local broker as leader of a partition, publishes the change and notifies the
     * registered topology listeners.
     *
     * @param j the leader term
     * @param i the partition id
     * @return future of the task submitted to the actor
     */
    public ActorFuture<Void> setLeader(long j, int i) {
        return this.actor.call(() -> {
            this.partitionLeaders.put(i, this.localBroker);
            this.localBroker.setLeaderForPartition(i, j);
            publishTopologyChanges();
            notifyPartitionLeaderUpdated(i, this.localBroker);
        });
    }

    /**
     * Marks the local broker as follower of partition {@code i}, dropping it from the leader map
     * if it was recorded as leader, and publishes the change.
     *
     * @return future of the task submitted to the actor
     */
    public ActorFuture<Void> setFollower(int i) {
        return this.actor.call(() -> {
            removeIfLeader(this.localBroker, i);
            this.localBroker.setFollowerForPartition(i);
            publishTopologyChanges();
        });
    }

    /**
     * Marks the local broker as inactive for partition {@code i}, dropping it from the leader map
     * if it was recorded as leader, and publishes the change.
     *
     * @return future of the task submitted to the actor
     */
    public ActorFuture<Void> setInactive(int i) {
        return this.actor.call(() -> {
            removeIfLeader(this.localBroker, i);
            this.localBroker.setInactiveForPartition(i);
            publishTopologyChanges();
        });
    }

    /**
     * Handles a cluster membership event. Events whose subject carries no valid broker info, or
     * that originate from the local broker itself, are ignored. The broker info is read on the
     * calling thread; the actual handling is submitted to the actor.
     */
    @Override
    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        final Member eventSource = clusterMembershipEvent.subject();
        final BrokerInfo remoteBroker = readBrokerInfo(eventSource);
        if (remoteBroker == null || remoteBroker.getNodeId() == this.localBroker.getNodeId()) {
            return;
        }
        this.actor.run(() -> {
            switch (clusterMembershipEvent.type()) {
                case METADATA_CHANGED:
                case MEMBER_ADDED:
                    onMetadataChanged(remoteBroker);
                    break;
                case MEMBER_REMOVED:
                    onMemberRemoved(remoteBroker);
                    break;
                case REACHABILITY_CHANGED:
                default:
                    LOG.debug("Received {} from member {}, was not handled.", clusterMembershipEvent.type(), remoteBroker.getNodeId());
                    break;
            }
        });
    }

    /** Drops the removed broker from every partition for which it is the recorded leader. */
    private void onMemberRemoved(BrokerInfo brokerInfo) {
        LOG.debug("Received member removed {} ", brokerInfo);
        // Only the first (all-partitions) consumer does work; the role-specific ones are no-ops.
        brokerInfo.consumePartitions(
            partitionId -> removeIfLeader(brokerInfo, partitionId),
            (leaderPartitionId, term) -> {
            },
            followerPartitionId -> {
            },
            inactivePartitionId -> {
            });
    }

    /**
     * Removes the leader entry of {@code partitionId} only when the recorded leader has the same
     * node id as {@code brokerInfo}; entries owned by other brokers are left untouched.
     */
    private void removeIfLeader(BrokerInfo brokerInfo, int partitionId) {
        final BrokerInfo currentLeader = this.partitionLeaders.get(partitionId);
        if (currentLeader != null && currentLeader.getNodeId() == brokerInfo.getNodeId()) {
            this.partitionLeaders.remove(partitionId);
        }
    }

    /**
     * Applies a remote broker's advertised roles: partitions it leads may replace the recorded
     * leader (see {@link #updatePartitionLeader}), while partitions it follows or is inactive for
     * clear any stale leader entry pointing at it.
     */
    private void onMetadataChanged(BrokerInfo brokerInfo) {
        LOG.debug("Received metadata change for {}, partitions {} terms {}", brokerInfo.getNodeId(), brokerInfo.getPartitionRoles(), brokerInfo.getPartitionLeaderTerms());
        brokerInfo.consumePartitions(
            (leaderPartitionId, term) -> {
                if (updatePartitionLeader(brokerInfo, leaderPartitionId, term)) {
                    notifyPartitionLeaderUpdated(leaderPartitionId, brokerInfo);
                }
            },
            followerPartitionId -> removeIfLeader(brokerInfo, followerPartitionId),
            inactivePartitionId -> removeIfLeader(brokerInfo, inactivePartitionId));
    }

    /**
     * Records {@code brokerInfo} as leader of partition {@code i} unless the currently recorded
     * leader has an equal or higher term for that partition.
     *
     * @param brokerInfo the candidate leader
     * @param i the partition id
     * @param j the candidate's leader term
     * @return {@code true} if the leader map was updated; {@code false} if the candidate's term is
     *     not newer, or the recorded leader has no term for the partition (logged as an error)
     */
    private boolean updatePartitionLeader(BrokerInfo brokerInfo, int i, long j) {
        final BrokerInfo currentLeader = this.partitionLeaders.get(i);
        if (currentLeader != null) {
            final Long currentLeaderTerm = currentLeader.getPartitionLeaderTerms().get(i);
            if (currentLeaderTerm == null) {
                LOG.error("Could not update new leader for partition {} at term {}. Expected to have a non-null value for current leader term, but found null", i, j);
                return false;
            }
            if (currentLeaderTerm >= j) {
                return false;
            }
        }
        this.partitionLeaders.put(i, brokerInfo);
        return true;
    }

    /**
     * Decodes the broker info from a member's properties.
     *
     * @return the decoded info; {@code null} if {@code BrokerInfo.fromProperties} yields {@code
     *     null} or if the static configuration differs from the local broker's (logged as an
     *     error)
     */
    private BrokerInfo readBrokerInfo(Member member) {
        final BrokerInfo remoteBroker = BrokerInfo.fromProperties(member.properties());
        if (remoteBroker == null || isStaticConfigValid(remoteBroker)) {
            return remoteBroker;
        }
        LOG.error(
            "Static configuration of node {} differs from local node {}: NodeId: 0 <= {} < {}, ClusterSize: {} == {}, PartitionsCount: {} == {}, ReplicationFactor: {} == {}.",
            member.id(),
            this.membershipService.getLocalMember().id(),
            remoteBroker.getNodeId(),
            this.localBroker.getClusterSize(),
            remoteBroker.getClusterSize(),
            this.localBroker.getClusterSize(),
            remoteBroker.getPartitionsCount(),
            this.localBroker.getPartitionsCount(),
            remoteBroker.getReplicationFactor(),
            this.localBroker.getReplicationFactor());
        return null;
    }

    /**
     * @return {@code true} if the node id lies in {@code [0, clusterSize)} of the local broker and
     *     cluster size, partitions count and replication factor all equal the local broker's
     */
    private boolean isStaticConfigValid(BrokerInfo brokerInfo) {
        final int nodeId = brokerInfo.getNodeId();
        return nodeId >= 0
            && nodeId < this.localBroker.getClusterSize()
            && this.localBroker.getClusterSize() == brokerInfo.getClusterSize()
            && this.localBroker.getPartitionsCount() == brokerInfo.getPartitionsCount()
            && this.localBroker.getReplicationFactor() == brokerInfo.getReplicationFactor();
    }

    /** Writes the local broker's current state into the local cluster member's properties. */
    private void publishTopologyChanges() {
        this.localBroker.writeIntoProperties(this.membershipService.getLocalMember().properties());
    }

    /** Unregisters the listener; the removal is submitted to the actor. */
    @Override // io.camunda.zeebe.broker.partitioning.topology.TopologyManager
    public void removeTopologyPartitionListener(TopologyPartitionListener topologyPartitionListener) {
        this.actor.run(() -> this.topologyPartitionListeners.remove(topologyPartitionListener));
    }

    /**
     * Registers the listener and immediately replays every currently recorded partition leader to
     * it. An exception thrown by the listener during replay is passed to {@code
     * LogUtil.catchAndLog} rather than propagated from the loop.
     */
    @Override // io.camunda.zeebe.broker.partitioning.topology.TopologyManager
    public void addTopologyPartitionListener(TopologyPartitionListener topologyPartitionListener) {
        this.actor.run(() -> {
            this.topologyPartitionListeners.add(topologyPartitionListener);
            this.partitionLeaders.forEach((partitionId, leader) ->
                LogUtil.catchAndLog(LOG, () -> topologyPartitionListener.onPartitionLeaderUpdated(partitionId, leader)));
        });
    }

    /** Notifies every registered listener of a leader change, each call wrapped individually. */
    private void notifyPartitionLeaderUpdated(int i, BrokerInfo brokerInfo) {
        for (TopologyPartitionListener listener : this.topologyPartitionListeners) {
            LogUtil.catchAndLog(LOG, () -> listener.onPartitionLeaderUpdated(i, brokerInfo));
        }
    }

    /**
     * Records the health of partition {@code i} on the local broker and publishes the change.
     * Statuses other than {@code HEALTHY}, {@code UNHEALTHY} and {@code DEAD} (including {@code
     * null}) leave the recorded health untouched, but the topology is still re-published.
     */
    public void onHealthChanged(int i, HealthStatus healthStatus) {
        this.actor.run(() -> {
            // Kept as an if-chain (not a switch) so a null status falls through instead of throwing.
            if (healthStatus == HealthStatus.HEALTHY) {
                this.localBroker.setPartitionHealthy(i);
            } else if (healthStatus == HealthStatus.UNHEALTHY) {
                this.localBroker.setPartitionUnhealthy(i);
            } else if (healthStatus == HealthStatus.DEAD) {
                this.localBroker.setPartitionDead(i);
            }
            publishTopologyChanges();
        });
    }
}
