package org.onetwo.boot.mq.task;

import java.io.Serializable;
import java.util.List;
import org.joda.time.LocalDateTime;
import org.onetwo.boot.module.redis.RedisLockRunner;
import org.onetwo.boot.mq.MQProperties;
import org.onetwo.boot.mq.ProducerService;
import org.onetwo.boot.mq.SendMessageFlags;
import org.onetwo.boot.mq.entity.SendMessageEntity;
import org.onetwo.boot.mq.repository.SendMessageRepository;
import org.onetwo.boot.mq.serializer.MessageBodyStoreSerializer;
import org.onetwo.common.log.JFishLoggerFactory;
import org.onetwo.common.utils.LangOps;
import org.onetwo.common.utils.LangUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.Assert;

/* loaded from: input_file:org/onetwo/boot/mq/task/CompensationSendMessageTask.class */
public class CompensationSendMessageTask implements InitializingBean {
    public static final String LOCK_KEY = "locker:ons:send_message_task";
    protected Logger log = JFishLoggerFactory.getLogger(getClass());

    @Autowired(required = false)
    private RedisLockRegistry redisLockRegistry;

    @Autowired
    private MessageBodyStoreSerializer messageBodyStoreSerializer;

    @Autowired
    private SendMessageRepository sendMessageRepository;

    @Autowired
    private ProducerService<?, ?> producerService;
    private boolean useReidsLock;
    private String redisLockTimeout;

    @Autowired
    private MQProperties mqProperties;

    public void afterPropertiesSet() throws Exception {
        if (this.useReidsLock) {
            Assert.notNull(this.redisLockRegistry, "redisLockRegistry not found!");
        }
    }

    public void setUseReidsLock(boolean z) {
        this.useReidsLock = z;
    }

    @Scheduled(fixedRateString = "${jfish.mq.sendTask.fixedRateString:60000}", initialDelay = 30000)
    public void scheduleCheckSendMessage() {
        MQProperties.SendTaskProps sendTask = this.mqProperties.getTransactional().getSendTask();
        doCheckSendMessage(sendTask.getSendCountPerTask(), (int) LangOps.timeToSeconds(sendTask.getIgnoreCreateAtRecently(), 60L));
    }

    protected void doCheckSendMessage(int i, int i2) {
        this.log.info("start to check unsend message...");
        if (this.useReidsLock) {
            getRedisLockRunner().tryLock(() -> {
                findAndProcessUnsendMessage(i, i2);
                return null;
            });
        } else {
            findAndProcessUnsendMessage(i, i2);
        }
        this.log.info("finish check unsend message...");
    }

    protected void findAndProcessUnsendMessage(int i, int i2) {
        LocalDateTime minusSeconds = LocalDateTime.now().minusSeconds(i2);
        String locker = this.mqProperties.getTransactional().getSendTask().getLocker();
        int lockSendMessage = this.sendMessageRepository.lockSendMessage(locker, minusSeconds.toDate(), SendMessageEntity.SendStates.UNSEND);
        if (this.log.isInfoEnabled()) {
            this.log.info("lock [{}] mesage from database", Integer.valueOf(lockSendMessage));
        }
        List<SendMessageEntity> findLockerMessage = this.sendMessageRepository.findLockerMessage(locker, minusSeconds.toDate(), SendMessageEntity.SendStates.UNSEND, i);
        if (LangUtils.isEmpty(findLockerMessage)) {
            if (this.log.isInfoEnabled()) {
                this.log.info("no unsend mesage found from database");
                return;
            }
            return;
        }
        if (this.log.isInfoEnabled()) {
            this.log.info("find [{}] mesage from database to be sending", Integer.valueOf(findLockerMessage.size()));
        }
        for (SendMessageEntity sendMessageEntity : findLockerMessage) {
            processUnsendMessage(sendMessageEntity, this.messageBodyStoreSerializer.deserialize(sendMessageEntity.getBody()));
            this.sendMessageRepository.updateToSent(sendMessageEntity);
            if (this.log.isInfoEnabled()) {
                this.log.info("resend message and remove from database, key: {}", sendMessageEntity.getKey());
            }
        }
    }

    protected void processUnsendMessage(SendMessageEntity sendMessageEntity, Serializable serializable) {
        this.producerService.send(serializable, SendMessageFlags.DisableDatabaseTransactional);
    }

    private RedisLockRunner getRedisLockRunner() {
        return RedisLockRunner.createLoker(this.redisLockRegistry, LOCK_KEY, this.redisLockTimeout);
    }

    public void setRedisLockTimeout(String str) {
        this.redisLockTimeout = str;
    }
}
