package org.apache.pulsar.websocket;

import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.shade.org.apache.commons.compress.java.util.jar.Pack200;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.Session;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/websocket/ReaderHandler.class */
public class ReaderHandler extends AbstractWebSocketHandler {
    private static final int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
    private String subscription;
    private Reader<byte[]> reader;
    private final int maxPendingMessages;
    private final AtomicInteger pendingMessages;
    private final LongAdder numMsgsDelivered;
    private final LongAdder numBytesDelivered;
    private volatile long msgDeliveredCounter;
    private static final AtomicLongFieldUpdater<ReaderHandler> MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ReaderHandler.class, "msgDeliveredCounter");
    private static final Logger log = LoggerFactory.getLogger(ReaderHandler.class);

    public ReaderHandler(WebSocketService webSocketService, HttpServletRequest httpServletRequest, ServletUpgradeResponse servletUpgradeResponse) {
        super(webSocketService, httpServletRequest, servletUpgradeResponse);
        this.subscription = "";
        this.pendingMessages = new AtomicInteger();
        this.msgDeliveredCounter = 0L;
        int receiverQueueSize = getReceiverQueueSize();
        this.maxPendingMessages = receiverQueueSize == 0 ? 1 : receiverQueueSize;
        this.numMsgsDelivered = new LongAdder();
        this.numBytesDelivered = new LongAdder();
        if (checkAuth(servletUpgradeResponse)) {
            try {
                ReaderBuilder<byte[]> receiverQueueSize2 = webSocketService.getPulsarClient().newReader().topic(this.topic.toString()).startMessageId(getMessageId()).receiverQueueSize(receiverQueueSize);
                if (this.queryParams.containsKey("readerName")) {
                    receiverQueueSize2.readerName(this.queryParams.get("readerName"));
                }
                this.reader = receiverQueueSize2.create();
                this.subscription = ((ReaderImpl) this.reader).getConsumer().getSubscription();
                if (!this.service.addReader(this)) {
                    log.warn("[{}:{}] Failed to add reader handler for topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic});
                }
            } catch (Exception e) {
                log.warn("[{}:{}] Failed in creating reader {} on topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.subscription, this.topic, e});
                try {
                    servletUpgradeResponse.sendError(500, "Failed to create reader: " + e.getMessage());
                } catch (IOException e2) {
                    log.warn("[{}:{}] Failed to send error: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), e2.getMessage(), e2});
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void receiveMessage() {
        if (log.isDebugEnabled()) {
            log.debug("[{}:{}] [{}] [{}] Receive next message", new Object[]{this.request.getRemoteAddr(), Integer.valueOf(this.request.getRemotePort()), this.topic, this.subscription});
        }
        this.reader.readNextAsync().thenAccept(message -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Got message {}", new Object[]{getSession().getRemoteAddress(), this.topic, this.subscription, message.getMessageId()});
            }
            ConsumerMessage consumerMessage = new ConsumerMessage();
            consumerMessage.messageId = Base64.getEncoder().encodeToString(message.getMessageId().toByteArray());
            consumerMessage.payload = Base64.getEncoder().encodeToString(message.getData());
            consumerMessage.properties = message.getProperties();
            consumerMessage.publishTime = DateFormatter.format(message.getPublishTime());
            consumerMessage.redeliveryCount = message.getRedeliveryCount();
            if (message.getEventTime() != 0) {
                consumerMessage.eventTime = DateFormatter.format(message.getEventTime());
            }
            if (message.hasKey()) {
                consumerMessage.key = message.getKey();
            }
            final long length = message.getData().length;
            try {
                getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(consumerMessage), new WriteCallback() { // from class: org.apache.pulsar.websocket.ReaderHandler.1
                    @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback
                    public void writeFailed(Throwable th) {
                        ReaderHandler.log.warn("[{}/{}] Failed to deliver msg to {} {}", new Object[]{ReaderHandler.this.reader.getTopic(), ReaderHandler.this.subscription, ReaderHandler.this.getRemote().getInetSocketAddress().toString(), th.getMessage()});
                        ReaderHandler.this.pendingMessages.decrementAndGet();
                        ReaderHandler.this.service.getExecutor().execute(() -> {
                            ReaderHandler.this.receiveMessage();
                        });
                    }

                    @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback
                    public void writeSuccess() {
                        if (ReaderHandler.log.isDebugEnabled()) {
                            ReaderHandler.log.debug("[{}/{}] message is delivered successfully to {} ", new Object[]{ReaderHandler.this.reader.getTopic(), ReaderHandler.this.subscription, ReaderHandler.this.getRemote().getInetSocketAddress().toString()});
                        }
                        ReaderHandler.this.updateDeliverMsgStat(length);
                    }
                });
            } catch (JsonProcessingException e) {
                close(WebSocketError.FailedToSerializeToJSON);
            }
            if (this.pendingMessages.incrementAndGet() < this.maxPendingMessages) {
                this.service.getExecutor().execute(() -> {
                    receiveMessage();
                });
            }
        }).exceptionally(th -> {
            if (th.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                log.info("[{}/{}] Reader was closed while receiving msg from broker", this.reader.getTopic(), this.subscription);
                return null;
            }
            log.warn("[{}/{}] Error occurred while reader handler was delivering msg to {}: {}", new Object[]{this.reader.getTopic(), this.subscription, getRemote().getInetSocketAddress().toString(), th.getMessage()});
            return null;
        });
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler, org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketAdapter, org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketConnectionListener
    public void onWebSocketConnect(Session session) {
        super.onWebSocketConnect(session);
        receiveMessage();
    }

    @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketAdapter, org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketListener
    public void onWebSocketText(String str) {
        super.onWebSocketText(str);
        if (this.pendingMessages.getAndDecrement() >= this.maxPendingMessages) {
            receiveMessage();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.reader != null) {
            if (!this.service.removeReader(this)) {
                log.warn("[{}] Failed to remove reader handler", this.reader.getTopic());
            }
            this.reader.closeAsync().thenAccept(r5 -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed reader asynchronously", this.reader.getTopic());
                }
            }).exceptionally(th -> {
                log.warn("[{}] Failed to close reader", this.reader.getTopic(), th);
                return null;
            });
        }
    }

    public Consumer<?> getConsumer() {
        if (this.reader != null) {
            return ((ReaderImpl) this.reader).getConsumer();
        }
        return null;
    }

    public String getSubscription() {
        return this.subscription;
    }

    public SubscriptionType getSubscriptionType() {
        return SubscriptionType.Exclusive;
    }

    public long getAndResetNumMsgsDelivered() {
        return this.numMsgsDelivered.sumThenReset();
    }

    public long getAndResetNumBytesDelivered() {
        return this.numBytesDelivered.sumThenReset();
    }

    public long getMsgDeliveredCounter() {
        return this.msgDeliveredCounter;
    }

    protected void updateDeliverMsgStat(long j) {
        this.numMsgsDelivered.increment();
        MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
        this.numBytesDelivered.add(j);
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler
    protected Boolean isAuthorized(String str, AuthenticationDataSource authenticationDataSource) throws Exception {
        return Boolean.valueOf(this.service.getAuthorizationService().canConsume(this.topic, str, authenticationDataSource, this.subscription));
    }

    private int getReceiverQueueSize() {
        int i = 1000;
        if (this.queryParams.containsKey("receiverQueueSize")) {
            i = Math.min(Integer.parseInt(this.queryParams.get("receiverQueueSize")), 1000);
        }
        return i;
    }

    private MessageId getMessageId() throws IOException {
        MessageId messageId = MessageId.latest;
        if (StringUtils.isNotBlank(this.queryParams.get("messageId"))) {
            if (this.queryParams.get("messageId").equals("earliest")) {
                messageId = MessageId.earliest;
            } else if (!this.queryParams.get("messageId").equals(Pack200.Packer.LATEST)) {
                messageId = MessageIdImpl.fromByteArray(Base64.getDecoder().decode(this.queryParams.get("messageId")));
            }
        }
        return messageId;
    }
}
