package io.mantisrx.api.tunnel;

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicStringProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Constants;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.ConnectionBroker;
import io.mantisrx.api.push.PushConnectionDetails;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import mantis.io.reactivex.netty.channel.StringTransformer;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;

/* loaded from: input_file:io/mantisrx/api/tunnel/CrossRegionHandler.class */
public class CrossRegionHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    /** Path prefixes (supplied at construction) that mark a request tail as a streaming path. */
    private final List<String> pushPrefixes;
    /** Source of the per-region REST clients used when relaying GET/POST requests. */
    private final MantisCrossRegionalClient mantisCrossRegionalClient;
    /** Opens the remote push connections consumed by the SSE relay. */
    private final ConnectionBroker connectionBroker;
    /** Scheduler the REST fan-out pipelines subscribe and observe on. */
    private final Scheduler scheduler;
    /** Subscription of the SSE relay pipeline, if one was started; unsubscribed in channelUnregistered. */
    private Subscription subscription;
    /** Capacity of the SSE buffer queue (property io.mantisrx.api.push.queueCapacity, default 1000). */
    private final DynamicIntProperty queueCapacity;
    /** Period of the timer tick that triggers draining the SSE queue (property io.mantisrx.api.push.writeIntervalMillis, default 50). */
    private final DynamicIntProperty writeIntervalMillis;
    /** Comma-separated region list used for "all region" requests; defaults to the local region. */
    private final DynamicStringProperty tunnelRegionsProperty;
    /** JSON key carrying the origin region. NOTE(review): the visible code writes the literal instead of this constant. */
    private static final String regKey = "mantis.meta.origin";
    /** JSON key carrying the error string for a region, when there is one. */
    private static final String errKey = "mantis.meta.errorString";
    /** JSON key carrying the origin's HTTP response code (written as a string). */
    private static final String codeKey = "mantis.meta.origin.response.code";
    private static final Logger log = LoggerFactory.getLogger(CrossRegionHandler.class);
    /** Charset used to decode request bodies and encode everything written to the channel. */
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    /**
     * Reads the current value of {@code tunnelRegionsProperty} and turns it into a region list.
     *
     * @return a new list with one entry per comma-separated token, each trimmed and lower-cased
     */
    private List<String> getTunnelRegions() {
        String[] configured = this.tunnelRegionsProperty.get().split(",");
        List<String> regions = new ArrayList<>();
        for (String entry : configured) {
            regions.add(entry.trim().toLowerCase());
        }
        return regions;
    }

    /**
     * Creates a handler that relays requests to other regions.
     *
     * @param list path prefixes that identify streaming (SSE) requests
     * @param mantisCrossRegionalClient provider of per-region REST clients
     * @param connectionBroker broker used to open remote push connections
     * @param scheduler scheduler used by the REST fan-out pipelines
     */
    public CrossRegionHandler(List<String> list, MantisCrossRegionalClient mantisCrossRegionalClient, ConnectionBroker connectionBroker, Scheduler scheduler) {
        // autoRelease = true: the base class releases each request once channelRead0 returns.
        super(true);

        // Collaborators handed in by the caller.
        this.pushPrefixes = list;
        this.mantisCrossRegionalClient = mantisCrossRegionalClient;
        this.connectionBroker = connectionBroker;
        this.scheduler = scheduler;

        // No SSE relay is running until handleRemoteSse starts one.
        this.subscription = null;

        // Dynamic configuration with its defaults.
        this.queueCapacity = new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);
        this.writeIntervalMillis = new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);
        this.tunnelRegionsProperty = new DynamicStringProperty("io.mantisrx.api.tunnel.regions", Util.getLocalRegion());
    }

    /**
     * Dispatches an aggregated HTTP request to the matching relay routine.
     *
     * <p>Order of checks: an expected {@code 100 Continue} is answered first; a streaming path
     * goes to the SSE relay regardless of method; otherwise HEAD, GET and POST have dedicated
     * handlers and every other method is passed down the pipeline.
     *
     * <p>Declared {@code public} here (the decompiler widened it from {@code protected}).
     *
     * @param channelHandlerContext context of the channel the request arrived on
     * @param fullHttpRequest the aggregated request
     * @throws Exception declared by the overridden Netty method
     */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        if (HttpUtil.is100ContinueExpected(fullHttpRequest)) {
            send100Contine(channelHandlerContext);
        }
        if (isCrossRegionStreamingPath(fullHttpRequest.uri())) {
            handleRemoteSse(channelHandlerContext, fullHttpRequest);
            return;
        }
        // Compare with equals(): HttpMethod has a public constructor, so identity comparison
        // only works for instances that came out of HttpMethod.valueOf's cache.
        HttpMethod method = fullHttpRequest.method();
        if (HttpMethod.HEAD.equals(method)) {
            handleHead(channelHandlerContext, fullHttpRequest);
        } else if (HttpMethod.GET.equals(method)) {
            handleRestGet(channelHandlerContext, fullHttpRequest);
        } else if (HttpMethod.POST.equals(method)) {
            handleRestPost(channelHandlerContext, fullHttpRequest);
        } else {
            // The handler was constructed with auto-release enabled, so the request must be
            // retained before it is handed to the next handler.
            channelHandlerContext.fireChannelRead(fullHttpRequest.retain());
        }
    }

    /**
     * Answers a HEAD request with an empty 200 response carrying JSON content type and CORS
     * headers, then closes the channel once the write has finished.
     *
     * @param channelHandlerContext context to write the response to
     * @param fullHttpRequest the HEAD request (not inspected)
     */
    private void handleHead(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        HttpHeaders responseHeaders = new DefaultHttpHeaders()
                .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                .add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*")
                .add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, X-Requested-With, Accept, Content-Type, Cache-Control")
                .add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS, PUT, POST, DELETE, CONNECT");
        HttpHeaders noTrailers = new DefaultHttpHeaders();
        ByteBuf emptyBody = Unpooled.copiedBuffer("", CHARSET);
        DefaultFullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, emptyBody, responseHeaders, noTrailers);
        channelHandlerContext.writeAndFlush(response).addListener(done -> channelHandlerContext.close());
    }

    /**
     * Relays a GET request to one region, or to every configured tunnel region when the
     * requested region is the "all" region, and writes the combined result to the channel.
     *
     * <p>Per region: a client is obtained on every (re)subscription, the request is submitted,
     * and a response with status 500 or above is treated as an error. Errors are routed through
     * the retry function from {@code Util.getRetryFunc}; if the retried stream still ends in an
     * error, the region is reported as a failed {@code RegionData} instead of failing the whole
     * request.
     *
     * @param channelHandlerContext context the aggregated response is written to
     * @param fullHttpRequest the incoming request; only its URI is used
     */
    private void handleRestGet(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        String requestedRegion = getRegion(fullHttpRequest.uri());
        List<String> tunnelRegions = Util.isAllRegion(requestedRegion) ? getTunnelRegions() : Collections.singletonList(requestedRegion);
        String tail = getTail(fullHttpRequest.uri());
        Observable.from(tunnelRegions).flatMap(region -> {
            // Carries the error of the current attempt from the inner onErrorReturn to the map
            // below, which rethrows it so that retryWhen sees the failure.
            AtomicReference<Throwable> lastError = new AtomicReference<>();
            // defer() looks the client up again on every retry and, unlike a hand-written
            // OnSubscribe that only calls onNext, also signals completion.
            return Observable.defer(() -> Observable.just(this.mantisCrossRegionalClient.getSecureRestClient(region)))
                    .flatMap(httpClient -> {
                        lastError.set(null);
                        // A fresh request object per attempt; its type is inferred from the client.
                        return httpClient.submit(HttpClientRequest.create(HttpMethod.GET, tail)).flatMap(httpClientResponse -> {
                            if (httpClientResponse.getStatus().code() >= 500) {
                                throw new RuntimeException(httpClientResponse.getStatus().toString());
                            }
                            return responseToRegionData(region, httpClientResponse);
                        }).onErrorReturn(th -> {
                            log.warn("Error getting response from remote master: " + th.getMessage());
                            lastError.set(th);
                            return new RegionData(region, false, th.getMessage(), 0);
                        });
                    })
                    .map(regionData -> {
                        Throwable th = lastError.get();
                        if (th != null) {
                            throw new RuntimeException(th);
                        }
                        return regionData;
                    })
                    .retryWhen(Util.getRetryFunc(log, tail + " in " + region))
                    .take(1)
                    .onErrorReturn(th -> new RegionData(region, false, th.getMessage(), 0));
        }).reduce(new ArrayList<RegionData>(3), (collected, regionData) -> {
            collected.add(regionData);
            return collected;
        }).observeOn(this.scheduler).subscribeOn(this.scheduler).take(1).subscribe(
                collected -> writeDataAndCloseChannel(channelHandlerContext, collected),
                error -> {
                    // Without this handler an unexpected pipeline error would leave the client
                    // waiting on an open channel.
                    log.error("Error relaying GET {} to {}: {}", tail, tunnelRegions, error.getMessage(), error);
                    channelHandlerContext.close();
                });
    }

    /**
     * Relays a POST request, including its body decoded as UTF-8 text, to one region or to every
     * configured tunnel region when the requested region is the "all" region, and writes the
     * combined result to the channel.
     *
     * <p>Per region: a client is obtained on every (re)subscription, the request is submitted,
     * and a response with status 500 or above is treated as an error. Errors are routed through
     * the retry function from {@code Util.getRetryFunc}; if the retried stream still ends in an
     * error, the region is reported as a failed {@code RegionData}.
     *
     * @param channelHandlerContext context the aggregated response is written to
     * @param fullHttpRequest the incoming request; its URI and content are read before returning
     */
    private void handleRestPost(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        String tail = getTail(fullHttpRequest.uri());
        String requestedRegion = getRegion(fullHttpRequest.uri());
        List<String> tunnelRegions = Util.isAllRegion(requestedRegion) ? getTunnelRegions() : Collections.singletonList(requestedRegion);
        log.info("Relaying POST URI {} to {}.", tail, tunnelRegions);
        // Copy the body now: the request buffer is released once channelRead0 returns.
        String body = fullHttpRequest.content().toString(CHARSET);
        Observable.from(tunnelRegions).flatMap(region -> {
            // One error holder per region, cleared at the start of every attempt. A single holder
            // shared by all regions and never reset would make one region's failure fail every
            // other region and every later retry.
            AtomicReference<Throwable> lastError = new AtomicReference<>();
            // NOTE(review): request type String is assumed from the String body/StringTransformer;
            // confirm it matches the request type of the client returned by getSecureRestClient.
            HttpClientRequest<String> request = HttpClientRequest.create(HttpMethod.POST, tail);
            request.withRawContent(body, StringTransformer.DEFAULT_INSTANCE);
            // defer() looks the client up again on every retry and also signals completion.
            return Observable.defer(() -> Observable.just(this.mantisCrossRegionalClient.getSecureRestClient(region)))
                    .flatMap(httpClient -> {
                        lastError.set(null);
                        return httpClient.submit(request).flatMap(httpClientResponse -> {
                            if (httpClientResponse.getStatus().code() >= 500) {
                                throw new RuntimeException(httpClientResponse.getStatus().toString() + " in " + region);
                            }
                            return responseToRegionData(region, httpClientResponse);
                        }).onErrorReturn(th -> {
                            log.warn("Error getting response from remote master: " + th.getMessage());
                            lastError.set(th);
                            return new RegionData(region, false, th.getMessage(), 0);
                        });
                    })
                    .map(regionData -> {
                        Throwable th = lastError.get();
                        if (th != null) {
                            throw new RuntimeException(th);
                        }
                        return regionData;
                    })
                    .retryWhen(Util.getRetryFunc(log, tail + " in " + region))
                    .take(1)
                    .onErrorReturn(th -> new RegionData(region, false, th.getMessage(), 0));
        }).reduce(new ArrayList<RegionData>(), (collected, regionData) -> {
            collected.add(regionData);
            return collected;
        }).observeOn(this.scheduler).subscribeOn(this.scheduler).take(1).subscribe(
                collected -> writeDataAndCloseChannel(channelHandlerContext, collected),
                error -> {
                    // Without this handler an unexpected pipeline error would leave the client
                    // waiting on an open channel.
                    log.error("Error relaying POST {} to {}: {}", tail, tunnelRegions, error.getMessage(), error);
                    channelHandlerContext.close();
                });
    }

    /**
     * Starts relaying a remote server-sent-event stream to the client.
     *
     * <p>The response head (200, {@code text/event-stream}, chunked, keep-alive, no caching) is
     * written immediately. Remote events are then merged with a periodic timer marker:
     * <ul>
     *   <li>every real event is wrapped in the SSE prefix/suffix and offered to a bounded queue;
     *       when the queue is full the event is dropped and the drop counters are incremented;</li>
     *   <li>every timer marker drains the queue to the channel, but only while the channel is
     *       writable.</li>
     * </ul>
     * Tunnel ping messages are forwarded only if the original URI carried the tunnel ping
     * parameter. The resulting subscription is stored in {@code subscription}.
     *
     * @param channelHandlerContext context the stream is written to
     * @param fullHttpRequest the incoming request; only its URI is used
     */
    private void handleRemoteSse(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders headers = defaultHttpResponse.headers();
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, X-Requested-With, Accept, Content-Type, Cache-Control");
        headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");
        headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
        headers.set(HttpHeaderNames.PRAGMA, HttpHeaderValues.NO_CACHE);
        headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        channelHandlerContext.writeAndFlush(defaultHttpResponse);

        String uri = fullHttpRequest.uri();
        boolean forwardTunnelPings = hasTunnelPingParam(uri);
        String remoteUri = uriWithTunnelParamsAdded(getTail(uri));
        String requestedRegion = getRegion(uri);
        List<String> tunnelRegions = Util.isAllRegion(requestedRegion) ? getTunnelRegions() : Collections.singletonList(requestedRegion);
        log.info("Initiating remote SSE connection to {} in {}.", remoteUri, tunnelRegions);
        PushConnectionDetails details = PushConnectionDetails.from(remoteUri, tunnelRegions);

        String[] tags = Util.getTaglist(uri, details.target, requestedRegion);
        Counter droppedBytes = SpectatorUtils.newCounter(Constants.numDroppedBytesCounterName, details.target, tags);
        Counter droppedMessages = SpectatorUtils.newCounter(Constants.numDroppedMessagesCounterName, details.target, tags);
        Counter sentMessages = SpectatorUtils.newCounter(Constants.numMessagesCounterName, details.target, tags);
        Counter sentBytes = SpectatorUtils.newCounter(Constants.numBytesCounterName, details.target, tags);

        LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>(this.queueCapacity.get());
        this.subscription = this.connectionBroker.connect(details)
                .filter(event -> !event.equalsIgnoreCase(Constants.TunnelPingMessage) || forwardTunnelPings)
                .mergeWith(Observable.interval(this.writeIntervalMillis.get(), TimeUnit.MILLISECONDS).map(tick -> Constants.DUMMY_TIMER_DATA))
                .doOnNext(event -> {
                    if (Constants.DUMMY_TIMER_DATA.equals(event)) {
                        return;
                    }
                    String frame = Constants.SSE_DATA_PREFIX + event + Constants.SSE_DATA_SUFFIX;
                    if (!pending.offer(frame)) {
                        // Queue full: drop the event. The "bytes" counters use the frame's
                        // character length.
                        droppedBytes.increment(frame.length());
                        droppedMessages.increment();
                    }
                })
                // Only the timer markers continue; they act as the flush trigger.
                .filter(Constants.DUMMY_TIMER_DATA::equals)
                .doOnNext(tick -> {
                    if (!channelHandlerContext.channel().isWritable()) {
                        return;
                    }
                    List<String> frames = new ArrayList<>(pending.size());
                    pending.drainTo(frames);
                    for (String frame : frames) {
                        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(frame, CHARSET));
                        sentMessages.increment();
                        sentBytes.increment(frame.length());
                    }
                })
                .subscribe(
                        tick -> { },
                        error -> {
                            // A failed stream cannot deliver anything further; close the channel
                            // instead of leaving the client connected to a dead relay.
                            log.warn("Remote SSE stream {} in {} failed: {}", remoteUri, tunnelRegions, error.getMessage(), error);
                            channelHandlerContext.close();
                        });
    }

    /**
     * Forwards the event to the base class, then stops the SSE relay if one is still subscribed.
     *
     * @param channelHandlerContext context of the channel being unregistered
     * @throws Exception if the base implementation throws
     */
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelUnregistered(channelHandlerContext);
        boolean relayActive = this.subscription != null && !this.subscription.isUnsubscribed();
        if (relayActive) {
            this.subscription.unsubscribe();
        }
    }

    /**
     * Tells whether the given URI text contains the tunnel ping parameter name anywhere in it.
     *
     * @param str URI to inspect; may be {@code null}
     * @return {@code true} if {@code str} is non-null and contains {@code Constants.TunnelPingParamName}
     */
    private boolean hasTunnelPingParam(String str) {
        if (str == null) {
            return false;
        }
        return str.contains(Constants.TunnelPingParamName);
    }

    /**
     * Collects a response body into a single buffer and converts it to a {@code RegionData}.
     *
     * @param str region the response came from
     * @param httpClientResponse response whose content is collected
     * @return an observable emitting a successful {@code RegionData} with the body decoded using
     *         {@code CHARSET}, or, if reading the content fails, an unsuccessful one carrying the
     *         error message; both carry the response's status code
     */
    private Observable<RegionData> responseToRegionData(String str, HttpClientResponse<ByteBuf> httpClientResponse) {
        final int statusCode = httpClientResponse.getStatus().code();
        Observable<ByteBuf> wholeBody = httpClientResponse.getContent().collect(
                () -> Unpooled.buffer(),
                (accumulated, chunk) -> {
                    accumulated.writeBytes(chunk);
                });
        Observable<RegionData> parsed = wholeBody.map(body -> new RegionData(str, true, body.toString(CHARSET), statusCode));
        return parsed.onErrorReturn(failure -> new RegionData(str, false, failure.getMessage(), statusCode));
    }

    /**
     * Serializes the per-region results to a JSON array, writes it as a 200 response with JSON
     * content type and CORS headers, and closes the channel once the write has finished.
     *
     * <p>If building or writing the response throws, the error is logged and the channel is
     * closed so the client is not left waiting.
     *
     * @param channelHandlerContext context to write the response to
     * @param arrayList results collected from the regions
     */
    private void writeDataAndCloseChannel(ChannelHandlerContext channelHandlerContext, ArrayList<RegionData> arrayList) {
        try {
            ByteBuf content = Unpooled.copiedBuffer(responseToString(arrayList), CHARSET);
            DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            HttpHeaders headers = defaultFullHttpResponse.headers();
            headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=utf-8");
            headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, X-Requested-With, Accept, Content-Type, Cache-Control");
            headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS, PUT, POST, DELETE, CONNECT");
            // Content-Length counts encoded bytes, not chars: any non-ASCII character in the
            // payload makes the UTF-8 byte count larger than String.length().
            headers.add(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(content.readableBytes()));
            channelHandlerContext.writeAndFlush(defaultFullHttpResponse).addListener(future -> {
                channelHandlerContext.close();
            });
        } catch (Exception e) {
            log.error("Error serializing cross regional response: {}", e.getMessage(), e);
            // Nothing was sent; close so the request does not hang.
            channelHandlerContext.close();
        }
    }

    /**
     * Rebuilds a URI with all of its original query parameters and appends the tunnel
     * parameters: the tunnel ping parameter set to {@code "true"} and a tags parameter of the
     * form {@code originRegion:<local region>}.
     *
     * @param str URI (path plus optional query string) to extend
     * @return the re-encoded URI including the added parameters
     */
    private String uriWithTunnelParamsAdded(String str) {
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(str);
        QueryStringEncoder queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
        // Copy every existing parameter; a name may carry several values. The inner lambda
        // parameter needs its own name so each value is paired with its parameter name.
        queryStringDecoder.parameters().forEach((name, values) -> {
            values.forEach(value -> {
                queryStringEncoder.addParam(name, value);
            });
        });
        queryStringEncoder.addParam(Constants.TunnelPingParamName, "true");
        queryStringEncoder.addParam(Constants.TagsParamName, "originRegion:" + Util.getLocalRegion());
        return queryStringEncoder.toString();
    }

    /**
     * Writes and flushes an interim HTTP/1.1 {@code 100 Continue} response.
     *
     * @param channelHandlerContext context to write the response to
     */
    private static void send100Contine(ChannelHandlerContext channelHandlerContext) {
        DefaultFullHttpResponse interimResponse =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        channelHandlerContext.writeAndFlush(interimResponse);
    }

    /**
     * Tells whether a request URI, with its region prefix removed, starts with one of the
     * configured push prefixes.
     *
     * @param str request URI
     * @return the result of {@code Util.startsWithAnyOf} for the URI tail and {@code pushPrefixes}
     */
    private boolean isCrossRegionStreamingPath(String str) {
        String tail = getTail(str);
        return Util.startsWithAnyOf(tail, this.pushPrefixes);
    }

    /**
     * Strips a leading {@code /region/<name>/} prefix from a URI, leaving a path that starts
     * with {@code /}. A URI without such a prefix is returned unchanged.
     *
     * @param str request URI
     * @return the URI with the region prefix replaced by a single slash
     */
    private static String getTail(String str) {
        // Lazy match: stop at the first slash after the region name.
        final String regionPrefixPattern = "^/region/.*?/";
        return str.replaceFirst(regionPrefixPattern, "/");
    }

    /**
     * Extracts the region name from a URI of the form {@code /region/<name>/...}.
     *
     * @param str request URI
     * @return the text between the {@code /region/} prefix and the next slash, trimmed and
     *         lower-cased
     */
    private static String getRegion(String str) {
        String withoutPrefix = str.replaceFirst("^/region/", "");
        // Drop everything from the first remaining slash onwards.
        String firstSegment = withoutPrefix.replaceFirst("/.*$", "");
        return firstSegment.trim().toLowerCase();
    }

    /**
     * Renders the per-region results as a JSON array string.
     *
     * <p>A successful region contributes its data wrapped with origin metadata; a failed region
     * contributes an empty payload with its data passed as the error string.
     *
     * @param list region results in output order
     * @return {@code "["}, the comma-separated wrapped entries, and {@code "]"}
     */
    private static String responseToString(List<RegionData> list) {
        return list.stream()
                .map(entry -> entry.isSuccess()
                        ? getForceWrappedJson(entry.getData(), entry.getRegion(), entry.getResponseCode(), null)
                        : getForceWrappedJson("", entry.getRegion(), entry.getResponseCode(), entry.getData()))
                .collect(Collectors.joining(",", "[", "]"));
    }

    /**
     * Adds origin metadata to a JSON object payload, without a response code and without
     * forcing a wrapper: a payload that does not parse as a JSON object is returned unchanged.
     *
     * @param str payload text
     * @param str2 origin region
     * @param str3 error string, or {@code null}/empty for none
     * @return the annotated JSON object text, or {@code str} itself if it is not a JSON object
     */
    public static String getWrappedJson(String str, String str2, String str3) {
        final int noResponseCode = 0;
        final boolean forceWrap = false;
        return getWrappedJsonIntl(str, str2, str3, noResponseCode, forceWrap);
    }

    /**
     * Adds origin metadata to a payload, always producing a JSON object: a payload that is not
     * itself a JSON object is placed under a {@code "response"} key of a new object.
     *
     * @param str payload text
     * @param str2 origin region
     * @param i origin response code; written only when greater than zero
     * @param str3 error string, or {@code null}/empty for none
     * @return the annotated JSON object text
     */
    public static String getForceWrappedJson(String str, String str2, int i, String str3) {
        final boolean forceWrap = true;
        return getWrappedJsonIntl(str, str2, str3, i, forceWrap);
    }

    /**
     * Annotates a payload with origin metadata, trying three interpretations in order.
     *
     * <ol>
     *   <li>JSON object: the metadata keys are added to the parsed object itself.</li>
     *   <li>JSON array: when {@code z} is set, a new object holding the metadata and the array
     *       (accumulated under {@code "response"}) is produced; otherwise {@code str} is
     *       returned unchanged.</li>
     *   <li>Anything else: when {@code z} is set, a new object holding the metadata and the raw
     *       text under {@code "response"} is produced; otherwise {@code str} is returned
     *       unchanged.</li>
     * </ol>
     *
     * @param str payload text
     * @param str2 origin region
     * @param str3 error string, or {@code null}/empty for none
     * @param i origin response code; written only when greater than zero
     * @param z whether non-object payloads must be wrapped in an object
     * @return the resulting JSON text, or {@code str} itself as described above
     */
    private static String getWrappedJsonIntl(String str, String str2, String str3, int i, boolean z) {
        try {
            JSONObject asObject = new JSONObject(str);
            return withOriginMetadata(asObject, str2, str3, i).toString();
        } catch (JSONException notAnObject) {
            try {
                // Parse before checking z, so text that is not an array falls through to the last branch.
                JSONArray asArray = new JSONArray(str);
                if (z) {
                    JSONObject wrapper = withOriginMetadata(new JSONObject(), str2, str3, i);
                    wrapper.accumulate("response", asArray);
                    return wrapper.toString();
                }
                return str;
            } catch (JSONException notAnArray) {
                if (z) {
                    JSONObject wrapper = withOriginMetadata(new JSONObject(), str2, str3, i);
                    wrapper.put("response", str);
                    return wrapper.toString();
                }
                return str;
            }
        }
    }

    /** Writes the origin, optional error string and optional response code keys into {@code target} and returns it. */
    private static JSONObject withOriginMetadata(JSONObject target, String origin, String error, int responseCode) {
        target.put(regKey, origin);
        boolean hasError = error != null && !error.isEmpty();
        if (hasError) {
            target.put(errKey, error);
        }
        if (responseCode > 0) {
            target.put(codeKey, Integer.toString(responseCode));
        }
        return target;
    }
}
