package io.mantisrx.api.push;

import com.netflix.config.DynamicIntProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Constants;
import io.mantisrx.api.Util;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;

/* loaded from: input_file:io/mantisrx/api/push/MantisWebSocketFrameHandler.class */
public class MantisWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(MantisWebSocketFrameHandler.class);
    private final ConnectionBroker connectionBroker;
    private final DynamicIntProperty queueCapacity;
    private final DynamicIntProperty writeIntervalMillis;
    private Subscription subscription;
    private String uri;
    private ScheduledExecutorService scheduledExecutorService;
    private ScheduledFuture drainFuture;

    public MantisWebSocketFrameHandler(ConnectionBroker connectionBroker) {
        super(true);
        this.queueCapacity = new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);
        this.writeIntervalMillis = new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("websocket-handler-drainer-%d").build());
        this.connectionBroker = connectionBroker;
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj.getClass() != WebSocketServerProtocolHandler.HandshakeComplete.class) {
            ReferenceCountUtil.retain(obj);
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        this.uri = ((WebSocketServerProtocolHandler.HandshakeComplete) obj).requestUri();
        PushConnectionDetails from = PushConnectionDetails.from(this.uri);
        log.info("Request to URI '{}' is a WebSSocket upgrade, removing the SSE handler", this.uri);
        if (channelHandlerContext.pipeline().get(MantisSSEHandler.class) != null) {
            channelHandlerContext.pipeline().remove(MantisSSEHandler.class);
        }
        String[] taglist = Util.getTaglist(this.uri, from.target);
        Counter newCounter = SpectatorUtils.newCounter(Constants.numDroppedBytesCounterName, from.target, taglist);
        Counter newCounter2 = SpectatorUtils.newCounter(Constants.numDroppedMessagesCounterName, from.target, taglist);
        Counter newCounter3 = SpectatorUtils.newCounter(Constants.numMessagesCounterName, from.target, taglist);
        Counter newCounter4 = SpectatorUtils.newCounter(Constants.numBytesCounterName, from.target, taglist);
        Counter newCounter5 = SpectatorUtils.newCounter(Constants.drainTriggeredCounterName, from.target, taglist);
        Counter newCounter6 = SpectatorUtils.newCounter(Constants.numIncomingMessagesCounterName, from.target, taglist);
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.queueCapacity.get());
        this.drainFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                if (linkedBlockingQueue.size() > 0 && channelHandlerContext.channel().isWritable()) {
                    newCounter5.increment();
                    ArrayList arrayList = new ArrayList(linkedBlockingQueue.size());
                    synchronized (linkedBlockingQueue) {
                        linkedBlockingQueue.drainTo(arrayList);
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        channelHandlerContext.write(new TextWebSocketFrame((String) it.next()));
                        newCounter3.increment();
                        newCounter4.increment(r0.length());
                    }
                    channelHandlerContext.flush();
                }
            } catch (Exception e) {
                log.error("Error writing to channel", e);
            }
        }, this.writeIntervalMillis.get(), this.writeIntervalMillis.get(), TimeUnit.MILLISECONDS);
        this.subscription = this.connectionBroker.connect(from).doOnNext(str -> {
            boolean offer;
            newCounter6.increment();
            if (Constants.DUMMY_TIMER_DATA.equals(str)) {
                return;
            }
            synchronized (linkedBlockingQueue) {
                offer = linkedBlockingQueue.offer(str);
            }
            if (offer) {
                return;
            }
            newCounter.increment(str.length());
            newCounter2.increment();
        }).subscribe();
    }

    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        log.info("Channel {} is unregistered. URI: {}", channelHandlerContext.channel(), this.uri);
        unsubscribeIfSubscribed();
        super.channelUnregistered(channelHandlerContext);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        log.info("Channel {} is inactive. URI: {}", channelHandlerContext.channel(), this.uri);
        unsubscribeIfSubscribed();
        super.channelInactive(channelHandlerContext);
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.warn("Exception caught by channel {}. URI: {}", new Object[]{channelHandlerContext.channel(), this.uri, th});
        unsubscribeIfSubscribed();
        channelHandlerContext.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) {
    }

    private void unsubscribeIfSubscribed() {
        if (this.subscription == null || this.subscription.isUnsubscribed()) {
            return;
        }
        log.info("WebSocket unsubscribing subscription with URI: {}", this.uri);
        this.subscription.unsubscribe();
    }
}
