package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import java.util.Collections;
import java.util.Map;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.network.traffic.Traffic;
import org.joyqueue.broker.protocol.JoyQueueCommandHandler;
import org.joyqueue.broker.protocol.command.FetchPartitionMessageRequest;
import org.joyqueue.broker.protocol.command.FetchPartitionMessageResponse;
import org.joyqueue.broker.protocol.converter.CheckResultConverter;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.command.FetchPartitionMessageAckData;
import org.joyqueue.network.command.FetchPartitionMessageData;
import org.joyqueue.network.command.JoyQueueCommandType;
import org.joyqueue.network.protocol.annotation.FetchHandler;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Type;
import org.joyqueue.response.BooleanResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@FetchHandler
/* loaded from: input_file:org/joyqueue/broker/protocol/handler/FetchPartitionMessageRequestHandler.class */
public class FetchPartitionMessageRequestHandler implements JoyQueueCommandHandler, Type, BrokerContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(FetchPartitionMessageRequestHandler.class);
    private Consume consume;
    private ClusterManager clusterManager;

    public void setBrokerContext(BrokerContext brokerContext) {
        this.consume = brokerContext.getConsume();
        this.clusterManager = brokerContext.getClusterManager();
    }

    public Command handle(Transport transport, Command command) {
        FetchPartitionMessageRequest fetchPartitionMessageRequest = (FetchPartitionMessageRequest) command.getPayload();
        Connection connection = SessionHelper.getConnection(transport);
        if (connection == null || !connection.isAuthorized(fetchPartitionMessageRequest.getApp())) {
            logger.warn("connection is not exists, transport: {}, app: {}", transport, fetchPartitionMessageRequest.getApp());
            return BooleanAck.build(JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
        }
        Table create = HashBasedTable.create();
        Traffic traffic = new Traffic(fetchPartitionMessageRequest.getApp());
        for (Map.Entry entry : fetchPartitionMessageRequest.getPartitions().rowMap().entrySet()) {
            String str = (String) entry.getKey();
            Consumer consumer = new Consumer(connection.getId(), str, fetchPartitionMessageRequest.getApp(), Consumer.ConsumeType.JOYQUEUE);
            for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                short shortValue = ((Short) entry2.getKey()).shortValue();
                BooleanResponse checkReadable = this.clusterManager.checkReadable(TopicName.parse(str), fetchPartitionMessageRequest.getApp(), connection.getHost(), shortValue);
                if (!checkReadable.isSuccess()) {
                    logger.warn("checkReadable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}", new Object[]{transport, consumer.getTopic(), Short.valueOf(shortValue), consumer.getApp(), checkReadable.getJoyQueueCode()});
                    create.put(str, entry2.getKey(), new FetchPartitionMessageAckData(CheckResultConverter.convertFetchCode(command.getHeader().getVersion(), checkReadable.getJoyQueueCode())));
                } else if (fetchPartitionMessageRequest.getTraffic().isLimited((String) entry.getKey())) {
                    create.put(str, entry2.getKey(), new FetchPartitionMessageAckData(JoyQueueCode.SUCCESS));
                } else {
                    FetchPartitionMessageData fetchPartitionMessageData = (FetchPartitionMessageData) entry2.getValue();
                    FetchPartitionMessageAckData fetchMessage = fetchMessage(transport, consumer, shortValue, fetchPartitionMessageData.getIndex(), fetchPartitionMessageData.getCount());
                    create.put(str, entry2.getKey(), fetchMessage);
                    traffic.record(str, fetchMessage.getTraffic(), fetchMessage.getSize());
                }
            }
        }
        FetchPartitionMessageResponse fetchPartitionMessageResponse = new FetchPartitionMessageResponse();
        fetchPartitionMessageResponse.setTraffic(traffic);
        fetchPartitionMessageResponse.setData(create);
        return new Command(fetchPartitionMessageResponse);
    }

    protected FetchPartitionMessageAckData fetchMessage(Transport transport, Consumer consumer, short s, long j, int i) {
        FetchPartitionMessageAckData fetchPartitionMessageAckData = new FetchPartitionMessageAckData();
        fetchPartitionMessageAckData.setBuffers(Collections.emptyList());
        try {
            long minIndex = this.consume.getMinIndex(consumer, s);
            long maxIndex = this.consume.getMaxIndex(consumer, s);
            if (j == -1) {
                j = this.consume.getAckIndex(consumer, s);
                if (j < minIndex) {
                    logger.warn("fetchPartitionMessage exception, index reset to minIndex, transport: {}, consumer: {}, partition: {}, index: {}, minIndex: {}, maxIndex: {}", new Object[]{transport, consumer, Short.valueOf(s), Long.valueOf(j), Long.valueOf(minIndex), Long.valueOf(maxIndex)});
                    j = minIndex;
                }
            }
            if (j < minIndex || j > maxIndex) {
                logger.warn("fetchPartitionMessage exception, index ou of range, transport: {}, consumer: {}, partition: {}, index: {}, minIndex: {}, maxIndex: {}", new Object[]{transport, consumer, Short.valueOf(s), Long.valueOf(j), Long.valueOf(minIndex), Long.valueOf(maxIndex)});
                fetchPartitionMessageAckData.setCode(JoyQueueCode.FW_FETCH_MESSAGE_INDEX_OUT_OF_RANGE);
            } else {
                PullResult message = this.consume.getMessage(consumer, s, j, i);
                if (!message.getCode().equals(JoyQueueCode.SUCCESS)) {
                    logger.error("fetchPartitionMessage exception, transport: {}, consumer: {}, partition: {}, index: {}, minIndex: {}, maxIndex: {}", new Object[]{transport, consumer, Short.valueOf(s), Long.valueOf(j), Long.valueOf(minIndex), Long.valueOf(maxIndex)});
                }
                fetchPartitionMessageAckData.setBuffers(message.getBuffers());
                fetchPartitionMessageAckData.setCode(message.getCode());
            }
        } catch (JoyQueueException e) {
            logger.error("fetchPartitionMessage exception, transport: {}, consumer: {}, partition: {}, index: {}", new Object[]{transport, consumer, Short.valueOf(s), Long.valueOf(j), e});
            fetchPartitionMessageAckData.setCode(JoyQueueCode.valueOf(e.getCode()));
        } catch (Exception e2) {
            logger.error("fetchPartitionMessage exception, transport: {}, consumer: {}, partition: {}, index: {}", new Object[]{transport, consumer, Short.valueOf(s), Long.valueOf(j), e2});
            fetchPartitionMessageAckData.setCode(JoyQueueCode.CN_UNKNOWN_ERROR);
        }
        return fetchPartitionMessageAckData;
    }

    public int type() {
        return JoyQueueCommandType.FETCH_PARTITION_MESSAGE_REQUEST.getCode();
    }
}
