package vip.wuweijie.camel.component.rocketmq.reply;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vip.wuweijie.camel.component.rocketmq.RocketMQEndpoint;
import vip.wuweijie.camel.component.rocketmq.RocketMQMessageConverter;
import vip.wuweijie.camel.component.rocketmq.RocketMQProducer;

/* loaded from: input_file:vip/wuweijie/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.class */
public class RocketMQReplyManagerSupport extends ServiceSupport implements ReplyManager {
    private static final int CLOSE_TIMEOUT = 30000;
    protected final CamelContext camelContext;
    protected ScheduledExecutorService executorService;
    protected RocketMQEndpoint endpoint;
    protected String replyToTopic;
    protected DefaultMQPushConsumer mqPushConsumer;
    protected ReplyTimeoutMap timeoutMap;
    protected final Logger log = LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
    protected final long replyToTimeout = 1000;
    private final RocketMQMessageConverter messageConverter = new RocketMQMessageConverter();

    public RocketMQReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.executorService, "executorService", this);
        ObjectHelper.notNull(this.endpoint, "endpoint", this);
        this.log.debug("Using timeout checker interval with {} millis", this.endpoint.getRequestTimeoutCheckerInterval());
        this.timeoutMap = new ReplyTimeoutMap(this.executorService, this.endpoint.getRequestTimeoutCheckerInterval().longValue());
        ServiceHelper.startService(this.timeoutMap);
        this.mqPushConsumer = createConsumer();
        this.mqPushConsumer.start();
        this.log.debug("Using executor {}", this.executorService);
    }

    protected DefaultMQPushConsumer createConsumer() throws MQClientException {
        setReplyToTopic(this.endpoint.getReplyToTopic());
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        defaultMQPushConsumer.setConsumerGroup(this.endpoint.getReplyToConsumerGroup());
        defaultMQPushConsumer.setNamesrvAddr(this.endpoint.getNamesrvAddr());
        defaultMQPushConsumer.subscribe(this.replyToTopic, "*");
        defaultMQPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            MessageExt messageExt = (MessageExt) list.get(0);
            onMessage(messageExt);
            this.log.trace("Consume message {}", messageExt);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return defaultMQPushConsumer;
    }

    public void onMessage(MessageExt messageExt) {
        String str = (String) Arrays.stream(messageExt.getKeys().split(" ")).filter(str2 -> {
            return str2.startsWith(RocketMQProducer.GENERATE_MESSAGE_KEY_PREFIX);
        }).findFirst().orElse(null);
        if (str == null) {
            this.log.warn("Ignoreing message with no messageKey: {}", messageExt);
        } else {
            this.log.debug("Received reply message with messageKey [{}] -> {}", str, messageExt);
            handleReplyMessage(str, messageExt);
        }
    }

    protected void doStop() throws Exception {
        ServiceHelper.stopService(this.timeoutMap);
        if (this.mqPushConsumer != null) {
            this.log.debug("Closing connection: {} with timeout: {} ms.", this.mqPushConsumer, Integer.valueOf(CLOSE_TIMEOUT));
            this.mqPushConsumer.shutdown();
            this.mqPushConsumer = null;
        }
        if (this.executorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful(this.executorService);
            this.executorService = null;
        }
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public void setEndpoint(RocketMQEndpoint rocketMQEndpoint) {
        this.endpoint = rocketMQEndpoint;
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public String getReplyToTopic() {
        if (this.replyToTopic != null) {
            return this.replyToTopic;
        }
        try {
            this.log.trace("Waiting for replyToTopic to be set");
            if (this.replyToLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                this.log.trace("Waiting for replyToTopic to be set done");
            } else {
                this.log.warn("ReplyToTopic was not set and timeout occurred");
            }
        } catch (InterruptedException e) {
        }
        return this.replyToTopic;
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public void setReplyToTopic(String str) {
        this.log.debug("ReplyToTopic: {}", str);
        this.replyToTopic = str;
        this.replyToLatch.countDown();
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback asyncCallback, String str, long j) {
        if (this.timeoutMap.putIfAbsent(str, (ReplyHandler) new RocketMQReplyHandler(replyManager, exchange, asyncCallback, str, j), j) != null) {
            throw new IllegalArgumentException(String.format("The messageKey [%s] is not unique.", str));
        }
        return str;
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.executorService = scheduledExecutorService;
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public void updateMessageKey(String str, String str2, long j) {
        this.log.trace("Updated messageKey [{}] to [{}]", str, str2);
        Optional.ofNullable(this.timeoutMap.remove(str)).ifPresent(replyHandler -> {
            this.timeoutMap.put(str2, replyHandler, j);
        });
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public void processReply(ReplyHolder replyHolder) {
        if (replyHolder == null || !isRunAllowed()) {
            return;
        }
        try {
            Exchange exchange = replyHolder.getExchange();
            if (replyHolder.isTimeout()) {
                if (this.log.isWarnEnabled()) {
                    this.log.warn("Timeout occurred after {} millis waiting for reply message with messageKey [{}] on topic {}. Setting ExchangeTimedOutException on {} and continue routing.", new Object[]{Long.valueOf(replyHolder.getTimeout()), replyHolder.getMessageKey(), this.replyToTopic, ExchangeHelper.logIds(exchange)});
                }
                exchange.setException(new ExchangeTimedOutException(exchange, replyHolder.getTimeout(), "reply message with messageKey: " + replyHolder.getMessageKey() + " not received on topic: " + this.replyToTopic));
            } else {
                this.messageConverter.populateRocketExchange(exchange, replyHolder.getMessageExt(), true);
            }
        } finally {
            replyHolder.getCallback().done(false);
        }
    }

    @Override // vip.wuweijie.camel.component.rocketmq.reply.ReplyManager
    public void cancelMessageKey(String str) {
        Optional.ofNullable(this.timeoutMap.get(str)).ifPresent(replyHandler -> {
            this.log.warn("Cancelling messageKey: {}", str);
            this.timeoutMap.remove(str);
        });
    }

    protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback asyncCallback, String str, long j) {
        return new RocketMQReplyHandler(replyManager, exchange, asyncCallback, str, j);
    }

    protected void handleReplyMessage(String str, MessageExt messageExt) {
        ReplyHandler replyHandler = (ReplyHandler) this.timeoutMap.get(str);
        if (replyHandler == null) {
            this.log.warn("Reply received for unknown messageKey [{}]. The message will be ignored: {}", str, messageExt);
        } else {
            this.timeoutMap.remove(str);
            replyHandler.onReply(str, messageExt);
        }
    }
}
