package cn.ponfee.disjob.registry.database;

import cn.ponfee.disjob.common.base.RetryTemplate;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.spring.JdbcTemplateWrapper;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.database.configuration.DatabaseRegistryProperties;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.util.Assert;

/* loaded from: input_file:cn/ponfee/disjob/registry/database/DatabaseServerRegistry.class */
public abstract class DatabaseServerRegistry<R extends Server, D extends Server> extends ServerRegistry<R, D> {
    private static final long DEAD_TIME_MILLIS = TimeUnit.HOURS.toMillis(12);
    private static final String TABLE_NAME = "disjob_registry";
    private static final String CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS `disjob_registry` (                                                         \n  `id`              BIGINT        UNSIGNED  NOT NULL  AUTO_INCREMENT  COMMENT 'auto increment id',        \n  `namespace`       VARCHAR(60)             NOT NULL                  COMMENT 'registry namespace',       \n  `role`            VARCHAR(30)             NOT NULL                  COMMENT 'role(worker, supervisor)', \n  `server`          VARCHAR(255)            NOT NULL                  COMMENT 'server serialization',     \n  `heartbeat_time`  BIGINT        UNSIGNED  NOT NULL                  COMMENT 'last heartbeat time',      \n  PRIMARY KEY (`id`),                                                                                     \n  UNIQUE KEY `uk_namespace_role_server` (`namespace`, `role`, `server`)                                   \n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='Database registry'; \n";
    private static final String REMOVE_DEAD_SQL = "DELETE FROM disjob_registry WHERE namespace=? AND role=? AND heartbeat_time<?";
    private static final String REGISTER_SQL = "INSERT INTO disjob_registry (namespace, role, server, heartbeat_time) VALUES (?, ?, ?, ?)";
    private static final String HEARTBEAT_SQL = "UPDATE disjob_registry SET heartbeat_time=? WHERE namespace=? AND role=? AND server=?";
    private static final String DEREGISTER_SQL = "DELETE FROM disjob_registry WHERE namespace=? AND role=? AND server=?";
    private static final String SELECT_SQL = "SELECT server FROM disjob_registry WHERE namespace=? AND role=? AND heartbeat_time>?";
    private final String namespace;
    private final JdbcTemplateWrapper jdbcTemplateWrapper;
    private final long sessionTimeoutMs;
    private final String registerRoleName;
    private final LoopThread registerHeartbeatThread;
    private final String discoveryRoleName;
    private final LoopThread discoverHeartbeatThread;

    /* JADX INFO: Access modifiers changed from: protected */
    public DatabaseServerRegistry(DatabaseRegistryProperties databaseRegistryProperties, JdbcTemplateWrapper jdbcTemplateWrapper) {
        super(databaseRegistryProperties.getNamespace(), ':');
        this.namespace = databaseRegistryProperties.getNamespace().trim();
        this.jdbcTemplateWrapper = jdbcTemplateWrapper;
        this.sessionTimeoutMs = databaseRegistryProperties.getSessionTimeoutMs();
        long sessionTimeoutMs = databaseRegistryProperties.getSessionTimeoutMs() / 3;
        this.registerRoleName = this.registryRole.name().toLowerCase();
        this.jdbcTemplateWrapper.createTableIfNotExists(TABLE_NAME, CREATE_TABLE_DDL);
        Object[] objArr = {this.namespace, this.registerRoleName, Long.valueOf(System.currentTimeMillis() - DEAD_TIME_MILLIS)};
        RetryTemplate.executeQuietly(() -> {
            return Integer.valueOf(this.jdbcTemplateWrapper.delete(REMOVE_DEAD_SQL, objArr));
        }, 3, 1000L);
        this.registerHeartbeatThread = LoopThread.createStarted("database_register_heartbeat", sessionTimeoutMs, sessionTimeoutMs, this::registerServers);
        this.discoveryRoleName = this.discoveryRole.name().toLowerCase();
        this.discoverHeartbeatThread = LoopThread.createStarted("database_discover_heartbeat", sessionTimeoutMs, sessionTimeoutMs, this::discoverServers);
        try {
            discoverServers();
        } catch (Throwable th) {
            Threads.interruptIfNecessary(th);
            close();
            throw new Error("Database init discover error.", th);
        }
    }

    public boolean isConnected() {
        try {
            this.jdbcTemplateWrapper.existsTable(TABLE_NAME);
            return true;
        } catch (Throwable th) {
            Threads.interruptIfNecessary(th);
            return false;
        }
    }

    public final void register(R r) {
        if (this.closed.get()) {
            return;
        }
        this.jdbcTemplateWrapper.executeInTransaction(throwingFunction -> {
            String serialize = r.serialize();
            PreparedStatement preparedStatement = (PreparedStatement) throwingFunction.apply(HEARTBEAT_SQL);
            preparedStatement.setLong(1, System.currentTimeMillis());
            preparedStatement.setString(2, this.namespace);
            preparedStatement.setString(3, this.registerRoleName);
            preparedStatement.setString(4, serialize);
            int executeUpdate = preparedStatement.executeUpdate();
            Assert.isTrue(executeUpdate <= 1, () -> {
                return "Invalid update rows affected: " + executeUpdate;
            });
            if (executeUpdate == 1) {
                this.log.info("Database register update: {} | {} | {}", new Object[]{this.namespace, this.registerRoleName, serialize});
                return;
            }
            PreparedStatement preparedStatement2 = (PreparedStatement) throwingFunction.apply(REGISTER_SQL);
            preparedStatement2.setString(1, this.namespace);
            preparedStatement2.setString(2, this.registerRoleName);
            preparedStatement2.setString(3, serialize);
            preparedStatement2.setLong(4, System.currentTimeMillis());
            int executeUpdate2 = preparedStatement2.executeUpdate();
            Assert.isTrue(executeUpdate2 == 1, () -> {
                return "Invalid insert rows affected: " + executeUpdate2;
            });
            this.log.info("Database register insert: {} | {} | {}", new Object[]{this.namespace, Integer.valueOf(executeUpdate2), serialize});
        });
        this.registered.add(r);
    }

    public final void deregister(R r) {
        this.registered.remove(r);
        Object[] objArr = {this.namespace, this.registerRoleName, r.serialize()};
        Throwables.ThrowingSupplier.doCaught(() -> {
            return Integer.valueOf(this.jdbcTemplateWrapper.delete(DEREGISTER_SQL, objArr));
        });
        this.log.info("Server deregister: {} | {}", this.registryRole.name(), r);
    }

    public List<R> getRegisteredServers() {
        return deserializeRegistryServers(this.jdbcTemplateWrapper.list(SELECT_SQL, JdbcTemplateWrapper.STRING_ROW_MAPPER, new Object[]{this.namespace, this.registerRoleName, Long.valueOf(System.currentTimeMillis() - this.sessionTimeoutMs)}));
    }

    @PreDestroy
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.registerHeartbeatThread.terminate();
            this.registered.forEach(this::deregister);
            this.registered.clear();
            this.discoverHeartbeatThread.terminate();
            super.close();
        }
    }

    private void registerServers() {
        Iterator it = this.registered.iterator();
        while (it.hasNext()) {
            String serialize = ((Server) it.next()).serialize();
            RetryTemplate.executeQuietly(() -> {
                Object[] objArr = {Long.valueOf(System.currentTimeMillis()), this.namespace, this.registerRoleName, serialize};
                if (this.jdbcTemplateWrapper.update(HEARTBEAT_SQL, objArr) == 1) {
                    this.log.debug("Database heartbeat register update: {} | {} | {} | {}", objArr);
                    return;
                }
                Object[] objArr2 = {this.namespace, this.registerRoleName, serialize, Long.valueOf(System.currentTimeMillis())};
                this.jdbcTemplateWrapper.insert(REGISTER_SQL, objArr2);
                this.log.debug("Database heartbeat register insert: {} | {} | {} | {}", objArr2);
            }, 3, 1000L);
        }
    }

    private void discoverServers() throws Throwable {
        RetryTemplate.execute(() -> {
            List list = this.jdbcTemplateWrapper.list(SELECT_SQL, JdbcTemplateWrapper.STRING_ROW_MAPPER, new Object[]{this.namespace, this.discoveryRoleName, Long.valueOf(System.currentTimeMillis() - this.sessionTimeoutMs)});
            if (CollectionUtils.isEmpty(list)) {
                this.log.warn("Not discovered available {} from database.", this.discoveryRole.name());
                list = Collections.emptyList();
            }
            Stream stream = list.stream();
            ServerRole serverRole = this.discoveryRole;
            serverRole.getClass();
            refreshDiscoveredServers((List) stream.map(serverRole::deserialize).collect(Collectors.toList()));
            this.log.debug("Database discovered {} servers.", this.discoveryRole.name());
        }, 3, 1000L);
    }
}
