package cn.ponfee.scheduler.registry.redis;

import cn.ponfee.scheduler.common.base.exception.Throwables;
import cn.ponfee.scheduler.common.concurrent.NamedThreadFactory;
import cn.ponfee.scheduler.common.util.ObjectUtils;
import cn.ponfee.scheduler.core.base.Server;
import cn.ponfee.scheduler.registry.EventType;
import cn.ponfee.scheduler.registry.ServerRegistry;
import cn.ponfee.scheduler.registry.ServerRole;
import cn.ponfee.scheduler.registry.redis.configuration.RedisRegistryProperties;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/* loaded from: input_file:cn/ponfee/scheduler/registry/redis/RedisServerRegistry.class */
public abstract class RedisServerRegistry<R extends Server, D extends Server> extends ServerRegistry<R, D> {
    private static final long REDIS_KEY_TTL_MILLIS = 2592000000L;
    private static final String CHANNEL = "channel";
    private final String registryChannel;
    private final StringRedisTemplate stringRedisTemplate;
    private final long sessionTimeoutMs;
    private final ScheduledThreadPoolExecutor registryScheduledExecutor;
    private volatile long nextRefreshTimeMillis;
    private final RedisMessageListenerContainer redisMessageListenerContainer;

    public RedisServerRegistry(StringRedisTemplate stringRedisTemplate, RedisRegistryProperties redisRegistryProperties) {
        super(redisRegistryProperties.getNamespace(), ':');
        this.nextRefreshTimeMillis = 0L;
        this.registryChannel = this.registryRootPath + this.separator + CHANNEL;
        this.stringRedisTemplate = stringRedisTemplate;
        this.sessionTimeoutMs = redisRegistryProperties.getSessionTimeoutMs();
        this.registryScheduledExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new NamedThreadFactory("redis_server_registry", true));
        this.registryScheduledExecutor.scheduleWithFixedDelay(() -> {
            if (this.closed.get()) {
                return;
            }
            try {
                doRegister(this.registered);
            } catch (Throwable th) {
                this.log.error("Do scheduled register occur error: " + this.registered, th);
            }
        }, redisRegistryProperties.getRegistryPeriodMs(), redisRegistryProperties.getRegistryPeriodMs(), TimeUnit.MILLISECONDS);
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(stringRedisTemplate.getConnectionFactory());
        redisMessageListenerContainer.setTaskExecutor(this.registryScheduledExecutor);
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(this, "subscribe");
        messageListenerAdapter.afterPropertiesSet();
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new ChannelTopic(this.discoveryRootPath + this.separator + CHANNEL));
        redisMessageListenerContainer.afterPropertiesSet();
        redisMessageListenerContainer.start();
        this.redisMessageListenerContainer = redisMessageListenerContainer;
        doRefreshDiscoveryServers();
    }

    public boolean isConnected() {
        Boolean bool = (Boolean) this.stringRedisTemplate.execute(redisConnection -> {
            return Boolean.valueOf(!redisConnection.isClosed());
        });
        return bool != null && bool.booleanValue();
    }

    public final List<D> getDiscoveredServers(String str) {
        doRefreshDiscoveryServersIfNecessary();
        return super.getDiscoveredServers(str);
    }

    public final boolean hasDiscoveredServers() {
        doRefreshDiscoveryServersIfNecessary();
        return super.hasDiscoveredServers();
    }

    public final boolean isDiscoveredServer(D d) {
        doRefreshDiscoveryServersIfNecessary();
        return super.isDiscoveredServer(d);
    }

    public final void register(R r) {
        if (this.closed.get()) {
            return;
        }
        doRegister(Collections.singleton(r));
        this.registered.add(r);
        publish(r, EventType.REGISTER);
        this.log.info("Server registered: {} | {}", this.registryRole.name(), r);
    }

    public final void deregister(R r) {
        this.registered.remove(r);
        Throwables.caught(() -> {
            return this.stringRedisTemplate.opsForZSet().remove(this.registryRootPath, new Object[]{r.serialize()});
        });
        Throwables.caught(() -> {
            publish(r, EventType.DEREGISTER);
        });
        this.log.info("Server deregister: {} | {}", this.registryRole.name(), r);
    }

    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            this.log.warn("Repeat call close method\n{}", ObjectUtils.getStackTrace());
            return;
        }
        RedisMessageListenerContainer redisMessageListenerContainer = this.redisMessageListenerContainer;
        redisMessageListenerContainer.getClass();
        Throwables.caught(redisMessageListenerContainer::stop);
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = this.registryScheduledExecutor;
        scheduledThreadPoolExecutor.getClass();
        Throwables.caught(scheduledThreadPoolExecutor::shutdownNow);
        this.registered.forEach(this::deregister);
        this.registered.clear();
        super.close();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void subscribe(String str, String str2) {
        try {
            int i = (-1) + 1;
            int indexOf = str.indexOf(":", i);
            String substring = str.substring(i, indexOf);
            String substring2 = str.substring(indexOf + 1);
            this.log.info("Subscribed message: {} | {}", str2, str);
            subscribe(EventType.valueOf(substring), (EventType) this.discoveryRole.deserialize(substring2));
        } catch (Throwable th) {
            this.log.error("Parse subscribed message error: " + str + ", " + str2, th);
        }
    }

    private void publish(R r, EventType eventType) {
        this.stringRedisTemplate.convertAndSend(this.registryChannel, eventType.name() + ":" + r.serialize());
    }

    private void subscribe(EventType eventType, D d) {
        doRefreshDiscoveryServers();
    }

    private void doRegister(Set<R> set) {
        if (CollectionUtils.isEmpty(set)) {
            return;
        }
        Double valueOf = Double.valueOf(System.currentTimeMillis() + this.sessionTimeoutMs);
        final Set set2 = (Set) set.stream().map(server -> {
            return ZSetOperations.TypedTuple.of(server.serialize(), valueOf);
        }).collect(Collectors.toSet());
        this.stringRedisTemplate.executePipelined(new SessionCallback<Object>() { // from class: cn.ponfee.scheduler.registry.redis.RedisServerRegistry.1
            /* renamed from: execute, reason: merged with bridge method [inline-methods] */
            public Void m0execute(RedisOperations redisOperations) {
                redisOperations.opsForZSet().add(RedisServerRegistry.this.registryRootPath, set2);
                redisOperations.expire(RedisServerRegistry.this.registryRootPath, RedisServerRegistry.REDIS_KEY_TTL_MILLIS, TimeUnit.MILLISECONDS);
                return null;
            }
        });
    }

    private void doRefreshDiscoveryServersIfNecessary() {
        if (requireRefresh()) {
            synchronized (this) {
                if (requireRefresh()) {
                    doRefreshDiscoveryServers();
                }
            }
        }
    }

    private synchronized void doRefreshDiscoveryServers() {
        final long currentTimeMillis = System.currentTimeMillis();
        Set set = (Set) this.stringRedisTemplate.executePipelined(new SessionCallback<Object>() { // from class: cn.ponfee.scheduler.registry.redis.RedisServerRegistry.2
            /* renamed from: execute, reason: merged with bridge method [inline-methods] */
            public Void m1execute(RedisOperations redisOperations) {
                redisOperations.opsForZSet().removeRangeByScore(RedisServerRegistry.this.discoveryRootPath, 0.0d, currentTimeMillis);
                redisOperations.opsForZSet().rangeByScore(RedisServerRegistry.this.discoveryRootPath, currentTimeMillis, 9.223372036854776E18d);
                redisOperations.expire(RedisServerRegistry.this.discoveryRootPath, RedisServerRegistry.REDIS_KEY_TTL_MILLIS, TimeUnit.MILLISECONDS);
                return null;
            }
        }).get(1);
        if (CollectionUtils.isEmpty(set)) {
            this.log.error("Not discovered available {} from redis.", this.discoveryRole.name());
            set = Collections.emptySet();
        }
        Stream stream = set.stream();
        ServerRole serverRole = this.discoveryRole;
        serverRole.getClass();
        refreshDiscoveredServers((List) stream.map(serverRole::deserialize).collect(Collectors.toList()));
        updateRefresh();
    }

    private boolean requireRefresh() {
        return this.nextRefreshTimeMillis < System.currentTimeMillis();
    }

    private void updateRefresh() {
        this.nextRefreshTimeMillis = System.currentTimeMillis() + this.sessionTimeoutMs;
    }
}
