package io.mantisrx.server.worker.mesos;

import io.mantisrx.runtime.loader.config.MetricsCollector;
import io.mantisrx.runtime.loader.config.Usage;
import io.reactivx.mantis.operators.OperatorOnErrorResumeNextViaFunction;
import java.nio.charset.Charset;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:io/mantisrx/server/worker/mesos/MesosMetricsCollector.class */
public class MesosMetricsCollector implements MetricsCollector {
    private static final Logger logger = LoggerFactory.getLogger(MesosMetricsCollector.class);
    private static final long GET_TIMEOUT_SECS = 5;
    private static final int MAX_REDIRECTS = 10;
    private final int slavePort;
    private final String taskId;
    private final Func1<Observable<? extends Throwable>, Observable<?>> retryLogic = observable -> {
        return observable.zipWith(Observable.range(1, 3), (th, num) -> {
            return num;
        }).flatMap(num2 -> {
            logger.info(": retrying conx after sleeping for 2 secs");
            return Observable.timer(2L, TimeUnit.SECONDS);
        });
    };

    public static MesosMetricsCollector valueOf(Properties properties) {
        return new MesosMetricsCollector(Integer.parseInt(properties.getProperty("mantis.agent.mesos.slave.port", "5051")), System.getenv("MESOS_TASK_ID"));
    }

    MesosMetricsCollector(int i, String str) {
        this.slavePort = i;
        this.taskId = str;
    }

    private String getUsageJson() {
        String str = "http://localhost:" + this.slavePort + "/monitor/statistics.json";
        return (String) RxNetty.createHttpRequest(HttpClientRequest.createGet(str), new HttpClient.HttpClientConfig.Builder().setFollowRedirect(true).followRedirect(MAX_REDIRECTS).build()).lift(new OperatorOnErrorResumeNextViaFunction(th -> {
            return Observable.error(th);
        })).timeout(GET_TIMEOUT_SECS, TimeUnit.SECONDS).retryWhen(this.retryLogic).flatMap(httpClientResponse -> {
            return httpClientResponse.getContent();
        }).map(byteBuf -> {
            return byteBuf.toString(Charset.defaultCharset());
        }).doOnError(th2 -> {
            logger.warn("Can't get resource usage from mesos slave endpoint (" + str + ") - " + th2.getMessage(), th2);
        }).toBlocking().firstOrDefault("");
    }

    public Usage get() {
        return getCurentUsage(this.taskId, getUsageJson());
    }

    static Usage getCurentUsage(String str, String str2) {
        String optString;
        if (str2 == null || str2.isEmpty()) {
            return null;
        }
        JSONArray jSONArray = new JSONArray(str2);
        if (jSONArray.length() == 0) {
            return null;
        }
        JSONObject jSONObject = null;
        int i = 0;
        while (true) {
            if (i < jSONArray.length()) {
                JSONObject jSONObject2 = jSONArray.getJSONObject(i);
                if (jSONObject2 != null && (optString = jSONObject2.optString("executor_id")) != null && optString.equals(str)) {
                    jSONObject = jSONObject2.getJSONObject("statistics");
                    break;
                }
                i++;
            } else {
                break;
            }
        }
        if (jSONObject == null) {
            return null;
        }
        double optDouble = jSONObject.optDouble("cpus_limit");
        if (Double.isNaN(optDouble)) {
            optDouble = 0.0d;
        }
        double optDouble2 = jSONObject.optDouble("cpus_system_time_secs");
        if (Double.isNaN(optDouble2)) {
            logger.warn("Didn't get cpus_system_time_secs from mesos stats");
            optDouble2 = 0.0d;
        }
        double optDouble3 = jSONObject.optDouble("cpus_user_time_secs");
        if (Double.isNaN(optDouble3)) {
            logger.warn("Didn't get cpus_user_time_secs from mesos stats");
            optDouble3 = 0.0d;
        }
        double optDouble4 = jSONObject.optDouble("mem_rss_bytes");
        if (Double.isNaN(optDouble4)) {
            logger.warn("Couldn't get mem_rss_bytes from mesos stats");
            optDouble4 = 0.0d;
        }
        double optDouble5 = jSONObject.optDouble("mem_anon_bytes");
        if (Double.isNaN(optDouble5)) {
            optDouble5 = optDouble4;
        }
        double optDouble6 = jSONObject.optDouble("mem_limit_bytes");
        if (Double.isNaN(optDouble6)) {
            optDouble6 = 0.0d;
        }
        double optDouble7 = jSONObject.optDouble("net_rx_bytes");
        if (Double.isNaN(optDouble7)) {
            optDouble7 = 0.0d;
        }
        double optDouble8 = jSONObject.optDouble("net_tx_bytes");
        if (Double.isNaN(optDouble8)) {
            optDouble8 = 0.0d;
        }
        return new Usage(optDouble, optDouble2, optDouble3, optDouble6, optDouble4, optDouble5, optDouble7, optDouble8);
    }
}
