package org.openhubframework.openhub.component.funnel;

import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.common.time.Seconds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/openhubframework/openhub/component/funnel/MsgFunnelProducer.class */
public class MsgFunnelProducer extends DefaultProducer {
    private static final String FUNNEL_COMP_PREFIX = "funnel_";
    private static final Logger LOG = LoggerFactory.getLogger(MsgFunnelProducer.class);

    public MsgFunnelProducer(MsgFunnelEndpoint msgFunnelEndpoint) {
        super(msgFunnelEndpoint);
    }

    public void process(Exchange exchange) throws Exception {
        Message message = (Message) exchange.getIn().getHeader("processingMessage", Message.class);
        Assert.notNull(message, "message must be defined, msg-funnel component is for asynchronous messages only");
        if (StringUtils.isEmpty(message.getFunnelValue())) {
            LOG.debug("Message " + message.toHumanString() + " doesn't have funnel value => won't be filtered");
            return;
        }
        MsgFunnelEndpoint msgFunnelEndpoint = (MsgFunnelEndpoint) getEndpoint();
        String funnelCompId = getFunnelCompId(exchange, msgFunnelEndpoint);
        if (!funnelCompId.equals(message.getFunnelComponentId())) {
            msgFunnelEndpoint.getMessageService().setFunnelComponentId(message, funnelCompId);
        }
        if (!msgFunnelEndpoint.isGuaranteedOrder()) {
            if (msgFunnelEndpoint.getMessageService().getCountProcessingMessagesForFunnel(message.getFunnelValue(), Seconds.of(msgFunnelEndpoint.getIdleInterval()).toDuration(), funnelCompId) <= 1) {
                LOG.debug("There is only one processing message with funnel value: " + message.getFunnelValue() + " => no filtering");
                return;
            } else {
                LOG.debug("There are more processing messages with funnel value '" + message.getFunnelValue() + "', message " + message.toHumanString() + " will be postponed.");
                postponeMessage(exchange, message, msgFunnelEndpoint);
                return;
            }
        }
        List messagesForGuaranteedOrderForFunnel = msgFunnelEndpoint.getMessageService().getMessagesForGuaranteedOrderForFunnel(message.getFunnelValue(), Seconds.of(msgFunnelEndpoint.getIdleInterval()).toDuration(), msgFunnelEndpoint.isExcludeFailedState(), funnelCompId);
        if (messagesForGuaranteedOrderForFunnel.size() == 1) {
            LOG.debug("There is only one processing message with funnel value: " + message.getFunnelValue() + " => no filtering");
        } else if (((Message) messagesForGuaranteedOrderForFunnel.get(0)).equals(message)) {
            LOG.debug("Processing message (msg_id = {}, funnel value = '{}') is the first one => no filtering", message.getMsgId(), message.getFunnelValue());
        } else {
            LOG.debug("There is at least one processing message with funnel value '{}' before current message (msg_id = {}); message {} will be postponed.", new Object[]{message.getFunnelValue(), message.getMsgId(), message.toHumanString()});
            postponeMessage(exchange, message, msgFunnelEndpoint);
        }
    }

    private String getFunnelCompId(Exchange exchange, MsgFunnelEndpoint msgFunnelEndpoint) {
        return msgFunnelEndpoint.getId() != null ? msgFunnelEndpoint.getId() : FUNNEL_COMP_PREFIX + exchange.getFromRouteId();
    }

    private void postponeMessage(Exchange exchange, Message message, MsgFunnelEndpoint msgFunnelEndpoint) {
        msgFunnelEndpoint.getMessageService().setStatePostponed(message);
        msgFunnelEndpoint.getAsyncEventNotifier().notifyMsgPostponed(exchange);
        exchange.setProperty("CamelRouteStop", Boolean.TRUE);
    }
}
