package com.github.chen0040.zkcoordinator.nodes;

import com.github.chen0040.zkcoordinator.models.NodeUri;
import com.github.chen0040.zkcoordinator.models.RegistrationCompleted;
import com.github.chen0040.zkcoordinator.models.ZkConfig;
import com.github.chen0040.zkcoordinator.services.LeaderWatchService;
import com.github.chen0040.zkcoordinator.services.LeaderWatchServiceImpl;
import com.github.chen0040.zkcoordinator.services.MasterClusterService;
import com.github.chen0040.zkcoordinator.services.MasterClusterServiceImpl;
import com.github.chen0040.zkcoordinator.services.RegistrationService;
import com.github.chen0040.zkcoordinator.services.RegistrationServiceImpl;
import com.github.chen0040.zkcoordinator.services.TaskAssignmentService;
import com.github.chen0040.zkcoordinator.services.TaskAssignmentServiceImpl;
import com.github.chen0040.zkcoordinator.utils.IpTools;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/github/chen0040/zkcoordinator/nodes/RequestNode.class */
/**
 * Node that registers itself with ZooKeeper under the request group name taken from its
 * {@link ZkConfig} and, once ZooKeeper is started, exposes leader, master-cluster and task
 * operations through the services it creates.
 *
 * <p>Lifecycle: {@link #start()} creates the {@link RegistrationService}, wires the callbacks and
 * starts registration; {@link #onGroupJoined} records the registered port and calls
 * {@link #startSystem}; {@link #shutdown()} stops ZooKeeper and calls {@link #stopSystem()}.
 * Subclasses customise behaviour by overriding {@code startSystem}/{@code stopSystem} and the
 * {@code create*Service} factory methods.
 *
 * <p>NOTE(review): the callbacks are presumably invoked from a thread owned by the registration
 * service / ZooKeeper client while the public accessors are called from application threads; the
 * fields are not synchronised. Confirm the threading model before relying on cross-thread
 * visibility.
 */
public class RequestNode implements Watcher, SystemActor, ZookeeperActor {
    private static final Logger logger = LoggerFactory.getLogger(RequestNode.class);

    /** Non-zero process exit status used when a re-registration reports a different port. */
    private static final int EXIT_STATUS_PORT_MISMATCH = 1;

    private final ZkConfig zkConfig;
    private final String groupName;
    private final String zkConnect;
    private final int initialPort;
    private final String ipAddress = IpTools.getIpAddress();

    /** Created in {@link #start()}; {@code null} before that. */
    private RegistrationService registrationService;

    /** The three services below are created in the ZooKeeper-started callback; {@code null} before that. */
    private TaskAssignmentService taskAssignmentService;
    private LeaderWatchService leaderWatchService;
    private MasterClusterService masterClusterService;

    /** {@code "<ipAddress>-<port>"}, assigned on the first group join; {@code null} before that. */
    private String requestId;

    /** Port reported by the first group join; {@code -1} until then. */
    private int registeredPort = -1;

    /** Set to {@code true} after the first group join, reset to {@code false} by {@link #shutdown()}. */
    private boolean running = false;

    /**
     * Creates a node configured from the given ZooKeeper settings. Nothing is connected until
     * {@link #start()} is called.
     *
     * @param zkConfig source of the starting port, connection string, request group name and
     *     session timeout
     */
    public RequestNode(ZkConfig zkConfig) {
        this.zkConfig = zkConfig;
        this.initialPort = zkConfig.getStartingPort();
        this.zkConnect = zkConfig.getZkConnect();
        this.groupName = zkConfig.getRequestGroupName();
    }

    /**
     * Hook invoked once, on the first successful group join. The default implementation only logs.
     *
     * @param host IP address of this node
     * @param port port reported by the registration
     * @param nodeId identifier of this node ({@code "<host>-<port>"})
     */
    @Override // com.github.chen0040.zkcoordinator.nodes.SystemActor
    public void startSystem(String host, int port, String nodeId) {
        logger.info("start system at {}:{} with id = {}", host, port, nodeId);
    }

    /** Hook invoked by {@link #shutdown()}. The default implementation only logs. */
    @Override // com.github.chen0040.zkcoordinator.nodes.SystemActor
    public void stopSystem() {
        logger.info("system shutdown");
    }

    /**
     * Callback registered with the registration service for ZooKeeper reconnects. The default
     * implementation logs this node's request id.
     *
     * @param str value supplied by the registration service; unused here
     */
    protected void onZkReconnected(String str) {
        logger.info("this instance (id = {}) is now connected to zookeeper", this.requestId);
    }

    /**
     * Callback registered as group-join listener.
     *
     * <p>On the first call it records the port, derives the request id, stores it in the logging
     * MDC under {@code "nodeid"}, calls {@link #startSystem} and marks the node as running. On
     * later calls (node already running) it only verifies that the port is unchanged; a different
     * port is treated as fatal and terminates the JVM with a non-zero exit status.
     *
     * @param zooKeeper ZooKeeper handle supplied by the registration service; unused here
     * @param registrationCompleted registration result carrying the port
     */
    protected void onGroupJoined(ZooKeeper zooKeeper, RegistrationCompleted registrationCompleted) {
        int port = registrationCompleted.getPort();
        if (this.running) {
            if (port != this.registeredPort) {
                logger.error("Fatal error: port on re-registration differs from the original registered port!");
                // A fatal condition must not report success (status 0) to the parent process.
                System.exit(EXIT_STATUS_PORT_MISMATCH);
            }
            return;
        }
        this.registeredPort = port;
        this.requestId = this.ipAddress + "-" + port;
        MDC.put("nodeid", this.requestId);
        logger.info("join group: {}:{}", this.ipAddress, port);
        startSystem(this.ipAddress, port, this.requestId);
        this.running = true;
    }

    /**
     * Creates the registration service, registers the callbacks and starts registration using the
     * configured session timeout and starting port.
     *
     * @throws IOException propagated from the registration service
     */
    @Override // com.github.chen0040.zkcoordinator.nodes.ZookeeperActor
    public void start() throws IOException {
        this.registrationService =
                new RegistrationServiceImpl(this, this.zkConnect, this.zkConfig, this.groupName, this.ipAddress);
        this.registrationService.onZkStarted(zooKeeper -> {
            // All three services are created before any watch is installed.
            this.taskAssignmentService = createTaskAssignmentService(zooKeeper);
            this.leaderWatchService = createLeaderWatchService(zooKeeper);
            this.masterClusterService = createMasterClusterService(zooKeeper);
            this.leaderWatchService.watchLeader();
            this.masterClusterService.watchMasters();
        });
        this.registrationService.onZkReconnected(this::onZkReconnected);
        this.registrationService.addGroupJoinListener(this::onGroupJoined);
        this.registrationService.start(this.zkConfig.getSessionTimeout(), this.initialPort);
    }

    /**
     * Stops ZooKeeper (if {@link #start()} was called), then calls {@link #stopSystem()} and marks
     * the node as not running. The latter two steps run even when stopping ZooKeeper fails, so the
     * node never stays flagged as running after a shutdown attempt.
     *
     * @throws InterruptedException propagated from the registration service
     */
    @Override // com.github.chen0040.zkcoordinator.nodes.ZookeeperActor
    public void shutdown() throws InterruptedException {
        RegistrationService service = this.registrationService;
        try {
            if (service != null) {
                service.stopZk();
            }
        } finally {
            stopSystem();
            this.running = false;
        }
    }

    /** Factory for the task assignment service; override to substitute another implementation. */
    protected TaskAssignmentService createTaskAssignmentService(ZooKeeper zooKeeper) {
        return new TaskAssignmentServiceImpl(zooKeeper, this.zkConfig);
    }

    /** Factory for the leader watch service; override to substitute another implementation. */
    protected LeaderWatchService createLeaderWatchService(ZooKeeper zooKeeper) {
        return new LeaderWatchServiceImpl(zooKeeper, this.zkConfig);
    }

    /** Factory for the master cluster service; override to substitute another implementation. */
    protected MasterClusterService createMasterClusterService(ZooKeeper zooKeeper) {
        return new MasterClusterServiceImpl(zooKeeper, this.zkConfig);
    }

    /** {@link Watcher} callback; intentionally a no-op in this class. */
    @Override // org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
    }

    /**
     * @return the leader watch service's answer, or {@code false} while that service has not been
     *     created yet
     */
    public boolean leaderExists() {
        LeaderWatchService service = this.leaderWatchService;
        return service != null && service.leaderExists();
    }

    /**
     * @return the leader URI reported by the leader watch service, or {@code null} while that
     *     service has not been created yet
     */
    public NodeUri getLeaderUri() {
        LeaderWatchService service = this.leaderWatchService;
        return service == null ? null : service.getLeaderUri();
    }

    /**
     * Delegates a task-existence check to the task assignment service.
     *
     * @param taskName task identifier passed through to the service
     * @param callback callback passed through to the service
     * @throws IllegalStateException if the task assignment service has not been created yet
     */
    public void taskExists(String taskName, BiConsumer<String, Boolean> callback) {
        requireTaskAssignmentService().taskExists(taskName, callback);
    }

    /**
     * Asks the task assignment service to create a task and logs the value passed to the
     * completion callback.
     *
     * @param taskName task identifier passed through to the service
     * @throws IllegalStateException if the task assignment service has not been created yet
     */
    public void createTask(String taskName) {
        requireTaskAssignmentService().createTask(taskName, created -> {
            logger.info("Task created: {}", created);
        });
    }

    /**
     * @return the masters reported by the master cluster service, or an immutable empty list while
     *     that service has not been created yet
     */
    public List<NodeUri> getMasters() {
        MasterClusterService service = this.masterClusterService;
        if (service == null) {
            return Collections.emptyList();
        }
        return service.masters();
    }

    /** @return IP address of this node as resolved at construction time */
    public String getIpAddress() {
        return this.ipAddress;
    }

    /** @return port recorded on the first group join, or {@code -1} if none yet */
    public int getRegisteredPort() {
        return this.registeredPort;
    }

    /** @return {@code "<ipAddress>-<port>"}, or {@code null} before the first group join */
    public String getRequestId() {
        return this.requestId;
    }

    /** @return request group name taken from the configuration */
    public String getGroupName() {
        return this.groupName;
    }

    /** @return ZooKeeper connection string taken from the configuration */
    public String getZkConnect() {
        return this.zkConnect;
    }

    /** @return starting port taken from the configuration */
    public int getInitialPort() {
        return this.initialPort;
    }

    /** @return the configuration this node was created with */
    public ZkConfig getZkConfig() {
        return this.zkConfig;
    }

    /** @return {@code true} between the first group join and {@link #shutdown()} */
    public boolean isRunning() {
        return this.running;
    }

    /** Returns the task assignment service or fails with a descriptive error if it is not ready. */
    private TaskAssignmentService requireTaskAssignmentService() {
        TaskAssignmentService service = this.taskAssignmentService;
        if (service == null) {
            throw new IllegalStateException(
                    "task assignment service is not available: ZooKeeper has not been started yet");
        }
        return service;
    }
}
