package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.HashBasedTable;
import java.util.List;
import java.util.Map;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.protocol.JoyQueueCommandHandler;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.command.FetchIndexData;
import org.joyqueue.network.command.FetchIndexRequest;
import org.joyqueue.network.command.FetchIndexResponse;
import org.joyqueue.network.command.JoyQueueCommandType;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/protocol/handler/FetchIndexRequestHandler.class */
public class FetchIndexRequestHandler implements JoyQueueCommandHandler, Type, BrokerContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(FetchIndexRequestHandler.class);
    private Consume consume;

    public void setBrokerContext(BrokerContext brokerContext) {
        this.consume = brokerContext.getConsume();
    }

    public Command handle(Transport transport, Command command) {
        FetchIndexRequest fetchIndexRequest = (FetchIndexRequest) command.getPayload();
        Connection connection = SessionHelper.getConnection(transport);
        if (connection == null || !connection.isAuthorized(fetchIndexRequest.getApp())) {
            logger.warn("connection is not exists, transport: {}, app: {}", transport, fetchIndexRequest.getApp());
            return BooleanAck.build(JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
        }
        HashBasedTable create = HashBasedTable.create();
        for (Map.Entry entry : fetchIndexRequest.getPartitions().entrySet()) {
            String str = (String) entry.getKey();
            Consumer consumer = new Consumer(connection.getId(), str, fetchIndexRequest.getApp(), Consumer.ConsumeType.JOYQUEUE);
            for (Short sh : (List) entry.getValue()) {
                create.put(str, sh, fetchIndex(connection, consumer, sh.shortValue()));
            }
        }
        FetchIndexResponse fetchIndexResponse = new FetchIndexResponse();
        fetchIndexResponse.setData(create);
        return new Command(fetchIndexResponse);
    }

    protected FetchIndexData fetchIndex(Connection connection, Consumer consumer, short s) {
        FetchIndexData fetchIndexData = new FetchIndexData();
        try {
            long minIndex = this.consume.getMinIndex(consumer, s);
            long maxIndex = this.consume.getMaxIndex(consumer, s);
            fetchIndexData.setIndex(this.consume.getAckIndex(consumer, s));
            fetchIndexData.setLeftIndex(minIndex);
            fetchIndexData.setRightIndex(maxIndex);
            fetchIndexData.setCode(JoyQueueCode.SUCCESS);
        } catch (Exception e) {
            fetchIndexData.setCode(JoyQueueCode.CN_UNKNOWN_ERROR);
            logger.error("fetchIndex exception, consumer: {}, partition: {}, transport: {}", new Object[]{consumer, Short.valueOf(s), connection.getTransport(), e});
        }
        return fetchIndexData;
    }

    public int type() {
        return JoyQueueCommandType.FETCH_INDEX_REQUEST.getCode();
    }
}
