package org.wu.framework.easy.redis.listener;

import com.alibaba.fastjson.JSON;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.wu.framework.core.exception.AssertFactory;
import org.wu.framework.core.utils.ObjectUtils;
import org.wu.framework.easy.listener.core.ListenerConsumer;
import org.wu.framework.easy.listener.core.SingletonMessageListenerContainer;
import org.wu.framework.easy.listener.core.consumer.ConsumerRecord;
import org.wu.framework.easy.listener.core.consumer.ConsumerRecordType;
import org.wu.framework.easy.listener.core.consumer.ConsumerRecords;
import org.wu.framework.easy.redis.listener.ack.RedisAcknowledgment;
import org.wu.framework.easy.redis.listener.config.MethodRedisListenerEndpoint;
import org.wu.framework.easy.redis.listener.consumer.RedisConsumerRecord;

/* loaded from: input_file:org/wu/framework/easy/redis/listener/RedisSingletonMessageListenerContainer.class */
public class RedisSingletonMessageListenerContainer<K, V> implements SingletonMessageListenerContainer<K, V> {
    private final Logger log = LoggerFactory.getLogger(RedisSingletonMessageListenerContainer.class);
    protected ListenerConsumer listenerConsumer;
    protected boolean running;
    private String beanName;
    private StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection;
    private MethodRedisListenerEndpoint endpoint;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wu/framework/easy/redis/listener/RedisSingletonMessageListenerContainer$RedisListenerStatementConsumer.class */
    public final class RedisListenerStatementConsumer implements ListenerConsumer {
        private boolean running;

        RedisListenerStatementConsumer() {
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            final Method method = RedisSingletonMessageListenerContainer.this.endpoint.getMethod();
            final Object bean = RedisSingletonMessageListenerContainer.this.endpoint.getBean();
            final Class<?>[] parameterTypes = method.getParameterTypes();
            boolean z = null != includeParameterTypes(parameterTypes, ConsumerRecords.class);
            final boolean z2 = null != includeParameterTypes(parameterTypes, ConsumerRecord.class);
            ConsumerRecordType consumerRecord = consumerRecord(method);
            AssertFactory.notNull(consumerRecord, "监听器接收方法不能为空");
            final Class payloadType = consumerRecord.getPayloadType();
            RedisSingletonMessageListenerContainer.this.statefulRedisPubSubConnection.addListener(new RedisPubSubListener<String, String>() { // from class: org.wu.framework.easy.redis.listener.RedisSingletonMessageListenerContainer.RedisListenerStatementConsumer.1
                public void message(String str, final String str2) {
                    ConsumerRecords consumerRecords = null;
                    if (!ObjectUtils.isEmpty(str2)) {
                        if (z2) {
                            try {
                                consumerRecords = new RedisConsumerRecord(null, JSON.parseObject(str2, payloadType));
                            } catch (Exception e) {
                                e.printStackTrace();
                                throw new RuntimeException(MessageFormat.format("错误的数据格式解析:{0}】", str2));
                            }
                        } else {
                            consumerRecords = new ConsumerRecords() { // from class: org.wu.framework.easy.redis.listener.RedisSingletonMessageListenerContainer.RedisListenerStatementConsumer.1.1
                                public Iterator iterator() {
                                    Stream of = Stream.of(str2);
                                    String str3 = str2;
                                    Class cls = payloadType;
                                    return ((List) of.map(str4 -> {
                                        return new ConsumerRecord() { // from class: org.wu.framework.easy.redis.listener.RedisSingletonMessageListenerContainer.RedisListenerStatementConsumer.1.1.1
                                            public Object schema() {
                                                return null;
                                            }

                                            public Object payload() {
                                                try {
                                                    return JSON.parseObject(str3, cls);
                                                } catch (Exception e2) {
                                                    e2.printStackTrace();
                                                    throw new RuntimeException(MessageFormat.format("错误的数据格式解析:{0}】", str3));
                                                }
                                            }
                                        };
                                    }).collect(Collectors.toList())).iterator();
                                }
                            };
                        }
                    }
                    try {
                        method.invoke(bean, RedisListenerStatementConsumer.this.invokeArgs(parameterTypes, new Object[]{consumerRecords, new RedisAcknowledgment(consumerRecords)}));
                    } catch (IllegalAccessException | InvocationTargetException e2) {
                        e2.printStackTrace();
                    }
                }

                public void message(String str, String str2, String str3) {
                }

                public void subscribed(String str, long j) {
                }

                public void psubscribed(String str, long j) {
                }

                public void unsubscribed(String str, long j) {
                }

                public void punsubscribed(String str, long j) {
                }
            });
            RedisSingletonMessageListenerContainer.this.statefulRedisPubSubConnection.async().subscribe((String[]) RedisSingletonMessageListenerContainer.this.endpoint.getTopics().toArray(new String[0]));
        }

        public void setRunning(boolean z) {
            this.running = z;
        }
    }

    public void start() {
        this.listenerConsumer = new RedisListenerStatementConsumer();
        setRunning(true);
        new SimpleAsyncTaskExecutor(getBeanName() + "-redis-").submitListenable(this.listenerConsumer);
    }

    public void stop() {
        this.statefulRedisPubSubConnection.close();
        this.running = false;
        this.log.info("connectionClose redis client with  consumer " + this.endpoint.getConsumer() + " topics " + this.endpoint.getTopics());
    }

    public boolean isRunning() {
        return this.running;
    }

    private String getBeanName() {
        return this.beanName;
    }

    public void setRunning(boolean z) {
        this.running = z;
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public void setStatefulRedisPubSubConnection(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection) {
        this.statefulRedisPubSubConnection = statefulRedisPubSubConnection;
    }

    public void setEndpoint(MethodRedisListenerEndpoint methodRedisListenerEndpoint) {
        this.endpoint = methodRedisListenerEndpoint;
    }
}
