package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.broker.network.traffic.Traffic;
import org.joyqueue.broker.polling.LongPolling;
import org.joyqueue.broker.polling.LongPollingManager;
import org.joyqueue.broker.protocol.JoyQueueCommandHandler;
import org.joyqueue.broker.protocol.JoyQueueContext;
import org.joyqueue.broker.protocol.JoyQueueContextAware;
import org.joyqueue.broker.protocol.command.FetchTopicMessageRequest;
import org.joyqueue.broker.protocol.command.FetchTopicMessageResponse;
import org.joyqueue.broker.protocol.converter.CheckResultConverter;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.command.FetchTopicMessageAckData;
import org.joyqueue.network.command.FetchTopicMessageData;
import org.joyqueue.network.command.JoyQueueCommandType;
import org.joyqueue.network.protocol.annotation.FetchHandler;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Type;
import org.joyqueue.response.BooleanResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles {@code FETCH_TOPIC_MESSAGE_REQUEST} commands.
 *
 * <p>For every topic in the request the handler checks readability, honours the
 * request-side traffic limit, resolves the consumer bound to the connection and
 * pulls messages through {@link Consume}. Per-topic failures are reported inside
 * the response map; only a missing or unauthorized connection fails the whole
 * request.
 */
@FetchHandler
public class FetchTopicMessageRequestHandler implements JoyQueueCommandHandler, Type, JoyQueueContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(FetchTopicMessageRequestHandler.class);

    private Consume consume;
    private SessionManager sessionManager;
    private ClusterManager clusterManager;
    private LongPollingManager longPollingManager;

    /**
     * Captures the broker collaborators this handler needs from the protocol context.
     *
     * @param joyQueueContext context supplying the broker context and the long-polling manager
     */
    @Override // org.joyqueue.broker.protocol.JoyQueueContextAware
    public void setJoyQueueContext(JoyQueueContext joyQueueContext) {
        this.consume = joyQueueContext.getBrokerContext().getConsume();
        this.sessionManager = joyQueueContext.getBrokerContext().getSessionManager();
        this.clusterManager = joyQueueContext.getBrokerContext().getClusterManager();
        this.longPollingManager = joyQueueContext.getLongPollingManager();
    }

    /**
     * Serves a fetch request.
     *
     * @param transport transport the request arrived on; used to look up the connection
     * @param command   command whose payload is a {@link FetchTopicMessageRequest}
     * @return a {@link BooleanAck} carrying {@code FW_CONNECTION_NOT_EXISTS} when the connection is
     *         missing or not authorized for the app; {@code null} when the request was handed to the
     *         long-polling manager (the {@link FetchTopicMessageLongPollCallback} is given the request,
     *         command and transport); otherwise a command wrapping a {@link FetchTopicMessageResponse}
     *         with one entry per requested topic
     */
    public Command handle(Transport transport, Command command) {
        FetchTopicMessageRequest request = (FetchTopicMessageRequest) command.getPayload();
        String app = request.getApp();
        Map<String, FetchTopicMessageData> topics = request.getTopics();

        Connection connection = SessionHelper.getConnection(transport);
        if (connection == null || !connection.isAuthorized(app)) {
            logger.warn("connection does not exist, transport: {}, app: {}, topics: {}", transport, app, topics.keySet());
            return BooleanAck.build(JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
        }

        // Long polling is only considered for single-topic requests that ask for it.
        boolean longPollRequested = topics.size() == 1 && request.getLongPollTimeout() > 0;
        Map<String, FetchTopicMessageAckData> result = Maps.newHashMapWithExpectedSize(topics.size());
        Traffic traffic = new Traffic(app);

        for (Map.Entry<String, FetchTopicMessageData> entry : topics.entrySet()) {
            String topic = entry.getKey();

            BooleanResponse readable = this.clusterManager.checkReadable(TopicName.parse(topic), app, connection.getHost());
            if (!readable.isSuccess()) {
                logger.warn("checkReadable failed, transport: {}, topic: {}, app: {}, code: {}",
                        transport, topic, app, readable.getJoyQueueCode());
                result.put(topic, new FetchTopicMessageAckData(
                        CheckResultConverter.convertFetchCode(command.getHeader().getVersion(), readable.getJoyQueueCode())));
                continue;
            }

            // A traffic-limited topic is answered with a bare SUCCESS ack without fetching.
            if (request.getTraffic().isLimited(topic)) {
                result.put(topic, new FetchTopicMessageAckData(JoyQueueCode.SUCCESS));
                continue;
            }

            String consumerId = connection.getConsumer(topic, app);
            Consumer consumer = StringUtils.isBlank(consumerId) ? null : this.sessionManager.getConsumerById(consumerId);
            if (consumer == null) {
                logger.warn("consumer does not exist, transport: {}, app: {}, topic: {}", transport, app, topic);
                result.put(topic, new FetchTopicMessageAckData(
                        CheckResultConverter.convertFetchCode(command.getHeader().getVersion(), JoyQueueCode.FW_CONSUMER_NOT_EXISTS)));
                continue;
            }

            FetchTopicMessageData fetchData = entry.getValue();
            FetchTopicMessageAckData ackData = fetchMessage(transport, consumer, fetchData.getCount(), request.getAckTimeout());

            // Nothing to deliver right now: try to suspend the request. The short-circuit order matters,
            // suspend(...) must only be attempted once all cheaper preconditions hold.
            if (longPollRequested
                    && CollectionUtils.isEmpty(ackData.getBuffers())
                    && this.clusterManager.isNeedLongPull(consumer.getTopic())
                    && this.longPollingManager.suspend(new LongPolling(consumer, fetchData.getCount(),
                            request.getAckTimeout(), request.getLongPollTimeout(),
                            new FetchTopicMessageLongPollCallback(request, command, transport)))) {
                return null;
            }

            traffic.record(topic, ackData.getTraffic(), ackData.getSize());
            result.put(topic, ackData);
        }

        FetchTopicMessageResponse response = new FetchTopicMessageResponse();
        response.setTraffic(traffic);
        response.setData(result);
        return new Command(response);
    }

    /**
     * Pulls messages for one consumer and converts the outcome into ack data.
     *
     * <p>This method does not propagate exceptions: a {@link JoyQueueException} is mapped to the
     * code it carries, any other exception to {@code CN_UNKNOWN_ERROR}. In both failure cases the
     * buffers remain the empty list set up front.
     *
     * @param transport  transport of the request; used for logging only
     * @param consumer   consumer to pull for
     * @param count      value passed to {@code Consume.getMessage} as the message count
     * @param ackTimeout value passed to {@code Consume.getMessage} as the ack timeout
     * @return ack data holding the pull result's buffers and code, or an error code on failure
     */
    protected FetchTopicMessageAckData fetchMessage(Transport transport, Consumer consumer, int count, int ackTimeout) {
        FetchTopicMessageAckData ackData = new FetchTopicMessageAckData();
        ackData.setBuffers(Collections.emptyList());
        try {
            PullResult pullResult = this.consume.getMessage(consumer, count, ackTimeout);
            if (!pullResult.getCode().equals(JoyQueueCode.SUCCESS)) {
                logger.error("fetchTopicMessage exception, transport: {}, consumer: {}, count: {}", transport, consumer, count);
            }
            ackData.setBuffers(pullResult.getBuffers());
            ackData.setCode(pullResult.getCode());
        } catch (JoyQueueException e) {
            // Must precede the generic handler, otherwise the specific error code is never reported.
            logger.error("fetchTopicMessage exception, transport: {}, consumer: {}, count: {}", transport, consumer, count, e);
            ackData.setCode(JoyQueueCode.valueOf(e.getCode()));
        } catch (Exception e) {
            logger.error("fetchTopicMessage exception, transport: {}, consumer: {}, count: {}", transport, consumer, count, e);
            ackData.setCode(JoyQueueCode.CN_UNKNOWN_ERROR);
        }
        return ackData;
    }

    /**
     * @return the numeric code of {@code JoyQueueCommandType.FETCH_TOPIC_MESSAGE_REQUEST}
     */
    public int type() {
        return JoyQueueCommandType.FETCH_TOPIC_MESSAGE_REQUEST.getCode();
    }
}
