package io.mantisrx.api.push;

import com.netflix.config.DynamicIntProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Constants;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.PushConnectionDetails;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.channel.StringTransformer;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurator;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.client.HttpResponseHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;

/**
 * Netty inbound handler that answers HTTP requests whose URI starts with one of the configured
 * push prefixes by streaming events to the client as Server-Sent Events (SSE).
 *
 * <p>Requests that do not match a push prefix, or that are WebSocket upgrade requests, are passed
 * to the next handler in the pipeline untouched. For matching requests the handler writes a
 * chunked {@code text/event-stream} response head, subscribes to the stream supplied by the
 * {@link ConnectionBroker}, buffers the resulting frames in a bounded queue and flushes that queue
 * to the channel on a fixed timer while the channel is writable. Frames that do not fit into the
 * queue are dropped and counted.
 */
public class MantisSSEHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger log = LoggerFactory.getLogger(MantisSSEHandler.class);

    /** Initial delay and period, in seconds, of the tunnel ping messages. */
    private static final long TUNNEL_PING_INTERVAL_SECS = 12L;

    private final ConnectionBroker connectionBroker;
    private final HighAvailabilityServices highAvailabilityServices;
    private final List<String> pushPrefixes;

    /** Subscription of the stream currently served on this channel, or {@code null} if none. */
    private Subscription subscription;

    /** Maximum number of frames buffered per connection before new frames are dropped. */
    private final DynamicIntProperty queueCapacity =
            new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);

    /** Period, in milliseconds, at which buffered frames are flushed to the channel. */
    private final DynamicIntProperty writeIntervalMillis =
            new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);

    /** Status, headers and content stream of a response received from the master. */
    public static class MasterResponse {
        private final HttpResponseStatus status;
        private final Observable<ByteBuf> byteBuf;
        private final HttpResponseHeaders responseHeaders;

        /**
         * @param status          HTTP status returned by the master
         * @param byteBuf         stream of response content buffers
         * @param responseHeaders headers returned by the master
         */
        public MasterResponse(HttpResponseStatus status, Observable<ByteBuf> byteBuf, HttpResponseHeaders responseHeaders) {
            this.status = status;
            this.byteBuf = byteBuf;
            this.responseHeaders = responseHeaders;
        }

        /** @return the HTTP status returned by the master */
        public HttpResponseStatus getStatus() {
            return this.status;
        }

        /** @return the stream of response content buffers */
        public Observable<ByteBuf> getByteBuf() {
            return this.byteBuf;
        }

        /** @return the headers returned by the master */
        public HttpResponseHeaders getResponseHeaders() {
            return this.responseHeaders;
        }
    }

    /**
     * Creates a handler with auto-release of inbound messages enabled.
     *
     * @param connectionBroker         source of the event stream for a given connection target
     * @param highAvailabilityServices used to locate the master for submit-and-connect requests
     * @param pushPrefixes             URI prefixes this handler is responsible for
     */
    public MantisSSEHandler(ConnectionBroker connectionBroker, HighAvailabilityServices highAvailabilityServices, List<String> pushPrefixes) {
        super(true);
        this.connectionBroker = connectionBroker;
        this.highAvailabilityServices = highAvailabilityServices;
        this.pushPrefixes = pushPrefixes;
    }

    /**
     * Starts an SSE stream for a matching request, or forwards the request down the pipeline.
     *
     * @param ctx     context of the channel the request arrived on
     * @param request the fully aggregated HTTP request
     * @throws Exception if handling the request fails
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        final String uri = request.uri();
        if (!Util.startsWithAnyOf(uri, this.pushPrefixes) || isWebsocketUpgrade(request)) {
            // Auto-release is enabled, so the request must be retained before it is handed on.
            ctx.fireChannelRead(request.retain());
            return;
        }
        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }
        ctx.writeAndFlush(newSseResponseHead());

        final PushConnectionDetails details = isSubmitAndConnect(request)
                ? new PushConnectionDetails(uri, jobSubmit(request), PushConnectionDetails.TARGET_TYPE.CONNECT_BY_ID, io.vavr.collection.List.empty())
                : PushConnectionDetails.from(uri);
        log.info("SSE Connecting for: {}", details);

        final boolean tunnelPingsEnabled = isTunnelPingsEnabled(uri);
        final String[] tags = Util.getTaglist(uri, details.target);
        final Counter droppedBytes = SpectatorUtils.newCounter(Constants.numDroppedBytesCounterName, details.target, tags);
        final Counter droppedMessages = SpectatorUtils.newCounter(Constants.numDroppedMessagesCounterName, details.target, tags);
        final Counter writtenMessages = SpectatorUtils.newCounter(Constants.numMessagesCounterName, details.target, tags);
        final Counter writtenBytes = SpectatorUtils.newCounter(Constants.numBytesCounterName, details.target, tags);
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(this.queueCapacity.get());

        final Observable<String> tunnelPings = tunnelPingsEnabled
                ? Observable.interval(TUNNEL_PING_INTERVAL_SECS, TUNNEL_PING_INTERVAL_SECS, TimeUnit.SECONDS)
                        .map(tick -> Constants.TunnelPingMessage)
                : Observable.<String>empty();
        // Timer ticks travel through the same stream as the data and are used purely as a
        // "flush now" signal; they are never written to the client.
        final Observable<String> flushTicks = Observable.interval(this.writeIntervalMillis.get(), TimeUnit.MILLISECONDS)
                .map(tick -> Constants.DUMMY_TIMER_DATA);

        final Observable<String> stream = this.connectionBroker.connect(details)
                .mergeWith(tunnelPings)
                .mergeWith(flushTicks)
                .doOnNext(event -> {
                    if (Constants.DUMMY_TIMER_DATA.equals(event)) {
                        return;
                    }
                    final String frame = Constants.SSE_DATA_PREFIX + event + Constants.SSE_DATA_SUFFIX;
                    if (!queue.offer(frame)) {
                        // Queue is full: drop the frame and record what was lost.
                        droppedBytes.increment(frame.length());
                        droppedMessages.increment();
                    }
                });

        // A previous stream on this channel would otherwise be orphaned (and keep its timers
        // running) once the field is overwritten below.
        cancelSubscription();
        this.subscription = stream
                .filter(Constants.DUMMY_TIMER_DATA::equals)
                .doOnNext(tick -> flushQueue(ctx, queue, writtenMessages, writtenBytes))
                .subscribe(
                        tick -> { },
                        error -> {
                            // Without an error handler a failed upstream would leave the client
                            // with an open but permanently silent connection.
                            log.error("SSE stream for {} terminated with an error; closing channel", details, error);
                            ctx.close();
                        });
    }

    /** Builds the chunked, non-cacheable {@code text/event-stream} response head with CORS headers. */
    private static DefaultHttpResponse newSseResponseHead() {
        final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        final HttpHeaders headers = response.headers();
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, X-Requested-With, Accept, Content-Type, Cache-Control");
        headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");
        headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
        headers.set(HttpHeaderNames.PRAGMA, HttpHeaderValues.NO_CACHE);
        headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        return response;
    }

    /**
     * Writes every currently queued frame to the channel, provided the channel is writable;
     * otherwise the frames stay queued for a later tick. The byte counter is advanced by the
     * frame's character count ({@code String.length()}), not its encoded size.
     */
    private static void flushQueue(ChannelHandlerContext ctx, BlockingQueue<String> queue, Counter writtenMessages, Counter writtenBytes) {
        if (!ctx.channel().isWritable()) {
            return;
        }
        final List<String> frames = new ArrayList<>(queue.size());
        queue.drainTo(frames);
        for (String frame : frames) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(frame, StandardCharsets.UTF_8));
            writtenMessages.increment();
            writtenBytes.increment(frame.length());
        }
    }

    /** Unsubscribes from the active stream, if there is one. */
    private void cancelSubscription() {
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            this.subscription.unsubscribe();
        }
    }

    /** Sends an interim {@code 100 Continue} response. */
    private static void send100Continue(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
    }

    /**
     * Returns whether the first value of the tunnel-ping query parameter is {@code "true"}
     * (case-insensitive). A missing parameter counts as disabled.
     */
    private boolean isTunnelPingsEnabled(String uri) {
        final List<String> values = new QueryStringDecoder(uri).parameters().get(Constants.TunnelPingParamName);
        return values != null && !values.isEmpty() && "true".equalsIgnoreCase(values.get(0));
    }

    /**
     * Returns whether the request carries a {@code Connection} header containing "upgrade" and an
     * {@code Upgrade} header equal to "websocket", both compared case-insensitively.
     */
    private boolean isWebsocketUpgrade(HttpRequest request) {
        final HttpHeaders headers = request.headers();
        final String connection = headers.get(HttpHeaderNames.CONNECTION);
        final String upgrade = headers.get(HttpHeaderNames.UPGRADE);
        return connection != null
                && connection.toLowerCase().contains("upgrade")
                && upgrade != null
                && upgrade.toLowerCase().equals("websocket");
    }

    /** Returns whether the request is a POST whose URI contains {@code jobsubmitandconnect}. */
    private boolean isSubmitAndConnect(HttpRequest request) {
        return request.method().equals(HttpMethod.POST) && request.uri().contains("jobsubmitandconnect");
    }

    /**
     * Stops the active stream, logs the failure and closes the channel.
     *
     * @param ctx   context of the affected channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cancelSubscription();
        log.error("Exception caught on SSE channel; closing connection", cause);
        ctx.close();
    }

    /**
     * Stops the active stream when the channel is unregistered, then continues the default
     * unregistration handling.
     *
     * @param ctx context of the channel being unregistered
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        cancelSubscription();
        super.channelUnregistered(ctx);
    }

    /**
     * Posts the request body to {@code /api/submit} on the current master and returns the first
     * chunk of the response body decoded as UTF-8.
     *
     * <p>NOTE(review): this call blocks the calling thread until the master has answered.
     * {@link #channelRead0} invokes it inline, presumably on a Netty event-loop thread - confirm
     * that this is acceptable for the deployment.
     *
     * @param request the submit-and-connect request whose content is forwarded to the master
     * @return the first chunk of the master's response body
     */
    public String jobSubmit(FullHttpRequest request) {
        final String payload = request.content().toString(StandardCharsets.UTF_8);
        return callPostOnMaster(this.highAvailabilityServices.getMasterMonitor().getMasterObservable(), "/api/submit", payload)
                .retryWhen(Util.getRetryFunc(log, "/api/submit"))
                .flatMap(masterResponse -> masterResponse.getByteBuf()
                        .take(1)
                        .map(content -> {
                            final String body = content.toString(StandardCharsets.UTF_8);
                            log.info("response: {}", body);
                            return body;
                        }))
                .take(1)
                .toBlocking()
                .first();
    }

    /**
     * Issues a JSON {@code POST} against the first non-null master emitted by
     * {@code masterObservable}.
     *
     * @param masterObservable stream of master descriptions; {@code null} items are skipped
     * @param uri              request path on the master
     * @param content          raw request body, sent with content type {@code application/json}
     * @return an observable emitting at most one {@link MasterResponse}
     */
    public static Observable<MasterResponse> callPostOnMaster(Observable<MasterDescription> masterObservable, String uri, String content) {
        final PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<String>> pipelineConfigurator =
                PipelineConfigurators.httpClientConfigurator();
        return masterObservable
                .filter(Objects::nonNull)
                .flatMap(master -> {
                    final HttpClient<String, ByteBuf> client = RxNetty
                            .<String, ByteBuf>newHttpClientBuilder(master.getHostname(), master.getApiPort())
                            .pipelineConfigurator(pipelineConfigurator)
                            .build();
                    final HttpClientRequest<String> post = HttpClientRequest.<String>create(HttpMethod.POST, uri)
                            .withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), HttpHeaderValues.APPLICATION_JSON.toString());
                    post.withRawContent(content, StringTransformer.DEFAULT_INSTANCE);
                    return client.submit(post)
                            .map(response -> new MasterResponse(response.getStatus(), response.getContent(), response.getHeaders()));
                })
                .take(1);
    }
}
