package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.network.traffic.Traffic;
import org.joyqueue.broker.producer.Produce;
import org.joyqueue.broker.producer.ProduceConfig;
import org.joyqueue.broker.protocol.JoyQueueCommandHandler;
import org.joyqueue.broker.protocol.JoyQueueContext;
import org.joyqueue.broker.protocol.JoyQueueContextAware;
import org.joyqueue.broker.protocol.command.ProduceMessageResponse;
import org.joyqueue.broker.protocol.config.JoyQueueConfig;
import org.joyqueue.broker.protocol.converter.CheckResultConverter;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.command.JoyQueueCommandType;
import org.joyqueue.network.command.ProduceMessageAckData;
import org.joyqueue.network.command.ProduceMessageAckItemData;
import org.joyqueue.network.command.ProduceMessageData;
import org.joyqueue.network.command.ProduceMessageRequest;
import org.joyqueue.network.protocol.annotation.ProduceHandler;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.Type;
import org.joyqueue.response.BooleanResponse;
import org.joyqueue.store.WriteResult;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ProduceHandler
/* loaded from: input_file:org/joyqueue/broker/protocol/handler/ProduceMessageRequestHandler.class */
public class ProduceMessageRequestHandler implements JoyQueueCommandHandler, Type, JoyQueueContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(ProduceMessageRequestHandler.class);
    private JoyQueueConfig config;
    private ProduceConfig produceConfig;
    private Produce produce;
    private ClusterManager clusterManager;

    @Override // org.joyqueue.broker.protocol.JoyQueueContextAware
    public void setJoyQueueContext(JoyQueueContext joyQueueContext) {
        this.config = JoyQueueContext.getConfig();
        this.produceConfig = new ProduceConfig(joyQueueContext.getBrokerContext().getPropertySupplier());
        this.produce = joyQueueContext.getBrokerContext().getProduce();
        this.clusterManager = joyQueueContext.getBrokerContext().getClusterManager();
    }

    public Command handle(Transport transport, Command command) {
        ProduceMessageRequest produceMessageRequest = (ProduceMessageRequest) command.getPayload();
        Connection connection = SessionHelper.getConnection(transport);
        if (connection == null || !connection.isAuthorized(produceMessageRequest.getApp())) {
            logger.warn("connection is not exists, transport: {}, app: {}", transport, produceMessageRequest.getApp());
            return BooleanAck.build(JoyQueueCode.FW_CONNECTION_NOT_EXISTS.getCode());
        }
        boolean z = !command.getHeader().getQosLevel().equals(QosLevel.ONE_WAY);
        CountDownLatch countDownLatch = new CountDownLatch(produceMessageRequest.getData().size());
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        Traffic traffic = new Traffic(produceMessageRequest.getApp());
        for (Map.Entry entry : produceMessageRequest.getData().entrySet()) {
            String str = (String) entry.getKey();
            ProduceMessageData produceMessageData = (ProduceMessageData) entry.getValue();
            try {
                checkAndFillMessage(connection, produceMessageData);
                BooleanResponse checkWritable = this.clusterManager.checkWritable(TopicName.parse(str), produceMessageRequest.getApp(), connection.getHost(), ((BrokerMessage) produceMessageData.getMessages().get(0)).getPartition());
                if (checkWritable.isSuccess()) {
                    produceMessage(connection, str, produceMessageRequest.getApp(), produceMessageData, produceMessageAckData -> {
                        newConcurrentMap.put(str, produceMessageAckData);
                        traffic.record(str, produceMessageData.getTraffic(), produceMessageData.getSize());
                        countDownLatch.countDown();
                    });
                } else {
                    logger.warn("checkWritable failed, transport: {}, topic: {}, app: {}, code: {}", new Object[]{transport, str, produceMessageRequest.getApp(), checkWritable.getJoyQueueCode()});
                    newConcurrentMap.put(str, buildResponse(produceMessageData, CheckResultConverter.convertProduceCode(command.getHeader().getVersion(), checkWritable.getJoyQueueCode())));
                    countDownLatch.countDown();
                }
            } catch (JoyQueueException e) {
                logger.warn("checkMessage error, transport: {}, topic: {}, app: {}", new Object[]{transport, str, produceMessageRequest.getApp(), e});
                newConcurrentMap.put(str, buildResponse(produceMessageData, JoyQueueCode.valueOf(e.getCode())));
                countDownLatch.countDown();
            }
        }
        if (!z) {
            return null;
        }
        try {
            if (!countDownLatch.await(this.config.getProduceMaxTimeout(), TimeUnit.MILLISECONDS)) {
                logger.warn("wait produce timeout, transport: {}, topics: {}", transport.remoteAddress(), produceMessageRequest.getData().keySet());
            }
        } catch (InterruptedException e2) {
            logger.error("wait produce exception, transport: {}", transport.remoteAddress(), e2);
        }
        ProduceMessageResponse produceMessageResponse = new ProduceMessageResponse();
        produceMessageResponse.setTraffic(traffic);
        produceMessageResponse.setData(newConcurrentMap);
        return new Command(produceMessageResponse);
    }

    protected void produceMessage(Connection connection, String str, String str2, ProduceMessageData produceMessageData, EventListener<ProduceMessageAckData> eventListener) {
        Producer producer = new Producer(connection.getId(), str, str2, Producer.ProducerType.JOYQUEUE);
        try {
            this.produce.putMessageAsync(producer, produceMessageData.getMessages(), produceMessageData.getQosLevel(), produceMessageData.getTimeout(), writeResult -> {
                if (!writeResult.getCode().equals(JoyQueueCode.SUCCESS)) {
                    logger.error("produce message failed, topic: {}, code: {}", producer.getTopic(), writeResult.getCode());
                }
                ProduceMessageAckData produceMessageAckData = new ProduceMessageAckData();
                produceMessageAckData.setCode(writeResult.getCode());
                produceMessageAckData.setItem(buildResponse(produceMessageData.getMessages(), writeResult));
                eventListener.onEvent(produceMessageAckData);
            });
        } catch (Exception e) {
            logger.error("produceMessage exception, transport: {}, topic: {}, app: {}", new Object[]{connection.getTransport().remoteAddress(), str, str2, e});
            eventListener.onEvent(buildResponse(produceMessageData, JoyQueueCode.CN_UNKNOWN_ERROR));
        } catch (JoyQueueException e2) {
            logger.error("produceMessage exception, transport: {}, topic: {}, app: {}", new Object[]{connection.getTransport().remoteAddress(), str, str2, e2});
            eventListener.onEvent(buildResponse(produceMessageData, JoyQueueCode.valueOf(e2.getCode())));
        }
    }

    protected void checkAndFillMessage(Connection connection, ProduceMessageData produceMessageData) throws JoyQueueException {
        if (CollectionUtils.isEmpty(produceMessageData.getMessages())) {
            throw new JoyQueueException(JoyQueueCode.CN_PARAM_ERROR, new Object[]{"messages not empty"});
        }
        byte[] address = connection.getAddress();
        String txId = produceMessageData.getTxId();
        short partition = ((BrokerMessage) produceMessageData.getMessages().get(0)).getPartition();
        for (BrokerMessage brokerMessage : produceMessageData.getMessages()) {
            if (brokerMessage.getPartition() != partition) {
                throw new JoyQueueException(JoyQueueCode.CN_PARAM_ERROR, new Object[]{"the put message command has multi partition"});
            }
            if (ArrayUtils.getLength(brokerMessage.getByteBody()) > this.produceConfig.getBodyLength()) {
                throw new JoyQueueException(JoyQueueCode.CN_PARAM_ERROR, new Object[]{"message body out of rage"});
            }
            if (StringUtils.length(brokerMessage.getBusinessId()) > this.produceConfig.getBusinessIdLength()) {
                throw new JoyQueueException(JoyQueueCode.CN_PARAM_ERROR, new Object[]{"message businessId out of rage"});
            }
            brokerMessage.setClientIp(address);
            brokerMessage.setTxId(txId);
        }
    }

    protected List<ProduceMessageAckItemData> buildResponse(List<BrokerMessage> list, WriteResult writeResult) {
        BrokerMessage brokerMessage = list.get(0);
        LinkedList newLinkedList = Lists.newLinkedList();
        if (brokerMessage.isBatch()) {
            if (ArrayUtils.isEmpty(writeResult.getIndices())) {
                newLinkedList.add(new ProduceMessageAckItemData(brokerMessage.getPartition(), -1L, brokerMessage.getStartTime()));
            } else {
                newLinkedList.add(new ProduceMessageAckItemData(brokerMessage.getPartition(), writeResult.getIndices()[0], brokerMessage.getStartTime()));
            }
        } else if (ArrayUtils.isEmpty(writeResult.getIndices())) {
            for (BrokerMessage brokerMessage2 : list) {
                newLinkedList.add(new ProduceMessageAckItemData(brokerMessage2.getPartition(), -1L, brokerMessage2.getStartTime()));
            }
        } else {
            for (int i = 0; i < writeResult.getIndices().length; i++) {
                BrokerMessage brokerMessage3 = list.get(i);
                newLinkedList.add(new ProduceMessageAckItemData(brokerMessage3.getPartition(), writeResult.getIndices()[i], brokerMessage3.getStartTime()));
            }
        }
        return newLinkedList;
    }

    protected ProduceMessageAckData buildResponse(ProduceMessageData produceMessageData, JoyQueueCode joyQueueCode) {
        BrokerMessage brokerMessage = (BrokerMessage) produceMessageData.getMessages().get(0);
        LinkedList newLinkedList = Lists.newLinkedList();
        if (brokerMessage.isBatch()) {
            newLinkedList.add(ProduceMessageAckItemData.INVALID_INSTANCE);
        } else {
            for (int i = 0; i < produceMessageData.getMessages().size(); i++) {
                newLinkedList.add(ProduceMessageAckItemData.INVALID_INSTANCE);
            }
        }
        return new ProduceMessageAckData(newLinkedList, joyQueueCode);
    }

    public int type() {
        return JoyQueueCommandType.PRODUCE_MESSAGE_REQUEST.getCode();
    }
}
