package xyz.migoo.framework.mq.config;

import cn.hutool.system.SystemUtil;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import xyz.migoo.framework.mq.core.pubsub.AbstractChannelMessageListener;
import xyz.migoo.framework.mq.core.stream.AbstractStreamMessageListener;
import xyz.migoo.framework.redis.config.RedisAutoConfiguration;

/**
 * Auto-configuration that wires Redis-backed message listeners into Spring Data Redis
 * listener containers: one container for pub/sub channel listeners and one for
 * Redis Stream (consumer group) listeners.
 *
 * <p>Ordered after {@link RedisAutoConfiguration} so the Redis beans it needs are
 * already defined.
 */
@AutoConfigureAfter({RedisAutoConfiguration.class})
public class MQAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MQAutoConfiguration.class);

    /**
     * Builds the pub/sub listener container and subscribes every channel listener
     * to the channel it reports via {@code getChannel()}.
     *
     * @param redisConnectionFactory connection factory the container uses
     * @param list                   all channel listeners to register
     * @return the configured container; started and stopped via the bean's init/destroy methods
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, List<AbstractChannelMessageListener<?>> list) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        for (AbstractChannelMessageListener<?> listener : list) {
            container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
            log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", listener.getChannel(), listener.getClass().getName());
        }
        return container;
    }

    /**
     * Builds the Redis Stream listener container and registers every stream listener
     * as a consumer of its own stream key and consumer group.
     *
     * <p>For each listener this method: makes a best-effort attempt to create the
     * consumer group, hands the {@code redisTemplate} to the listener, and registers a
     * read request that starts from the group's last consumed offset, with automatic
     * acknowledgement disabled and without cancelling the subscription on errors.
     *
     * @param redisTemplate template used to create consumer groups and injected into each listener
     * @param list          all stream listeners to register
     * @return the configured container; started and stopped via the bean's init/destroy methods
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisTemplate<String, Object> redisTemplate, List<AbstractStreamMessageListener<?>> list) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(10)
                        .targetType(String.class)
                        .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), options);

        // One consumer name per process, shared by every listener registered here.
        String consumerName = buildConsumerName();
        for (AbstractStreamMessageListener<?> listener : list) {
            String streamKey = listener.getStreamKey();
            String group = listener.getGroup();

            ensureConsumerGroup(redisTemplate, streamKey, group);
            listener.setRedisTemplate(redisTemplate);

            // autoAcknowledge(false): records are not acknowledged by the container.
            // NOTE(review): presumably the listener acknowledges via the injected template - confirm.
            container.register(
                    StreamMessageListenerContainer.StreamReadRequest
                            .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                            .consumer(Consumer.from(group, consumerName))
                            .autoAcknowledge(false)
                            // Keep the subscription alive regardless of the error raised.
                            .cancelOnError(th -> false)
                            .build(),
                    listener);
            log.info("[redisStreamMessageListenerContainer][注册 Stream({}) 对应的监听器({})]", streamKey, listener.getClass().getName());
        }
        return container;
    }

    /**
     * Best-effort creation of a consumer group. Never throws: registration of the
     * listener proceeds whether or not the group could be created.
     *
     * <p>A failure whose cause chain mentions {@code BUSYGROUP} (the Redis error for an
     * already-existing group) is expected on every restart and logged at debug level;
     * any other failure is logged as a warning with its cause instead of being
     * silently discarded.
     */
    private static void ensureConsumerGroup(RedisTemplate<String, Object> redisTemplate, String streamKey, String group) {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, group);
        } catch (Exception e) {
            if (isBusyGroup(e)) {
                log.debug("[redisStreamMessageListenerContainer][Stream({}) Group({}) already exists, skip creation]", streamKey, group);
            } else {
                log.warn("[redisStreamMessageListenerContainer][Stream({}) Group({}) could not be created, continuing]", streamKey, group, e);
            }
        }
    }

    /**
     * Returns whether any throwable in the cause chain carries a message containing
     * {@code BUSYGROUP}. The walk is depth-limited to guard against cyclic cause chains.
     */
    private static boolean isBusyGroup(Throwable error) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < 16; depth++) {
            String message = current.getMessage();
            if (message != null && message.contains("BUSYGROUP")) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    /** Builds the consumer name for this process as {@code <host address>@<pid>}. */
    private static String buildConsumerName() {
        return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
    }
}
