package com.github.chen0040.zkcoordinator.services;

import com.github.chen0040.zkcoordinator.consts.TaskStates;
import com.github.chen0040.zkcoordinator.consts.UTF8;
import com.github.chen0040.zkcoordinator.models.RegistrationCompleted;
import com.github.chen0040.zkcoordinator.models.ZkConfig;
import com.github.chen0040.zkcoordinator.utils.IpTools;
import com.github.chen0040.zkcoordinator.utils.ZkTimer;
import com.github.chen0040.zkcoordinator.utils.ZkTimerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/github/chen0040/zkcoordinator/services/RegistrationServiceImpl.class */
public class RegistrationServiceImpl implements RegistrationService {
    private static final Logger logger;
    private ZooKeeper zk;
    private Watcher watcher;
    private int port;
    private String serverId;
    private final String ipAddress;
    private final String groupName;
    private final String zkConnect;
    private int sessionTimeout;
    private long reconnectDelayWhenSessionExpired;
    private Consumer<ZooKeeper> zkStarted;
    private Consumer<String> zkClosed;
    private Consumer<String> zkReconnected;
    private final String zkRootPath;
    private final String zkNodePath;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AtomicBoolean clientState = new AtomicBoolean(true);
    private String zkSessionId = "";
    private long lastTimeout = 0;
    private boolean isAlreadyRegisteredOneTime = false;
    private List<BiConsumer<ZooKeeper, RegistrationCompleted>> groupJoinListeners = new ArrayList();
    private AsyncCallback.StringCallback callbackJoinGroup = (i, str, obj, str2) -> {
        switch (AnonymousClass1.$SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.get(i).ordinal()]) {
            case 1:
                joinGroup();
                return;
            case 2:
                logger.info("Register node successfully: " + this.serverId);
                notifyGroupJoined();
                return;
            case 3:
                logger.info("Already registered: " + this.serverId);
                return;
            default:
                logger.error("Something went wrong.", KeeperException.create(KeeperException.Code.get(i), str));
                return;
        }
    };
    private final ZkTimer timer = ZkTimerFactory.createTimer();

    /* renamed from: com.github.chen0040.zkcoordinator.services.RegistrationServiceImpl$1, reason: invalid class name */
    /* loaded from: input_file:com/github/chen0040/zkcoordinator/services/RegistrationServiceImpl$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$KeeperException$Code = new int[KeeperException.Code.values().length];

        static {
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.CONNECTIONLOSS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.OK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.NODEEXISTS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public RegistrationServiceImpl(Watcher watcher, String str, ZkConfig zkConfig, String str2, String str3) {
        this.watcher = watcher;
        this.zkConnect = str;
        this.groupName = str2;
        this.ipAddress = str3;
        this.zkRootPath = zkConfig.getRootPath();
        this.zkNodePath = zkConfig.getNodePath();
        this.reconnectDelayWhenSessionExpired = zkConfig.getReconnectDelayWhenSessionExpired();
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public String getZkSessionId() {
        return this.zkSessionId;
    }

    private void notifyGroupJoined() {
        this.groupJoinListeners.forEach(biConsumer -> {
            biConsumer.accept(this.zk, new RegistrationCompleted(this.serverId, this.port));
        });
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public void addGroupJoinListener(BiConsumer<ZooKeeper, RegistrationCompleted> biConsumer) {
        this.groupJoinListeners.add(biConsumer);
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public void onZkStarted(Consumer<ZooKeeper> consumer) {
        this.zkStarted = consumer;
    }

    private void joinGroup() {
        MDC.put("port", "" + this.port);
        this.zk.create(this.zkRootPath + "/" + this.groupName + "/" + this.serverId, UTF8.getBytes(TaskStates.Idle), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this.callbackJoinGroup, (Object) null);
    }

    private void register(int i) {
        this.port = i;
        this.serverId = this.ipAddress + "_" + i;
        this.zk.create(this.zkNodePath + "/" + this.serverId, UTF8.getBytes(TaskStates.Idle), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (i2, str, obj, str2) -> {
            switch (AnonymousClass1.$SwitchMap$org$apache$zookeeper$KeeperException$Code[KeeperException.Code.get(i2).ordinal()]) {
                case 1:
                    register(i);
                    return;
                case 2:
                    logger.info("Register successfully: " + this.serverId);
                    this.isAlreadyRegisteredOneTime = true;
                    joinGroup();
                    return;
                case 3:
                    logger.info("Already registered: " + this.serverId);
                    register(IpTools.getNextAvailablePort(i));
                    return;
                default:
                    logger.error("Something went wrong: " + KeeperException.create(KeeperException.Code.get(i2), str));
                    return;
            }
        }, (Object) null);
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public long getLastTimeout() {
        return this.lastTimeout;
    }

    private void reconnectWhenSessionExpired() {
        this.lastTimeout = new Date().getTime();
        if (!this.clientState.get()) {
            logger.warn("Zk will not reconnect: already closed.");
            return;
        }
        ZooKeeper zooKeeper = this.zk;
        if (!$assertionsDisabled && zooKeeper == null) {
            throw new AssertionError();
        }
        if (zooKeeper.getState().isConnected()) {
            this.zkSessionId = Long.toHexString(zooKeeper.getSessionId());
            logger.info("ZkSession(0x{}) is connected.", this.zkSessionId);
            createTimeout();
        } else {
            this.zkSessionId = "";
            warnReconnect(zooKeeper);
            closeZooKeeper(zooKeeper);
            this.timer.newTimeout(timeOut -> {
                if (timeOut.isCancelled()) {
                    return;
                }
                logger.info("Start reconnecting ...");
                try {
                    start(this.sessionTimeout, this.port);
                } catch (IOException e) {
                    logger.error("Failed to start zk", e);
                }
            }, this.reconnectDelayWhenSessionExpired, TimeUnit.MILLISECONDS);
        }
    }

    private void warnReconnect(ZooKeeper zooKeeper) {
        String str = "Execute reconnectWhenSessionExpired()(Expired session:0x" + Long.toHexString(zooKeeper.getSessionId()) + ").";
        logger.warn(str);
        if (this.zkReconnected != null) {
            this.zkReconnected.accept(str);
        }
    }

    private void closeZooKeeper(ZooKeeper zooKeeper) {
        if (zooKeeper != null) {
            try {
                try {
                    zooKeeper.close();
                    if (this.zkClosed != null) {
                        this.zkClosed.accept("zk-closed");
                    }
                } catch (InterruptedException e) {
                    logger.error("zk closed interrupted", e);
                    if (this.zkClosed != null) {
                        this.zkClosed.accept("zk-closed");
                    }
                }
            } catch (Throwable th) {
                if (this.zkClosed != null) {
                    this.zkClosed.accept("zk-closed");
                }
                throw th;
            }
        }
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public void start(int i, int i2) throws IOException {
        this.sessionTimeout = i;
        ZooKeeper createNewZookeeper = createNewZookeeper();
        if (createNewZookeeper == null) {
            logger.warn("Failed to create new Zookeeper instance. It will be retry  after {} ms.", Long.valueOf(this.reconnectDelayWhenSessionExpired));
        } else {
            this.zk = createNewZookeeper;
            if (this.zkStarted != null) {
                this.zkStarted.accept(createNewZookeeper);
            }
            if (!this.isAlreadyRegisteredOneTime && !IpTools.isPortAvailable(i2)) {
                i2 = IpTools.getNextAvailablePort(i2);
            }
            register(i2);
        }
        createTimeout();
    }

    private void createTimeout() {
        this.timer.newTimeout(timeOut -> {
            if (timeOut.isCancelled()) {
                return;
            }
            reconnectWhenSessionExpired();
        }, this.reconnectDelayWhenSessionExpired, TimeUnit.MILLISECONDS);
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public void stopZk() throws InterruptedException {
        if (this.clientState.compareAndSet(true, false)) {
            if (this.timer != null) {
                this.timer.stop();
            }
            closeZooKeeper(this.zk);
        }
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public void onZkClosed(Consumer<String> consumer) {
        this.zkClosed = consumer;
    }

    @Override // com.github.chen0040.zkcoordinator.services.RegistrationService
    public void onZkReconnected(Consumer<String> consumer) {
        this.zkReconnected = consumer;
    }

    private ZooKeeper createNewZookeeper() {
        ZooKeeper zooKeeper = null;
        try {
            zooKeeper = new ZooKeeper(this.zkConnect, this.sessionTimeout, this.watcher);
            return zooKeeper;
        } catch (IOException e) {
            closeZooKeeper(zooKeeper);
            return null;
        }
    }

    static {
        $assertionsDisabled = !RegistrationServiceImpl.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(RegistrationServiceImpl.class);
    }
}
