package org.joyqueue.broker.protocol.handler;

import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.polling.LongPollingCallback;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.network.command.FetchTopicMessageAckData;
import org.joyqueue.network.command.FetchTopicMessageRequest;
import org.joyqueue.network.command.FetchTopicMessageResponse;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.exception.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/protocol/handler/FetchTopicMessageLongPollCallback.class */
/**
 * {@link LongPollingCallback} implementation that answers a pending fetch-topic-message
 * request over the transport it arrived on.
 *
 * <p>Every callback builds a {@link FetchTopicMessageAckData}, wraps it in a
 * {@link FetchTopicMessageResponse} keyed by the consumer's topic, and acknowledges the
 * original request command with it.
 */
public class FetchTopicMessageLongPollCallback implements LongPollingCallback {
    protected static final Logger logger = LoggerFactory.getLogger(FetchTopicMessageLongPollCallback.class);

    /** Payload of the fetch request; stored by the constructor but not read within this class. */
    private final FetchTopicMessageRequest fetchTopicMessageRequest;
    /** The command that is acknowledged by every callback. */
    private final Command request;
    /** Transport used to send the acknowledgement. */
    private final Transport transport;

    /**
     * Creates a callback bound to one pending request.
     *
     * @param fetchTopicMessageRequest payload of the fetch request
     * @param command the request command to acknowledge
     * @param transport the transport on which the acknowledgement is sent
     */
    public FetchTopicMessageLongPollCallback(FetchTopicMessageRequest fetchTopicMessageRequest, Command command, Transport transport) {
        this.fetchTopicMessageRequest = fetchTopicMessageRequest;
        this.request = command;
        this.transport = transport;
    }

    /**
     * Acknowledges the request with the buffers and result code carried by {@code pullResult}.
     *
     * @param consumer consumer whose topic keys the response data
     * @param pullResult source of the buffers and code placed in the acknowledgement
     * @throws TransportException if acknowledging the request fails
     */
    public void onSuccess(Consumer consumer, PullResult pullResult) throws TransportException {
        FetchTopicMessageAckData ackData = new FetchTopicMessageAckData();
        ackData.setBuffers(pullResult.getBuffers());
        ackData.setCode(pullResult.getCode());
        reply(consumer, ackData);
    }

    /**
     * Acknowledges the request with an empty buffer list and {@link JoyQueueCode#SUCCESS}.
     *
     * @param consumer consumer whose topic keys the response data
     * @throws TransportException if acknowledging the request fails
     */
    public void onExpire(Consumer consumer) throws TransportException {
        FetchTopicMessageAckData ackData = emptyAckData();
        ackData.setCode(JoyQueueCode.SUCCESS);
        reply(consumer, ackData);
    }

    /**
     * Logs the failure and acknowledges the request with an empty buffer list and an error code.
     *
     * <p>The code is taken from the throwable when it is a {@link JoyQueueException};
     * any other throwable is reported as {@link JoyQueueCode#CN_UNKNOWN_ERROR}.
     *
     * @param consumer consumer whose topic keys the response data
     * @param th the failure being reported
     * @throws TransportException if acknowledging the request fails
     */
    public void onException(Consumer consumer, Throwable th) throws TransportException {
        // The throwable is the trailing argument with no matching placeholder, so SLF4J logs its stack trace.
        logger.error("fetchTopicMessage longPolling exception, transport: {}, consumer: {}", this.transport, consumer, th);
        FetchTopicMessageAckData ackData = emptyAckData();
        if (th instanceof JoyQueueException) {
            ackData.setCode(JoyQueueCode.valueOf(((JoyQueueException) th).getCode()));
        } else {
            ackData.setCode(JoyQueueCode.CN_UNKNOWN_ERROR);
        }
        reply(consumer, ackData);
    }

    /**
     * Builds a response whose data map holds a single entry: the consumer's topic mapped to
     * the given acknowledgement data.
     *
     * @param consumer consumer supplying the topic used as the map key
     * @param fetchTopicMessageAckData acknowledgement data stored under that topic
     * @return a new response carrying the single-entry map
     */
    protected FetchTopicMessageResponse buildFetchTopicMessageAck(Consumer consumer, FetchTopicMessageAckData fetchTopicMessageAckData) {
        HashMap<String, FetchTopicMessageAckData> ackByTopic = Maps.newHashMap();
        ackByTopic.put(consumer.getTopic(), fetchTopicMessageAckData);
        FetchTopicMessageResponse response = new FetchTopicMessageResponse();
        response.setData(ackByTopic);
        return response;
    }

    /** Creates acknowledgement data with an empty buffer list and no code set yet. */
    private static FetchTopicMessageAckData emptyAckData() {
        FetchTopicMessageAckData ackData = new FetchTopicMessageAckData();
        ackData.setBuffers(Collections.emptyList());
        return ackData;
    }

    /** Wraps the acknowledgement data in a response command and acknowledges the original request with it. */
    private void reply(Consumer consumer, FetchTopicMessageAckData ackData) throws TransportException {
        this.transport.acknowledge(this.request, new Command(buildFetchTopicMessageAck(consumer, ackData)));
    }
}
