package org.onetwo.boot.mq.interceptor;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import org.onetwo.boot.core.web.async.AsyncTaskDelegateService;
import org.onetwo.boot.core.web.mvc.interceptor.WebInterceptorAdapter;
import org.onetwo.boot.mq.MQUtils;
import org.onetwo.boot.mq.SendMessageContext;
import org.onetwo.boot.mq.entity.SendMessageEntity;
import org.onetwo.boot.mq.interceptor.DatabaseTransactionMessageInterceptor;
import org.onetwo.boot.mq.repository.SendMessageRepository;
import org.onetwo.boot.mq.serializer.MessageBodyStoreSerializer;
import org.onetwo.common.exception.ServiceException;
import org.onetwo.common.log.JFishLoggerFactory;
import org.onetwo.common.utils.StringUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.annotation.Order;
import org.springframework.util.Assert;

/**
 * Send-message interceptor that persists an outgoing message through a
 * {@link SendMessageRepository} and publishes a
 * {@link DatabaseTransactionMessageInterceptor.SendMessageEvent} instead of
 * sending the message immediately.
 *
 * <p>The suspended interceptor chain is resumed in {@link #afterCommit}, and the
 * stored record is removed in {@link #afterRollback}. Both callbacks can
 * optionally be handed to an {@link AsyncTaskDelegateService} (see
 * {@link #setUseAsync(boolean)}).
 *
 * <p>NOTE(review): who invokes {@code afterCommit}/{@code afterRollback} is not
 * visible in this file; presumably a transaction-bound event listener declared
 * on {@link DatabaseTransactionMessageInterceptor} - confirm there.
 */
@Order(WebInterceptorAdapter.FIRST)
/* loaded from: input_file:org/onetwo/boot/mq/interceptor/SimpleDatabaseTransactionMessageInterceptor.class */
public class SimpleDatabaseTransactionMessageInterceptor implements InitializingBean, SendMessageInterceptor, DatabaseTransactionMessageInterceptor {

    /** Publisher used to emit the {@code SendMessageEvent} once the message is stored. */
    @Autowired
    protected ApplicationEventPublisher applicationEventPublisher;

    /** Optional delegate; only required when {@link #useAsync} is enabled. */
    @Autowired(required = false)
    private AsyncTaskDelegateService asyncTaskDelegateService;

    /** When {@code true}, commit/rollback handling is handed to the async delegate. */
    private boolean useAsync = false;

    /** Repository that stores, updates and removes the persisted message records. */
    private SendMessageRepository sendMessageRepository;

    /** Serializer used to turn the message payload into the stored body. */
    @Autowired
    private MessageBodyStoreSerializer messageBodyStoreSerializer;

    /**
     * @return whether commit/rollback handling is delegated to the async service
     */
    public boolean isUseAsync() {
        return this.useAsync;
    }

    /**
     * @param z {@code true} to run commit/rollback handling through the
     *          {@link AsyncTaskDelegateService}
     */
    public void setUseAsync(boolean z) {
        this.useAsync = z;
    }

    /**
     * @return the logger for the runtime class of this instance
     */
    protected Logger getLogger() {
        return JFishLoggerFactory.getLogger(getClass());
    }

    /**
     * Intercepts a send request.
     *
     * <p>If the context reports itself as transactional, the chain is invoked
     * directly and its result is returned. Otherwise the message is stored, a
     * send-message event is published and the chain is <em>not</em> invoked here.
     *
     * @param sendMessageInterceptorChain the chain carrying the send context
     * @return the chain result for transactional contexts, otherwise
     *         {@link MQUtils#DEFAULT_SUSPEND}
     * @throws ServiceException if the message key is blank
     */
    @Override
    public Object intercept(SendMessageInterceptorChain sendMessageInterceptorChain) {
        SendMessageContext<?> context = sendMessageInterceptorChain.getSendMessageContext();
        if (context.isTransactional()) {
            return sendMessageInterceptorChain.invoke();
        }
        boolean debug = context.isDebug();
        if (debug && getLogger().isInfoEnabled()) {
            getLogger().info("start transactional message in thread[{}]...", context.getThreadId());
        }
        // Re-applies the flag that was just read; kept because any effect of
        // setDebug beyond storing the value is not visible from this file.
        context.setDebug(debug);
        storeAndPublishSendMessageEvent(context);
        return MQUtils.DEFAULT_SUSPEND;
    }

    /**
     * Verifies that the async delegate is available when async mode is enabled.
     *
     * @throws IllegalArgumentException if {@code useAsync} is set but no
     *         {@link AsyncTaskDelegateService} was injected
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.useAsync) {
            Assert.notNull(this.asyncTaskDelegateService, "asyncTaskDelegateService not found!");
        }
    }

    /**
     * Stores the message and then publishes a
     * {@link DatabaseTransactionMessageInterceptor.SendMessageEvent} wrapping the context.
     *
     * @param sendMessageContext the context of the message being sent
     * @throws ServiceException if the message key is blank
     */
    protected void storeAndPublishSendMessageEvent(SendMessageContext<?> sendMessageContext) {
        storeSendMessage(sendMessageContext);
        this.applicationEventPublisher.publishEvent(
                DatabaseTransactionMessageInterceptor.SendMessageEvent.builder()
                        .sendMessageContext(sendMessageContext)
                        .build());
        boolean debug = sendMessageContext.isDebug();
        Logger logger = getLogger();
        if (debug && logger.isInfoEnabled()) {
            logger.info("publish message event : {}", sendMessageContext.getMessageEntity().getKey());
        }
    }

    /**
     * Builds the message entity for the context, attaches it to the context and
     * saves the context through the repository.
     *
     * @param sendMessageContext the context of the message being sent
     * @throws ServiceException if the context's key is blank
     */
    protected void storeSendMessage(SendMessageContext<?> sendMessageContext) {
        Serializable message = sendMessageContext.getMessage();
        String key = sendMessageContext.getKey();
        if (StringUtils.isBlank(key)) {
            throw new ServiceException("message key can not be blank!");
        }
        sendMessageContext.setMessageEntity(createSendMessageEntity(key, message));
        getSendMessageRepository().save(sendMessageContext);
    }

    /**
     * Creates a new entity in state {@code UNSEND} with the serialized payload
     * as body and the current time as delivery time.
     *
     * @param str          the message key
     * @param serializable the message payload to serialize into the body
     * @return the populated, not yet persisted entity
     */
    protected SendMessageEntity createSendMessageEntity(String str, Serializable serializable) {
        SendMessageEntity entity = new SendMessageEntity();
        entity.setKey(str);
        entity.setState(SendMessageEntity.SendStates.UNSEND);
        entity.setBody(this.messageBodyStoreSerializer.serialize(serializable));
        entity.setDeliverAt(new Date());
        return entity;
    }

    /**
     * Runs {@link #doAfterCommit} either directly or through the async delegate,
     * depending on {@link #isUseAsync()}.
     *
     * @param sendMessageEvent the event published when the message was stored
     * @throws IllegalStateException if async mode is enabled but no delegate is available
     */
    @Override
    public void afterCommit(DatabaseTransactionMessageInterceptor.SendMessageEvent sendMessageEvent) {
        if (this.useAsync) {
            requireAsyncDelegate().run(() -> doAfterCommit(sendMessageEvent));
        } else {
            doAfterCommit(sendMessageEvent);
        }
    }

    /**
     * Resumes the interceptor chain held by the event's context, then asks the
     * repository to mark the stored message as sent.
     *
     * @param sendMessageEvent the event published when the message was stored
     */
    protected void doAfterCommit(DatabaseTransactionMessageInterceptor.SendMessageEvent sendMessageEvent) {
        boolean debug = sendMessageEvent.getSendMessageContext().isDebug();
        sendMessageEvent.getSendMessageContext().getChain().invoke();
        getSendMessageRepository().updateToSent(sendMessageEvent.getSendMessageContext());
        Logger logger = getLogger();
        if (debug && logger.isInfoEnabled()) {
            logger.info("committed transactional message in thread[{}]...", Thread.currentThread().getId());
        }
    }

    /**
     * Runs {@link #doAfterRollback} either directly or through the async
     * delegate, depending on {@link #isUseAsync()}.
     *
     * @param sendMessageEvent the event published when the message was stored
     * @throws IllegalStateException if async mode is enabled but no delegate is available
     */
    @Override
    public void afterRollback(DatabaseTransactionMessageInterceptor.SendMessageEvent sendMessageEvent) {
        if (this.useAsync) {
            requireAsyncDelegate().run(() -> doAfterRollback(sendMessageEvent));
        } else {
            doAfterRollback(sendMessageEvent);
        }
    }

    /**
     * Asks the repository to remove the record belonging to the event's context.
     *
     * @param sendMessageEvent the event published when the message was stored
     */
    public void doAfterRollback(DatabaseTransactionMessageInterceptor.SendMessageEvent sendMessageEvent) {
        boolean debug = sendMessageEvent.getSendMessageContext().isDebug();
        getSendMessageRepository().remove(Arrays.asList(sendMessageEvent.getSendMessageContext()));
        Logger logger = getLogger();
        if (debug && logger.isInfoEnabled()) {
            logger.info("rollback transactional message in thread[{}]...", Thread.currentThread().getId());
        }
    }

    /**
     * Returns the async delegate, failing with a descriptive error when it is
     * missing. Needed because {@link #setUseAsync(boolean)} can enable async
     * mode after {@link #afterPropertiesSet()} has already run.
     */
    private AsyncTaskDelegateService requireAsyncDelegate() {
        Assert.state(this.asyncTaskDelegateService != null, "asyncTaskDelegateService not found!");
        return this.asyncTaskDelegateService;
    }

    /**
     * @return the serializer used for the stored message body
     */
    public MessageBodyStoreSerializer getMessageBodyStoreSerializer() {
        return this.messageBodyStoreSerializer;
    }

    /**
     * @return the repository used to persist messages; may be {@code null} if
     *         it has not been set
     */
    public SendMessageRepository getSendMessageRepository() {
        return this.sendMessageRepository;
    }

    /**
     * @param sendMessageRepository the repository used to persist messages
     */
    public void setSendMessageRepository(SendMessageRepository sendMessageRepository) {
        this.sendMessageRepository = sendMessageRepository;
    }

    /**
     * @return the publisher used to emit send-message events
     */
    public ApplicationEventPublisher getApplicationEventPublisher() {
        return this.applicationEventPublisher;
    }
}
