package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.HashBasedTable;
import java.util.Map;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.protocol.JoyQueueCommandHandler;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.command.CommitIndexRequest;
import org.joyqueue.network.command.CommitIndexResponse;
import org.joyqueue.network.command.JoyQueueCommandType;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Type;
import org.joyqueue.response.BooleanResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/protocol/handler/CommitIndexRequestHandler.class */
public class CommitIndexRequestHandler implements JoyQueueCommandHandler, Type, BrokerContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(CommitIndexRequestHandler.class);
    private ClusterManager clusterManager;
    private Consume consume;

    public void setBrokerContext(BrokerContext brokerContext) {
        this.clusterManager = brokerContext.getClusterManager();
        this.consume = brokerContext.getConsume();
    }

    public Command handle(Transport transport, Command command) {
        CommitIndexRequest commitIndexRequest = (CommitIndexRequest) command.getPayload();
        Connection connection = SessionHelper.getConnection(transport);
        String app = commitIndexRequest.getApp();
        if (connection == null || !connection.isAuthorized(app)) {
            logger.warn("connection does not exist, transport: {}, app: {}", transport, app);
            return BooleanAck.build(JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
        }
        HashBasedTable create = HashBasedTable.create();
        for (Map.Entry entry : commitIndexRequest.getData().rowMap().entrySet()) {
            String str = (String) entry.getKey();
            for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                short shortValue = ((Short) entry2.getKey()).shortValue();
                long longValue = ((Long) entry2.getValue()).longValue();
                BooleanResponse checkReadable = this.clusterManager.checkReadable(TopicName.parse(str), app, connection.getHost(), shortValue);
                if (checkReadable.isSuccess()) {
                    try {
                        Consumer consumer = new Consumer(str, app);
                        if (longValue == -1) {
                            longValue = this.consume.getMaxIndex(consumer, shortValue);
                        } else if (longValue == -2) {
                            longValue = this.consume.getMinIndex(consumer, shortValue);
                        }
                        this.consume.setAckIndex(consumer, shortValue, longValue);
                    } catch (JoyQueueException e) {
                        logger.error("commit index exception, topic: {}, app: {}, partition: {}, transport: {}, code: {}", new Object[]{str, app, Short.valueOf(shortValue), connection, JoyQueueCode.valueOf(e.getCode())});
                        create.put(str, Short.valueOf(shortValue), JoyQueueCode.valueOf(e.getCode()));
                    } catch (Exception e2) {
                        logger.error("commit index exception, topic: {}, app: {}, partition: {}, transport: {}, code: {}", new Object[]{str, app, Short.valueOf(shortValue), connection, e2});
                        create.put(str, Short.valueOf(shortValue), JoyQueueCode.CN_UNKNOWN_ERROR);
                    }
                } else {
                    logger.warn("check commit index error, topic: {}, app: {}, partition: {}, transport: {}, code: {}", new Object[]{str, app, Short.valueOf(shortValue), connection, checkReadable.getJoyQueueCode()});
                    create.put(str, Short.valueOf(shortValue), checkReadable.getJoyQueueCode());
                }
            }
        }
        CommitIndexResponse commitIndexResponse = new CommitIndexResponse();
        commitIndexResponse.setResult(create);
        return new Command(commitIndexResponse);
    }

    public int type() {
        return JoyQueueCommandType.COMMIT_ACK_INDEX_REQUEST.getCode();
    }
}
