/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.flink.utils;

import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Sets;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Streams;
import cz.o2.proxima.internal.shaded.com.google.gson.JsonArray;
import cz.o2.proxima.internal.shaded.com.google.gson.JsonElement;
import cz.o2.proxima.internal.shaded.com.google.gson.JsonObject;
import cz.o2.proxima.internal.shaded.com.google.gson.JsonParser;
import cz.o2.proxima.storage.watermark.GlobalWatermarkTracker;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link GlobalWatermarkTracker} that computes the global watermark by polling a REST endpoint
 * (configured via {@code rest-address}) for the watermarks of a named job's vertices.
 *
 * <p>The tracker is read-only with respect to the tracked job: {@link #initWatermarks(Map)},
 * {@link #update(String, long)} and {@link #finished(String)} are no-ops. The value returned from
 * {@link #getGlobalWatermark(String, long)} is the minimum of all {@code value} fields returned
 * from {@code jobs/<jobId>/vertices/<vertexId>/watermarks}, refreshed at most once per configured
 * update interval.
 *
 * <p>Supported configuration keys (see {@link #setup(Map)}): {@code rest-address} (required),
 * {@code job-name} (required), {@code vertex-names} (optional) and {@code update-interval-ms}
 * (optional, defaults to 30000).
 *
 * <p>NOTE(review): the resolved job id and vertex ids are cached for the lifetime of the instance
 * and no synchronization is performed in this class - confirm this matches how callers use it.
 */
public class FlinkGlobalWatermarkTracker implements GlobalWatermarkTracker {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlinkGlobalWatermarkTracker.class);

    /** Configuration key holding base URL of the REST API. */
    private static final String CFG_REST_ADDRESS = "rest-address";
    /** Configuration key holding name of the job to track. */
    private static final String CFG_JOB_NAME = "job-name";
    /** Configuration key holding a single vertex name or a list of vertex names. */
    private static final String CFG_VERTEX_NAMES = "vertex-names";
    /** Configuration key holding minimal interval between two REST refreshes, in milliseconds. */
    private static final String CFG_UPDATE_INTERVAL_MS = "update-interval-ms";
    /** Refresh interval used when {@link #CFG_UPDATE_INTERVAL_MS} is not configured. */
    private static final long DEFAULT_UPDATE_INTERVAL_MS = 30000L;

    private URL flinkMasterRest;
    private String jobName;
    private List<String> vertexNames;
    /** Lazily resolved id of the running job named {@link #jobName}. */
    @Nullable
    private String jobId;
    /** Lazily resolved ids of the vertices whose watermarks are read. */
    @Nullable
    private List<String> vertices;
    private String name;
    private long updateInterval;
    /** Timestamp of the last refresh; {@code Long.MIN_VALUE} means "never refreshed". */
    private long lastUpdate = Long.MIN_VALUE;
    private long globalWatermark;

    /**
     * @return name of this tracker as assigned in {@link #setup(Map)}
     */
    public String getName() {
        return this.name;
    }

    /**
     * Configure the tracker.
     *
     * @param cfg configuration map; values are converted using {@code String.valueOf}
     * @throws IllegalArgumentException when {@code rest-address} or {@code job-name} is missing
     */
    public void setup(Map<String, Object> cfg) {
        this.flinkMasterRest =
            Optional.ofNullable(cfg.get(CFG_REST_ADDRESS))
                .map(String::valueOf)
                .map(FlinkGlobalWatermarkTracker::toUrl)
                .orElseThrow(
                    () -> new IllegalArgumentException("Missing rest-address config option"));
        this.jobName =
            Optional.ofNullable(cfg.get(CFG_JOB_NAME))
                .map(String::valueOf)
                .orElseThrow(
                    () -> new IllegalArgumentException("Missing job-name config option"));
        this.vertexNames =
            Optional.ofNullable(cfg.get(CFG_VERTEX_NAMES))
                .map(FlinkGlobalWatermarkTracker::parseVertexNamesToList)
                .orElse(Collections.emptyList());
        // NOTE(review): the trailing "e" in the format looks like a typo; it is kept unchanged
        // because the name is visible to callers through getName() - confirm before removing.
        this.name = String.format("%s(%s)e", this.getClass().getSimpleName(), this.jobName);
        this.updateInterval =
            Optional.ofNullable(cfg.get(CFG_UPDATE_INTERVAL_MS))
                .map(String::valueOf)
                .map(Long::valueOf)
                .orElse(DEFAULT_UPDATE_INTERVAL_MS);
        this.globalWatermark = Long.MAX_VALUE;
    }

    /**
     * Convert configured vertex names to a list of strings.
     *
     * @param o either a {@link List} (each element is converted via {@code toString()}) or any
     *     other object, which yields a single-element list holding its {@code toString()}
     * @return list of vertex names
     */
    @VisibleForTesting
    static List<String> parseVertexNamesToList(Object o) {
        if (o instanceof List) {
            return ((List<?>) o).stream().map(Object::toString).collect(Collectors.toList());
        }
        return Collections.singletonList(o.toString());
    }

    /** Parse the address to URL, going through {@link ExceptionUtils#uncheckedFactory}. */
    private static URL toUrl(String address) {
        return ExceptionUtils.uncheckedFactory(() -> new URL(address));
    }

    /** Resolve vertex ids: all vertices of the job when no names are configured. */
    private List<String> getVertices() {
        return this.vertexNames.isEmpty()
            ? this.readFullVertices()
            : this.readVertexIdsFromNames(this.vertexNames);
    }

    /**
     * Find id of a job with given name that is in state {@code RUNNING}.
     *
     * <p>Lists {@code jobs} and then fetches {@code jobs/<id>} for every listed job to read its
     * {@code name}, {@code state} and {@code jid}.
     *
     * @param jobName name of the job to look for
     * @return the {@code jid} of a matching job
     * @throws IllegalStateException when no such job exists or the job list cannot be read
     */
    private String getJobId(String jobName) {
        try {
            JsonArray jobs =
                this.getJsonForURL(this.getApiUrl("jobs")).getAsJsonObject().getAsJsonArray("jobs");
            return Streams.stream(jobs)
                .map(JsonElement::getAsJsonObject)
                .map(job -> job.get("id").getAsString())
                .map(this::fetchJobDetailUnchecked)
                .map(JsonElement::getAsJsonObject)
                .filter(detail -> isRunningJobNamed(detail, jobName))
                .map(detail -> detail.get("jid").getAsString())
                .findAny()
                .orElseThrow(
                    () -> new IllegalStateException(String.format("Job %s not found", jobName)));
        }
        catch (IOException e) {
            throw new IllegalStateException("Cannot get job id for " + jobName, e);
        }
    }

    /** Fetch {@code jobs/<id>}, going through {@link ExceptionUtils#uncheckedFactory}. */
    private JsonElement fetchJobDetailUnchecked(String id) {
        return ExceptionUtils.uncheckedFactory(
            () -> this.getJsonForURL(this.getApiUrl("jobs/" + id)));
    }

    /**
     * Test whether given job detail describes a {@code RUNNING} job with given name. Details
     * lacking {@code name} or {@code state} do not match.
     */
    private static boolean isRunningJobNamed(JsonObject detail, String jobName) {
        return detail.has("name")
            && detail.get("name").getAsString().equals(jobName)
            && detail.has("state")
            && detail.get("state").getAsString().equals("RUNNING");
    }

    /** Read ids of vertices whose {@code name} is contained in given names. */
    private List<String> readVertexIdsFromNames(List<String> names) {
        HashSet<String> wanted = Sets.newHashSet(names);
        return this.readVertexIds(vertex -> wanted.contains(vertex.get("name").getAsString()));
    }

    /** Read ids of all vertices of the job. */
    private List<String> readFullVertices() {
        return this.readVertexIds(vertex -> true);
    }

    /**
     * Read {@code id} of every element of the {@code vertices} array of {@code jobs/<jobId>}
     * accepted by given predicate.
     *
     * @param acceptable filter applied to each vertex object
     * @return ids of accepted vertices
     * @throws IllegalStateException when the job detail cannot be read
     */
    private List<String> readVertexIds(Predicate<JsonObject> acceptable) {
        try {
            URL rest = this.getApiUrl("jobs/" + this.jobId());
            JsonArray jobVertices =
                this.getJsonForURL(rest).getAsJsonObject().getAsJsonArray("vertices");
            return Streams.stream(jobVertices)
                .map(JsonElement::getAsJsonObject)
                .filter(acceptable)
                .map(vertex -> vertex.get("id").getAsString())
                .collect(Collectors.toList());
        }
        catch (IOException ex) {
            throw new IllegalStateException("Cannot read vertex ids", ex);
        }
    }

    /** @return job id, resolved on first use and cached afterwards */
    private String jobId() {
        if (this.jobId == null) {
            this.jobId = this.getJobId(this.jobName);
        }
        return this.jobId;
    }

    /** @return vertex ids, resolved on first use and cached afterwards */
    private List<String> vertices() {
        if (this.vertices == null) {
            this.vertices = this.getVertices();
        }
        return this.vertices;
    }

    /** No-op: this tracker only reads watermarks from the REST API. */
    public void initWatermarks(Map<String, Long> initialWatermarks) {
    }

    /**
     * No-op: this tracker only reads watermarks from the REST API.
     *
     * @return an already completed future
     */
    public CompletableFuture<Void> update(String processName, long currentWatermark) {
        return CompletableFuture.completedFuture(null);
    }

    /** No-op: this tracker only reads watermarks from the REST API. */
    public void finished(String name) {
    }

    /**
     * Retrieve the global watermark, refreshing it from the REST API when more than the
     * configured update interval elapsed since the last refresh. Both arguments are ignored.
     *
     * @param processName ignored
     * @param currentWatermark ignored
     * @return the most recently computed global watermark
     */
    public long getGlobalWatermark(@Nullable String processName, long currentWatermark) {
        // lastUpdate is on the right-hand side on purpose: subtracting the initial
        // Long.MIN_VALUE from current time would overflow
        if (System.currentTimeMillis() > this.updateInterval + this.lastUpdate) {
            this.updateGlobalWatermark();
        }
        return this.globalWatermark;
    }

    /** Recompute the global watermark and remember the time of the refresh. */
    private void updateGlobalWatermark() {
        this.globalWatermark = this.getMinWatermarkFrom(this.vertices());
        this.lastUpdate = System.currentTimeMillis();
    }

    /**
     * Compute minimum over all {@code value} fields returned from {@code
     * jobs/<jobId>/vertices/<vertexId>/watermarks} of given vertices.
     *
     * @param vertices ids of vertices to query
     * @return the minimal watermark, or {@code Long.MAX_VALUE} when no value was returned
     */
    @VisibleForTesting
    long getMinWatermarkFrom(List<String> vertices) {
        return vertices.stream()
            .map(v -> this.getApiUrl("jobs/" + this.jobId() + "/vertices/" + v + "/watermarks"))
            .map(this::fetchJsonUnchecked)
            .map(JsonElement::getAsJsonArray)
            .flatMapToLong(
                arr ->
                    Streams.stream(arr)
                        .mapToLong(o -> o.getAsJsonObject().get("value").getAsLong()))
            .min()
            .orElse(Long.MAX_VALUE);
    }

    /** Fetch JSON from given URL, going through {@link ExceptionUtils#uncheckedFactory}. */
    private JsonElement fetchJsonUnchecked(URL url) {
        return ExceptionUtils.uncheckedFactory(() -> this.getJsonForURL(url));
    }

    /**
     * Open a connection to given URL and parse the response body as JSON.
     *
     * <p>NOTE(review): no connect or read timeout is configured on the connection here - confirm
     * whether a bounded timeout is desired.
     *
     * @param url URL to read
     * @return parsed response
     * @throws IOException when the URL cannot be read
     */
    @VisibleForTesting
    JsonElement getJsonForURL(URL url) throws IOException {
        try (InputStream in = url.openConnection().getInputStream()) {
            return FlinkGlobalWatermarkTracker.parseInputStreamToJson(in);
        }
    }

    /**
     * Read the whole stream as UTF-8 text and parse it as JSON. The stream is not closed by this
     * method.
     *
     * @param in stream holding a JSON document
     * @return parsed JSON
     */
    @VisibleForTesting
    static JsonElement parseInputStreamToJson(InputStream in) {
        // decode explicitly as UTF-8 instead of relying on the platform default charset
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return JsonParser.parseString(reader.lines().collect(Collectors.joining("\n")));
    }

    /** Resolve given API path against the configured REST address. */
    private URL getApiUrl(String apiPath) {
        return FlinkGlobalWatermarkTracker.getApiUrlFor(this.flinkMasterRest, apiPath);
    }

    /**
     * Resolve an API path against a base URL using {@link URL#URL(URL, String)}.
     *
     * @param base base URL
     * @param apiPath path to resolve in the context of {@code base}
     * @return the resulting URL
     * @throws IllegalArgumentException when the result is not a valid URL
     */
    @VisibleForTesting
    static URL getApiUrlFor(URL base, String apiPath) {
        try {
            return new URL(base, apiPath);
        }
        catch (MalformedURLException e) {
            throw new IllegalArgumentException(
                String.format("Cannot form valid URL from %s and %s", base.toString(), apiPath),
                e);
        }
    }
}

