package com.github.chen0040.zkcoordinator.nodes;

import com.github.chen0040.zkcoordinator.models.NodeUri;
import com.github.chen0040.zkcoordinator.models.RegistrationCompleted;
import com.github.chen0040.zkcoordinator.models.ZkConfig;
import com.github.chen0040.zkcoordinator.services.BootstrapService;
import com.github.chen0040.zkcoordinator.services.BootstrapServiceImpl;
import com.github.chen0040.zkcoordinator.services.LeaderElectionService;
import com.github.chen0040.zkcoordinator.services.LeaderElectionServiceImpl;
import com.github.chen0040.zkcoordinator.services.RegistrationService;
import com.github.chen0040.zkcoordinator.services.RegistrationServiceImpl;
import com.github.chen0040.zkcoordinator.services.RequestClusterService;
import com.github.chen0040.zkcoordinator.services.RequestClusterServiceImpl;
import com.github.chen0040.zkcoordinator.services.TaskAssignmentService;
import com.github.chen0040.zkcoordinator.services.TaskAssignmentServiceImpl;
import com.github.chen0040.zkcoordinator.services.WorkerClusterService;
import com.github.chen0040.zkcoordinator.services.WorkerClusterServiceImpl;
import com.github.chen0040.zkcoordinator.utils.IpTools;
import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/github/chen0040/zkcoordinator/nodes/MasterNode.class */
/**
 * Master-side participant of the ZooKeeper-based coordinator.
 *
 * <p>On {@link #start()} the node creates a {@link RegistrationService} and hooks three callbacks
 * into it: ZooKeeper started, ZooKeeper reconnected, and group joined. Once the group is joined it
 * runs for leadership through a {@link LeaderElectionService} and calls
 * {@link #startSystem(String, int, String)}.
 *
 * <p>The {@code create*Service} factory methods and the {@code onZk*} hooks are protected so that
 * subclasses can substitute implementations. The actor callbacks ({@code takeLeadership},
 * {@code resignLeadership}, {@code startSystem}, {@code stopSystem}) only log in this class.
 *
 * <p>NOTE(review): the callbacks are presumably invoked on a ZooKeeper client thread while the
 * getters may be called from other threads; this class performs no synchronization of its own.
 */
public class MasterNode implements Watcher, MasterActor, ZookeeperActor {
    private static final Logger logger = LoggerFactory.getLogger(MasterNode.class);

    // Services below are created lazily: the first four in onZkStarted, the election service in
    // onZkGroupJoined, the registration service in start(). They are null until then.
    private TaskAssignmentService taskAssignmentService;
    private WorkerClusterService workerClusterService;
    private RequestClusterService requestClusterService;
    private RegistrationService registrationService;
    private LeaderElectionService leaderElectionService;
    private BootstrapService bootstrapService;

    private final String zkConnect;
    private final String groupName;
    private final int initialPort;
    private final ZkConfig zkConfig;

    /** "{ipAddress}-{port}", assigned when the group is first joined; null before that. */
    private String masterId = null;
    /** Port reported by the first successful registration; -1 before that. */
    private int registeredPort = -1;
    /** True between the first group join and {@link #shutdown()}. */
    private boolean running = false;
    private final String ipAddress = IpTools.getIpAddress();

    /**
     * Creates a master node configured from {@code zkConfig}.
     *
     * @param zkConfig supplies the ZooKeeper connect string, the master group name and the
     *     starting port, and is kept for later service creation
     */
    public MasterNode(ZkConfig zkConfig) {
        this.zkConfig = zkConfig;
        this.zkConnect = zkConfig.getZkConnect();
        this.groupName = zkConfig.getMasterGroupName();
        this.initialPort = zkConfig.getStartingPort();
    }

    /**
     * Hook registered as the "reconnected" callback of the registration service; only logs here.
     *
     * @param str value passed by the registration service (unused in this class)
     */
    protected void onZkReconected(String str) {
        logger.info("zookeeper reconnected");
    }

    /**
     * Hook registered as the "started" callback of the registration service. Creates the
     * bootstrap, task-assignment, worker-cluster and request-cluster services, wires worker
     * changes to task reassignment, bootstraps, and starts watching workers and producers.
     *
     * @param zooKeeper the client handle handed over by the registration service
     */
    protected void onZkStarted(ZooKeeper zooKeeper) {
        this.bootstrapService = createBootstrapService(zooKeeper);
        this.taskAssignmentService = createTaskAssignmentService(zooKeeper);
        this.workerClusterService = createWorkerClusterService(zooKeeper);
        this.requestClusterService = createRequestClusterService(zooKeeper);

        // Field is read at event time (not captured), so a later re-creation of the
        // task-assignment service is picked up by this listener.
        this.workerClusterService.addWorkerChangeListener(
                workers -> this.taskAssignmentService.reassignAndSet(workers));

        this.bootstrapService.bootstrap();
        this.workerClusterService.watchWorkers();
        this.requestClusterService.watchProducers();

        // Short pause kept from the original flow; presumably gives the freshly registered
        // watches time to settle — TODO confirm whether it is still required.
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            logger.warn("sleep interrupted", e);
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Tells whether {@code nodeUri} identifies this master.
     *
     * @param nodeUri the node to compare against; may be null
     * @return true when this node has a master id and it equals {@code nodeUri.nodeId()};
     *     false when the node has not joined the group yet or {@code nodeUri} is null
     */
    public boolean isSelf(NodeUri nodeUri) {
        String ownId = this.masterId;
        return ownId != null && nodeUri != null && ownId.equals(nodeUri.nodeId());
    }

    /**
     * Hook registered as the group-join listener of the registration service.
     *
     * <p>On the first join it records the port and master id, puts the id into the logging MDC
     * under {@code "nodeid"}, starts leader election and calls
     * {@link #startSystem(String, int, String)}. On a later join while already running it only
     * verifies that the port is unchanged and terminates the JVM otherwise.
     *
     * @param zooKeeper the client handle handed over by the registration service
     * @param registrationCompleted carries the server id and port of the registration
     */
    protected void onZkGroupJoined(ZooKeeper zooKeeper, RegistrationCompleted registrationCompleted) {
        String serverId = registrationCompleted.getServerId();
        int port = registrationCompleted.getPort();

        if (this.running) {
            if (port != this.registeredPort) {
                logger.error("Fatal error: port on re-registration differs from the original registered port!");
                // Non-zero status: this is an abnormal termination and must not look like a
                // clean exit to the parent process or a supervisor.
                System.exit(1);
            }
            return;
        }

        logger.info("join group: {}:{}", this.ipAddress, port);
        this.registeredPort = port;
        this.masterId = this.ipAddress + "-" + port;
        MDC.put("nodeid", this.masterId);

        this.leaderElectionService = createLeaderElectionService(zooKeeper, serverId, port);
        // The first listener argument is ignored; the second supplies the port that is forwarded.
        this.leaderElectionService.addLeadershipListener(
                (ignoredId, electedPort) -> takeLeadership(this.ipAddress, electedPort.intValue(), this.masterId));
        this.leaderElectionService.addResignListener(
                (ignoredId, resignedPort) -> resignLeadership(this.ipAddress, resignedPort.intValue(), this.masterId));
        this.leaderElectionService.runForLeader();

        startSystem(this.ipAddress, port, this.masterId);
        this.running = true;
    }

    /**
     * Creates the registration service, registers this node's hooks on it and starts it with the
     * configured session timeout and initial port.
     *
     * @throws IOException declared by the {@link ZookeeperActor} contract
     */
    @Override
    public void start() throws IOException {
        this.registrationService = createRegistrationService();
        this.registrationService.onZkStarted(this::onZkStarted);
        this.registrationService.onZkReconnected(this::onZkReconected);
        this.registrationService.addGroupJoinListener(this::onZkGroupJoined);
        this.registrationService.start(this.zkConfig.getSessionTimeout(), this.initialPort);
    }

    /**
     * Stops the ZooKeeper registration (if {@link #start()} was ever called), marks the node as
     * not running and calls {@link #stopSystem()}.
     *
     * @throws InterruptedException declared by the {@link ZookeeperActor} contract
     */
    @Override
    public void shutdown() throws InterruptedException {
        // start() may never have been called; shutting down must still be safe.
        if (this.registrationService != null) {
            this.registrationService.stopZk();
        }
        this.running = false;
        stopSystem();
    }

    /** Factory for the registration service; override to substitute another implementation. */
    protected RegistrationService createRegistrationService() {
        return new RegistrationServiceImpl(this, this.zkConnect, this.zkConfig, this.groupName, this.ipAddress);
    }

    /**
     * Factory for the leader election service, bound to the configured leader path.
     *
     * @param zooKeeper the client handle to use
     * @param str the server id obtained from the completed registration
     * @param i the registered port
     */
    protected LeaderElectionService createLeaderElectionService(ZooKeeper zooKeeper, String str, int i) {
        return new LeaderElectionServiceImpl(zooKeeper, str, i, this.zkConfig.getLeaderPath());
    }

    /** Factory for the bootstrap service; override to substitute another implementation. */
    protected BootstrapService createBootstrapService(ZooKeeper zooKeeper) {
        return new BootstrapServiceImpl(zooKeeper, this.zkConfig);
    }

    /** Factory for the task assignment service; override to substitute another implementation. */
    protected TaskAssignmentService createTaskAssignmentService(ZooKeeper zooKeeper) {
        return new TaskAssignmentServiceImpl(zooKeeper, this.zkConfig);
    }

    /** Factory for the worker cluster service, using the configured worker path and system name. */
    protected WorkerClusterService createWorkerClusterService(ZooKeeper zooKeeper) {
        return new WorkerClusterServiceImpl(zooKeeper, this.zkConfig.getWorkerPath(), this.zkConfig.getWorkerSystemName());
    }

    /** Factory for the request cluster service, using the configured request path and system name. */
    protected RequestClusterService createRequestClusterService(ZooKeeper zooKeeper) {
        return new RequestClusterServiceImpl(zooKeeper, this.zkConfig.getRequestPath(), this.zkConfig.getRequestSystemName());
    }

    /**
     * Called by the leadership listener; this implementation only logs.
     *
     * @param str this node's IP address
     * @param i the port supplied by the election service
     * @param str2 this node's master id
     */
    @Override
    public void takeLeadership(String str, int i, String str2) {
        logger.info("This instance (id = {}) has become leader at {}:{}", str2, str, i);
    }

    /**
     * Called by the resign listener; this implementation only logs.
     *
     * @param str this node's IP address
     * @param i the port supplied by the election service
     * @param str2 this node's master id
     */
    @Override
    public void resignLeadership(String str, int i, String str2) {
        logger.info("This instance (id = {}) has resigned from leadership at {}:{}", str2, str, i);
    }

    /**
     * Called once after the first group join; this implementation only logs.
     *
     * @param str this node's IP address
     * @param i the registered port
     * @param str2 this node's master id
     */
    @Override
    public void startSystem(String str, int i, String str2) {
        logger.info("start system at {}:{} with id = {}", str, i, str2);
    }

    /** Called at the end of {@link #shutdown()}; this implementation only logs. */
    @Override
    public void stopSystem() {
        logger.info("system shutdown");
    }

    /** {@link Watcher} callback; intentionally a no-op in this class. */
    @Override
    public void process(WatchedEvent watchedEvent) {
    }

    // The methods below delegate straight to the task assignment service, which exists only
    // after onZkStarted has run; calling them earlier fails with a NullPointerException.

    /** Delegates to {@link TaskAssignmentService#isTaskAssigned}. */
    public void isTaskAssigned(String str, BiConsumer<String, Boolean> biConsumer) {
        this.taskAssignmentService.isTaskAssigned(str, biConsumer);
    }

    /** Delegates to {@link TaskAssignmentService#getWorkerAssigned2Task}. */
    public void getWorkerAssigned2Task(String str, Consumer<NodeUri> consumer) {
        this.taskAssignmentService.getWorkerAssigned2Task(str, consumer);
    }

    /** Delegates to {@link TaskAssignmentService#assignTask}. */
    public void assignTask(String str, BiConsumer<String, NodeUri> biConsumer) {
        this.taskAssignmentService.assignTask(str, biConsumer);
    }

    /** Delegates to {@link TaskAssignmentService#taskExists}. */
    public void taskExists(String str, BiConsumer<String, Boolean> biConsumer) {
        this.taskAssignmentService.taskExists(str, biConsumer);
    }

    /** Delegates to {@link TaskAssignmentService#createTask}. */
    public void createTask(String str, Consumer<String> consumer) {
        this.taskAssignmentService.createTask(str, consumer);
    }

    /** @return the IP address obtained from {@code IpTools.getIpAddress()} at construction */
    public String getIpAddress() {
        return this.ipAddress;
    }

    /** @return "{ipAddress}-{port}" once the group has been joined, otherwise null */
    public String getMasterId() {
        return this.masterId;
    }

    /** @return the port of the first successful registration, or -1 if not yet registered */
    public int getRegisteredPort() {
        return this.registeredPort;
    }

    /** @return the ZooKeeper connect string taken from the configuration */
    public String getZkConnect() {
        return this.zkConnect;
    }

    /** @return the master group name taken from the configuration */
    public String getGroupName() {
        return this.groupName;
    }

    /** @return true between the first group join and {@link #shutdown()} */
    public boolean isRunning() {
        return this.running;
    }

    /** @return the starting port taken from the configuration */
    public int getInitialPort() {
        return this.initialPort;
    }

    /** @return the configuration this node was created with */
    public ZkConfig getZkConfig() {
        return this.zkConfig;
    }
}
