package com.ddphin.rabbitmq.sender.impl;

import com.alibaba.fastjson.JSONObject;
import com.ddphin.id.endpoint.IDWorkerAware;
import com.ddphin.rabbitmq.entity.CorrelationDataMQ;
import com.ddphin.rabbitmq.entity.CorrelationDataRedis;
import com.ddphin.rabbitmq.sender.RabbitmqCommonTxMessageSender;
import com.ddphin.redis.helper.RedisHelper;
import java.util.Arrays;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/* loaded from: input_file:com/ddphin/rabbitmq/sender/impl/RabbitmqCommonTxMessageSenderImpl.class */
public class RabbitmqCommonTxMessageSenderImpl extends IDWorkerAware implements RabbitmqCommonTxMessageSender, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, TransactionSynchronization, Ordered, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(RabbitmqCommonTxMessageSenderImpl.class);
    private static final String message_cache_name_id_prepare = "_mq_@message@zset@id_prepare";
    private static final String message_cache_name_id_do = "_mq_@message@zset@id_do";
    private static final String message_cache_name_id_redo = "_mq_@message@set@id_redo";
    private static final String message_cache_name_data_death = "_mq_@message@hash@data_death";
    private static final String message_cache_name_data_normal = "_mq_@message@hash@data_normal";
    private DefaultRedisScript<Long> script_mq_save_prepare;
    private DefaultRedisScript<Long> script_mq_move_prepare_to_do;
    private DefaultRedisScript<Long> script_mq_remove_prepare;
    private DefaultRedisScript<Long> script_mq_remove_do;
    private DefaultRedisScript<Long> script_mq_move_do_to_redo_with_id;
    private DefaultRedisScript<Long> script_mq_save_death;
    private DefaultRedisScript<String> script_mq_move_redo_to_do;
    private DefaultRedisScript<Long> script_mq_move_do_to_redo_with_timestamp;
    private DefaultRedisScript<Long> script_mq_move_prepare_to_death;
    private RabbitTemplate rabbitTemplate;
    private RedisHelper redisHelper;
    private ThreadLocal<CorrelationDataMQ> singleMessage = ThreadLocal.withInitial(() -> {
        return null;
    });
    private ThreadLocal<Boolean> register = ThreadLocal.withInitial(() -> {
        return false;
    });

    public RabbitmqCommonTxMessageSenderImpl(RabbitTemplate rabbitTemplate, RedisHelper redisHelper) {
        this.rabbitTemplate = rabbitTemplate;
        this.redisHelper = redisHelper;
        DefaultResourceLoader defaultResourceLoader = new DefaultResourceLoader();
        this.script_mq_save_prepare = new DefaultRedisScript<>();
        this.script_mq_save_prepare.setResultType(Long.class);
        this.script_mq_save_prepare.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_save_prepare.lua")));
        this.script_mq_move_prepare_to_do = new DefaultRedisScript<>();
        this.script_mq_move_prepare_to_do.setResultType(Long.class);
        this.script_mq_move_prepare_to_do.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_move_prepare_to_do.lua")));
        this.script_mq_remove_prepare = new DefaultRedisScript<>();
        this.script_mq_remove_prepare.setResultType(Long.class);
        this.script_mq_remove_prepare.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_remove_prepare.lua")));
        this.script_mq_remove_do = new DefaultRedisScript<>();
        this.script_mq_remove_do.setResultType(Long.class);
        this.script_mq_remove_do.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_remove_do.lua")));
        this.script_mq_move_do_to_redo_with_id = new DefaultRedisScript<>();
        this.script_mq_move_do_to_redo_with_id.setResultType(Long.class);
        this.script_mq_move_do_to_redo_with_id.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_move_do_to_redo_with_id.lua")));
        this.script_mq_save_death = new DefaultRedisScript<>();
        this.script_mq_save_death.setResultType(Long.class);
        this.script_mq_save_death.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_save_death.lua")));
        this.script_mq_move_redo_to_do = new DefaultRedisScript<>();
        this.script_mq_move_redo_to_do.setResultType(String.class);
        this.script_mq_move_redo_to_do.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_move_redo_to_do.lua")));
        this.script_mq_move_do_to_redo_with_timestamp = new DefaultRedisScript<>();
        this.script_mq_move_do_to_redo_with_timestamp.setResultType(Long.class);
        this.script_mq_move_do_to_redo_with_timestamp.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_move_do_to_redo_with_timestamp.lua")));
        this.script_mq_move_prepare_to_death = new DefaultRedisScript<>();
        this.script_mq_move_prepare_to_death.setResultType(Long.class);
        this.script_mq_move_prepare_to_death.setScriptSource(new ResourceScriptSource(defaultResourceLoader.getResource("classpath:lua/mq_move_prepare_to_death.lua")));
    }

    private Long script_mq_save_prepare(CorrelationDataRedis correlationDataRedis) {
        return (Long) this.redisHelper.redis().execute(this.script_mq_save_prepare, Arrays.asList(message_cache_name_id_prepare, message_cache_name_data_normal), new Object[]{correlationDataRedis.getId(), JSONObject.toJSONString(correlationDataRedis), Long.valueOf(System.currentTimeMillis())});
    }

    private Long script_mq_move_prepare_to_do(CorrelationDataRedis correlationDataRedis) {
        return (Long) this.redisHelper.redis().execute(this.script_mq_move_prepare_to_do, Arrays.asList(message_cache_name_id_prepare, message_cache_name_id_do), new Object[]{correlationDataRedis.getId(), Long.valueOf(System.currentTimeMillis())});
    }

    private Long script_mq_remove_prepare(CorrelationDataRedis correlationDataRedis) {
        return (Long) this.redisHelper.redis().execute(this.script_mq_remove_prepare, Arrays.asList(message_cache_name_id_prepare, message_cache_name_data_normal), new Object[]{correlationDataRedis.getId()});
    }

    private Long script_mq_remove_do(CorrelationDataRedis correlationDataRedis) {
        return (Long) this.redisHelper.redis().execute(this.script_mq_remove_do, Arrays.asList(message_cache_name_id_do, message_cache_name_data_normal), new Object[]{correlationDataRedis.getId()});
    }

    private Long script_mq_move_do_to_redo_with_id(CorrelationDataRedis correlationDataRedis) {
        return (Long) this.redisHelper.redis().execute(this.script_mq_move_do_to_redo_with_id, Arrays.asList(message_cache_name_id_do, message_cache_name_id_redo), new Object[]{correlationDataRedis.getId()});
    }

    private Long script_mq_save_death(CorrelationDataRedis correlationDataRedis) {
        return (Long) this.redisHelper.redis().execute(this.script_mq_save_death, Collections.singletonList(message_cache_name_data_death), new Object[]{correlationDataRedis.getId(), correlationDataRedis});
    }

    private String script_mq_move_redo_to_do() {
        return (String) this.redisHelper.redis().execute(this.script_mq_move_redo_to_do, Arrays.asList(message_cache_name_id_redo, message_cache_name_id_do, message_cache_name_data_normal), new Object[]{Long.valueOf(System.currentTimeMillis())});
    }

    private Long script_mq_move_do_to_redo_with_timestamp() {
        return (Long) this.redisHelper.redis().execute(this.script_mq_move_do_to_redo_with_timestamp, Arrays.asList(message_cache_name_id_do, message_cache_name_id_redo), new Object[]{Long.valueOf(System.currentTimeMillis() - 60000)});
    }

    private Long script_mq_move_prepare_to_death() {
        return (Long) this.redisHelper.redis().execute(this.script_mq_move_prepare_to_death, Arrays.asList(message_cache_name_id_prepare, message_cache_name_data_normal, message_cache_name_data_death), new Object[]{Long.valueOf(System.currentTimeMillis() - 60000)});
    }

    @Override // com.ddphin.rabbitmq.sender.RabbitmqCommonTxMessageSender
    public void send(String str, String str2, Object obj) {
        send(str, str2, null, obj);
    }

    @Override // com.ddphin.rabbitmq.sender.RabbitmqCommonTxMessageSender
    public void send(String str, String str2, Long l, Object obj) {
        Assert.isTrue(TransactionSynchronizationManager.isActualTransactionActive(), "@Transactional is required");
        Assert.isNull(this.singleMessage.get(), "Only support single message per thread");
        CorrelationDataMQ correlationDataMQ = new CorrelationDataMQ(str, str2, l, obj, String.valueOf(nextId()));
        if (!this.register.get().booleanValue()) {
            TransactionSynchronizationManager.registerSynchronization(this);
            this.register.set(true);
        }
        this.singleMessage.set(correlationDataMQ);
    }

    private void send(CorrelationDataMQ correlationDataMQ) {
        log.info("发送消息:\n      exchange: {}\n    routingKey: {}\n        millis: {}\n       message: {}", new Object[]{correlationDataMQ.getExchange(), correlationDataMQ.getRoutingKey(), correlationDataMQ.getMillis(), correlationDataMQ.getData()});
        Object obj = null;
        try {
            obj = JSONObject.parseObject(correlationDataMQ.getData(), Class.forName(correlationDataMQ.getClazz()));
        } catch (ClassNotFoundException e) {
            log.error("retry send message error", e);
        }
        if (null != correlationDataMQ.getMillis()) {
            this.rabbitTemplate.convertAndSend(correlationDataMQ.getExchange(), correlationDataMQ.getRoutingKey(), obj, message -> {
                message.getMessageProperties().setExpiration(String.valueOf(correlationDataMQ.getMillis()));
                return message;
            }, correlationDataMQ);
        } else {
            this.rabbitTemplate.convertAndSend(correlationDataMQ.getExchange(), correlationDataMQ.getRoutingKey(), obj, correlationDataMQ);
        }
    }

    public void confirm(CorrelationData correlationData, boolean z, String str) {
        CorrelationDataMQ correlationDataMQ = (CorrelationDataMQ) correlationData;
        if (z) {
            log.info("消息发送到exchange成功:\n    id: {}", correlationData.getId());
            script_mq_remove_do(new CorrelationDataRedis(correlationDataMQ));
        } else {
            if (3 <= correlationDataMQ.getRetry().intValue()) {
                log.error("消息发送到exchange失败:保存:\n       id: {}\n    cause: {}", correlationData.getId(), str);
                script_mq_move_do_to_redo_with_id(new CorrelationDataRedis(correlationDataMQ));
                return;
            }
            try {
                Thread.sleep((correlationDataMQ.getRetry().intValue() + 1) * 1000);
            } catch (InterruptedException e) {
                log.error("confirm error", e);
            }
            log.warn("消息发送到exchange失败:重试:\n       id: {}\n    cause: {}", correlationData.getId(), str);
            send(correlationDataMQ.retry());
        }
    }

    public void returnedMessage(Message message, int i, String str, String str2, String str3) {
        String valueOf = String.valueOf(message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
        String valueOf2 = String.valueOf(message.getMessageProperties().getHeaders().get("__TypeId__"));
        String str4 = new String(message.getBody());
        Object object = getObject(str4, valueOf2);
        Long valueOf3 = Long.valueOf(message.getMessageProperties().getExpiration());
        Assert.notNull(object, "message 消息体不能为NULL");
        log.error("消息发送到queue失败:保存:\n            id: {}\n         clazz: {}\n          data: {}\n      exchange: {}\n    routingKey: {}\n     replyText: {}\n", new Object[]{valueOf, valueOf2, str4, str2, str3, str});
        script_mq_save_death(new CorrelationDataRedis(new CorrelationDataMQ(str2, str3, valueOf3, object, valueOf)));
    }

    private Object getObject(String str, String str2) {
        try {
            return JSONObject.parseObject(str, Class.forName(str2));
        } catch (ClassNotFoundException e) {
            log.error("getObject", e);
            return null;
        }
    }

    public void afterPropertiesSet() {
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
    }

    public int getOrder() {
        return Integer.MAX_VALUE;
    }

    public void beforeCommit(boolean z) {
        script_mq_save_prepare(new CorrelationDataRedis(this.singleMessage.get()));
    }

    public void afterCommit() {
        script_mq_move_prepare_to_do(new CorrelationDataRedis(this.singleMessage.get()));
        send(this.singleMessage.get());
    }

    public void afterCompletion(int i) {
        if (0 != i) {
            script_mq_remove_prepare(new CorrelationDataRedis(this.singleMessage.get()));
        }
        this.register.remove();
        this.register.set(false);
        this.singleMessage.remove();
        this.singleMessage.set(null);
    }

    @Override // com.ddphin.rabbitmq.sender.RabbitmqCommonTxMessageSender
    public void retry() {
        String script_mq_move_redo_to_do;
        log.info("retry");
        do {
            script_mq_move_redo_to_do = script_mq_move_redo_to_do();
            log.info("retry\n    data:{}", script_mq_move_redo_to_do);
            if (null != script_mq_move_redo_to_do) {
                CorrelationDataRedis correlationDataRedis = (CorrelationDataRedis) JSONObject.parseObject(script_mq_move_redo_to_do, CorrelationDataRedis.class);
                send(new CorrelationDataMQ(correlationDataRedis.getExchange(), correlationDataRedis.getRoutingKey(), correlationDataRedis.getMillis(), correlationDataRedis.getData(), correlationDataRedis.getId()));
            }
        } while (null != script_mq_move_redo_to_do);
    }

    @Override // com.ddphin.rabbitmq.sender.RabbitmqCommonTxMessageSender
    public void redo() {
        log.info("redo");
        script_mq_move_do_to_redo_with_timestamp();
    }

    @Override // com.ddphin.rabbitmq.sender.RabbitmqCommonTxMessageSender
    public void clear() {
        log.info("clear");
        script_mq_move_prepare_to_death();
    }
}
