package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.protocol.JoyQueueCommandHandler;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.MessageLocation;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.command.CommitAckData;
import org.joyqueue.network.command.CommitAckRequest;
import org.joyqueue.network.command.CommitAckResponse;
import org.joyqueue.network.command.JoyQueueCommandType;
import org.joyqueue.network.command.RetryType;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Type;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.toolkit.lang.ListUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/protocol/handler/CommitAckRequestHandler.class */
/**
 * Handles {@code COMMIT_ACK_REQUEST} commands.
 *
 * <p>For every (topic, partition) entry of the request the handler either acknowledges the
 * consumed messages on the regular partition or, when the partition equals
 * {@link #RETRY_PARTITION}, reports the outcome of retry messages to the retry manager. The
 * per-entry result codes are collected into a {@link CommitAckResponse}.
 */
public class CommitAckRequestHandler implements JoyQueueCommandHandler, Type, BrokerContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(CommitAckRequestHandler.class);

    /**
     * Partition value that routes an ack to the retry manager instead of the regular
     * acknowledge path; also stamped on retry messages built by this handler.
     */
    private static final short RETRY_PARTITION = Short.MAX_VALUE;

    private Consume consume;
    private MessageRetry retryManager;
    private ClusterManager clusterManager;

    /**
     * Pulls the collaborators this handler needs out of the broker context.
     *
     * @param brokerContext source of the consume service, retry manager and cluster manager
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.consume = brokerContext.getConsume();
        this.retryManager = brokerContext.getRetryManager();
        this.clusterManager = brokerContext.getClusterManager();
    }

    /**
     * Processes one commit-ack request.
     *
     * @param transport transport the request arrived on; used to look up the session connection
     * @param command   command whose payload is a {@link CommitAckRequest}
     * @return a {@link BooleanAck} carrying {@code FW_CONNECTION_NOT_EXISTS} when there is no
     *         connection for the transport or the connection is not authorized for the app;
     *         otherwise a command wrapping a {@link CommitAckResponse} with one result code per
     *         (topic, partition) entry of the request
     */
    public Command handle(Transport transport, Command command) {
        CommitAckRequest commitAckRequest = (CommitAckRequest) command.getPayload();
        Connection connection = SessionHelper.getConnection(transport);
        String app = commitAckRequest.getApp();

        if (connection == null || !connection.isAuthorized(app)) {
            logger.warn("connection is not exists, transport: {}, app: {}", transport, app);
            return BooleanAck.build(JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
        }

        HashBasedTable<String, Short, JoyQueueCode> result = HashBasedTable.create();
        for (Map.Entry<String, Map<Short, List<CommitAckData>>> topicEntry : commitAckRequest.getData().rowMap().entrySet()) {
            String topic = topicEntry.getKey();
            for (Map.Entry<Short, List<CommitAckData>> partitionEntry : topicEntry.getValue().entrySet()) {
                Short partition = partitionEntry.getKey();
                JoyQueueCode code = commitAck(connection, topic, app, partition.shortValue(), partitionEntry.getValue());
                result.put(topic, partition, code);
            }
        }

        CommitAckResponse commitAckResponse = new CommitAckResponse();
        commitAckResponse.setResult(result);
        return new Command(commitAckResponse);
    }

    /**
     * Dispatches an ack batch to the retry path or the regular path depending on the partition.
     *
     * @param connection session connection of the requesting client
     * @param topic      topic the acks belong to
     * @param app        consuming application
     * @param partition  partition of the acked messages; {@link #RETRY_PARTITION} selects the retry path
     * @param dataList   ack entries for this (topic, partition)
     * @return result code of the chosen path
     */
    protected JoyQueueCode commitAck(Connection connection, String topic, String app, short partition, List<CommitAckData> dataList) {
        if (partition == RETRY_PARTITION) {
            return doCommitRetry(connection, topic, app, partition, dataList);
        }
        return doCommitAck(connection, topic, app, partition, dataList);
    }

    /**
     * Reports the outcome of retry messages to the retry manager.
     *
     * <p>Entries whose retry type is {@link RetryType#NONE} are reported through
     * {@code retrySuccess}; all other entries through {@code retryError}. Neither call is made
     * for an empty group.
     *
     * @param connection session connection of the requesting client (used for logging)
     * @param topic      topic the acks belong to
     * @param app        consuming application
     * @param partition  partition value from the request (used for logging)
     * @param dataList   ack entries to report
     * @return {@code SUCCESS}; the code carried by a thrown {@link JoyQueueException}; or
     *         {@code CN_UNKNOWN_ERROR} for any other exception
     */
    protected JoyQueueCode doCommitRetry(Connection connection, String topic, String app, short partition, List<CommitAckData> dataList) {
        try {
            List<Long> succeeded = Lists.newLinkedList();
            List<Long> failed = Lists.newLinkedList();
            for (CommitAckData data : dataList) {
                if (data.getRetryType().equals(RetryType.NONE)) {
                    succeeded.add(data.getIndex());
                } else {
                    failed.add(data.getIndex());
                }
            }

            if (CollectionUtils.isNotEmpty(succeeded)) {
                this.retryManager.retrySuccess(topic, app, ListUtil.toArray(succeeded));
            }
            if (CollectionUtils.isNotEmpty(failed)) {
                this.retryManager.retryError(topic, app, ListUtil.toArray(failed));
            }
            return JoyQueueCode.SUCCESS;
        } catch (JoyQueueException e) {
            logger.error("commit ack exception, topic: {}, app: {}, partition: {}, transport: {}", topic, app, partition, connection.getTransport(), e);
            return JoyQueueCode.valueOf(e.getCode());
        } catch (Exception e) {
            logger.error("commit ack exception, topic: {}, app: {}, partition: {}, transport: {}", topic, app, partition, connection.getTransport(), e);
            return JoyQueueCode.CN_UNKNOWN_ERROR;
        }
    }

    /**
     * Acknowledges messages on a regular partition.
     *
     * <p>Entries whose retry type is not {@link RetryType#NONE} are first handed to
     * {@link #commitRetry}. If that fails, the partition is released and the failure code is
     * returned without acknowledging anything. Otherwise every entry of the batch is
     * acknowledged.
     *
     * @param connection session connection of the requesting client
     * @param topic      topic the acks belong to
     * @param app        consuming application
     * @param partition  partition of the acked messages
     * @param dataList   ack entries for this (topic, partition)
     * @return {@code SUCCESS}; the code carried by a thrown {@link JoyQueueException}; or
     *         {@code CN_UNKNOWN_ERROR} for any other exception
     */
    protected JoyQueueCode doCommitAck(Connection connection, String topic, String app, short partition, List<CommitAckData> dataList) {
        MessageLocation[] locations = new MessageLocation[dataList.size()];
        // Allocated lazily: most batches are expected to carry no retry entries.
        List<CommitAckData> retryDataList = null;
        Consumer consumer = new Consumer(connection.getId(), topic, app, Consumer.ConsumeType.JOYQUEUE);

        for (int i = 0; i < dataList.size(); i++) {
            CommitAckData data = dataList.get(i);
            locations[i] = new MessageLocation(topic, partition, data.getIndex());
            if (!data.getRetryType().equals(RetryType.NONE)) {
                if (retryDataList == null) {
                    retryDataList = Lists.newLinkedList();
                }
                retryDataList.add(data);
            }
        }

        try {
            if (CollectionUtils.isNotEmpty(retryDataList)) {
                // The specific exception type must be caught first so that its own error code
                // is reported; a preceding catch of Exception would make this handler unreachable.
                try {
                    commitRetry(connection, consumer, retryDataList);
                } catch (JoyQueueException e) {
                    logger.error("commit retry exception, topic: {}, app: {}, partition: {}, transport: {}", topic, app, partition, connection.getTransport(), e);
                    this.consume.releasePartition(topic, app, partition);
                    return JoyQueueCode.valueOf(e.getCode());
                } catch (Exception e) {
                    logger.error("commit retry exception, topic: {}, app: {}, partition: {}, transport: {}", topic, app, partition, connection.getTransport(), e);
                    this.consume.releasePartition(topic, app, partition);
                    return JoyQueueCode.CN_UNKNOWN_ERROR;
                }
            }
            this.consume.acknowledge(locations, consumer, connection, true);
            return JoyQueueCode.SUCCESS;
        } catch (JoyQueueException e) {
            logger.error("commit ack exception, topic: {}, app: {}, partition: {}, transport: {}", topic, app, partition, connection.getTransport(), e);
            return JoyQueueCode.valueOf(e.getCode());
        } catch (Exception e) {
            logger.error("commit ack exception, topic: {}, app: {}, partition: {}, transport: {}", topic, app, partition, connection.getTransport(), e);
            return JoyQueueCode.CN_UNKNOWN_ERROR;
        }
    }

    /**
     * Re-reads each message that the client asked to retry and submits the batch to the retry
     * manager.
     *
     * <p>An entry for which the read does not yield exactly one buffer is logged and skipped.
     * {@code addRetry} is invoked with whatever was collected, even if that is an empty list.
     *
     * @param connection session connection of the requesting client (used for logging)
     * @param consumer   consumer identity used to read the messages
     * @param list       ack entries whose retry type is not {@link RetryType#NONE}
     * @throws Exception if reading a message, decoding it, or adding the retry batch fails
     */
    protected void commitRetry(Connection connection, Consumer consumer, List<CommitAckData> list) throws Exception {
        List<RetryMessageModel> retryMessages = Lists.newLinkedList();
        for (CommitAckData data : list) {
            List<ByteBuffer> buffers = this.consume.getMessage(consumer, data.getPartition(), data.getIndex(), 1).getBuffers();
            if (buffers.size() != 1) {
                logger.error("get retryMessage error, message not exist, transport: {}, topic: {}, app: {}, partition: {}, index: {}",
                        connection.getTransport().remoteAddress(), consumer.getTopic(), consumer.getApp(), data.getPartition(), data.getIndex());
                continue;
            }

            ByteBuffer buffer = buffers.get(0);
            BrokerMessage brokerMessage = Serializer.readBrokerMessage(buffer);
            // NOTE(review): array() exposes the whole backing array and throws for direct or
            // read-only buffers - confirm getMessage always returns exactly-sized heap buffers.
            byte[] rawMessage = buffer.array();
            retryMessages.add(generateRetryMessage(consumer, brokerMessage, rawMessage, data.getRetryType().name()));
        }
        this.retryManager.addRetry(retryMessages);
    }

    /**
     * Builds the retry record for one message.
     *
     * @param consumer      supplies the topic and app of the record
     * @param brokerMessage decoded message; supplies business id, index and start time
     * @param bArr          raw message bytes stored on the record
     * @param str           text stored (UTF-8 encoded) in the record's exception field; callers
     *                      in this class pass the retry type name
     * @return the populated retry record, with its partition set to {@link #RETRY_PARTITION}
     */
    private RetryMessageModel generateRetryMessage(Consumer consumer, BrokerMessage brokerMessage, byte[] bArr, String str) {
        RetryMessageModel model = new RetryMessageModel();
        model.setBusinessId(brokerMessage.getBusinessId());
        model.setTopic(consumer.getTopic());
        model.setApp(consumer.getApp());
        model.setPartition(RETRY_PARTITION);
        model.setIndex(brokerMessage.getMsgIndexNo());
        model.setBrokerMessage(bArr);
        model.setException(str.getBytes(Charset.forName("UTF-8")));
        model.setSendTime(brokerMessage.getStartTime());
        return model;
    }

    /**
     * @return the numeric code of {@code COMMIT_ACK_REQUEST}, the command type this handler serves
     */
    public int type() {
        return JoyQueueCommandType.COMMIT_ACK_REQUEST.getCode();
    }
}
