package cz.o2.proxima.flink.utils;

import cz.o2.proxima.core.storage.watermark.GlobalWatermarkTracker;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.collect.Sets;
import cz.o2.proxima.internal.com.google.common.collect.Streams;
import cz.o2.proxima.internal.com.google.gson.JsonElement;
import cz.o2.proxima.internal.com.google.gson.JsonObject;
import cz.o2.proxima.internal.com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.SerializedLambda;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.class */
/**
 * A {@link GlobalWatermarkTracker} that does not track watermarks itself but reads them over HTTP
 * from a REST endpoint configured via {@code rest-address}.
 *
 * <p>The tracker looks up a job with the configured name in state {@code RUNNING}, lists its
 * vertices (optionally restricted to configured vertex names) and reports the minimum of all
 * watermark values returned for those vertices. The result is cached and refreshed at most once
 * per {@code update-interval-ms}.
 *
 * <p>The resolved job id and vertex ids are cached after the first successful lookup and are not
 * refreshed afterwards.
 */
public class FlinkGlobalWatermarkTracker implements GlobalWatermarkTracker {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlinkGlobalWatermarkTracker.class);

    /** Config key holding the base URL of the REST endpoint (mandatory). */
    private static final String CFG_REST_ADDRESS = "rest-address";

    /** Config key holding the name of the job to look up (mandatory). */
    private static final String CFG_JOB_NAME = "job-name";

    /** Config key holding a single vertex name or a list of vertex names (optional). */
    private static final String CFG_VERTEX_NAMES = "vertex-names";

    /** Config key holding the refresh interval of the cached watermark in milliseconds (optional). */
    private static final String CFG_UPDATE_INTERVAL_MS = "update-interval-ms";

    /** Refresh interval used when {@code update-interval-ms} is not configured. */
    private static final long DEFAULT_UPDATE_INTERVAL_MS = 30000L;

    /** Base URL against which all API paths are resolved. */
    private URL flinkMasterRest;

    /** Name of the job whose watermark is reported. */
    private String jobName;

    /** Vertex names to restrict the computation to; empty means all vertices of the job. */
    private List<String> vertexNames;

    /** Lazily resolved id of the job, {@code null} until first needed. */
    @Nullable
    private String jobId;

    /** Lazily resolved ids of the tracked vertices, {@code null} until first needed. */
    @Nullable
    private List<String> vertices;

    /** Name of this tracker as returned by {@link #getName()}. */
    private String name;

    /** Minimal delay in milliseconds between two refreshes of {@link #globalWatermark}. */
    private long updateInterval;

    /** Timestamp (epoch millis) of the last refresh; {@code Long.MIN_VALUE} forces the first one. */
    private long lastUpdate = Long.MIN_VALUE;

    /** Last watermark computed by {@link #updateGlobalWatermark()}. */
    private long globalWatermark;

    /**
     * Returns the name assigned to this tracker in {@link #setup(Map)}.
     *
     * @return the tracker name, or {@code null} when {@link #setup(Map)} has not been called yet
     */
    public String getName() {
        return this.name;
    }

    /**
     * Configures this tracker.
     *
     * @param map configuration; {@code rest-address} and {@code job-name} are mandatory,
     *     {@code vertex-names} and {@code update-interval-ms} are optional
     * @throws IllegalArgumentException when a mandatory option is missing
     */
    public void setup(Map<String, Object> map) {
        this.flinkMasterRest = Optional.ofNullable(map.get(CFG_REST_ADDRESS))
                .map(String::valueOf)
                .map(FlinkGlobalWatermarkTracker::toUrl)
                .orElseThrow(() -> new IllegalArgumentException("Missing rest-address config option"));
        this.jobName = Optional.ofNullable(map.get(CFG_JOB_NAME))
                .map(String::valueOf)
                .orElseThrow(() -> new IllegalArgumentException("Missing job-name config option"));
        this.vertexNames = Optional.ofNullable(map.get(CFG_VERTEX_NAMES))
                .map(FlinkGlobalWatermarkTracker::parseVertexNamesToList)
                .orElse(Collections.emptyList());
        // NOTE(review): the trailing "e" in the format looks like a typo; it is kept as-is because
        // the resulting name may be relied on elsewhere - confirm before changing.
        this.name = String.format("%s(%s)e", getClass().getSimpleName(), this.jobName);
        this.updateInterval = Optional.ofNullable(map.get(CFG_UPDATE_INTERVAL_MS))
                .map(String::valueOf)
                .map(Long::valueOf)
                .orElse(DEFAULT_UPDATE_INTERVAL_MS);
        this.globalWatermark = Long.MAX_VALUE;
    }

    /** Converts the given address to a {@link URL}, rethrowing failures via {@link ExceptionUtils}. */
    private static URL toUrl(String address) {
        return ExceptionUtils.uncheckedFactory(() -> new URL(address));
    }

    /**
     * Converts the value of the {@code vertex-names} option to a list of strings.
     *
     * @param obj either a {@link List}, whose elements are converted with {@code toString()}, or any
     *     other object, which yields a single-element list holding its {@code toString()}
     * @return the vertex names
     */
    @VisibleForTesting
    static List<String> parseVertexNamesToList(Object obj) {
        if (!(obj instanceof List)) {
            return Collections.singletonList(obj.toString());
        }
        return ((List<?>) obj).stream().map(Object::toString).collect(Collectors.toList());
    }

    /** Reads ids of the tracked vertices: all of them, or only those with a configured name. */
    private List<String> getVertices() {
        return this.vertexNames.isEmpty() ? readFullVertices() : readVertexIdsFromNames(this.vertexNames);
    }

    /**
     * Finds the id of a job with the given name that is in state {@code RUNNING}.
     *
     * <p>Lists {@code jobs} and then fetches {@code jobs/<id>} for the listed jobs to inspect their
     * name and state.
     *
     * @throws IllegalStateException when no such job exists or the job listing cannot be read
     */
    private String getJobId(String wantedName) {
        try {
            return Streams.stream(getJsonForURL(getApiUrl("jobs")).getAsJsonObject().getAsJsonArray("jobs"))
                    .map(JsonElement::getAsJsonObject)
                    .map(job -> job.get("id").getAsString())
                    .map(this::readJobDetails)
                    .map(JsonElement::getAsJsonObject)
                    .filter(details -> details.has("name")
                            && details.get("name").getAsString().equals(wantedName)
                            && details.get("state").getAsString().equals("RUNNING"))
                    .map(details -> details.get("jid").getAsString())
                    .findAny()
                    .orElseThrow(() -> new IllegalStateException(String.format("Job %s not found", wantedName)));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot get job id for " + wantedName, e);
        }
    }

    /** Fetches {@code jobs/<id>}, rethrowing failures via {@link ExceptionUtils}. */
    private JsonElement readJobDetails(String id) {
        return ExceptionUtils.uncheckedFactory(() -> getJsonForURL(getApiUrl("jobs/" + id)));
    }

    /** Reads ids of vertices whose {@code name} is contained in the given names. */
    private List<String> readVertexIdsFromNames(List<String> names) {
        Set<String> wanted = new HashSet<>(names);
        return readVertexIds(vertex -> wanted.contains(vertex.get("name").getAsString()));
    }

    /** Reads ids of all vertices of the job. */
    private List<String> readFullVertices() {
        return readVertexIds(vertex -> true);
    }

    /**
     * Reads the {@code vertices} array of {@code jobs/<jobId>} and returns the {@code id} of every
     * vertex accepted by the predicate.
     *
     * @throws IllegalStateException when the job description cannot be read
     */
    private List<String> readVertexIds(Predicate<JsonObject> predicate) {
        try {
            return Streams.stream(
                            getJsonForURL(getApiUrl("jobs/" + jobId())).getAsJsonObject().getAsJsonArray("vertices"))
                    .map(JsonElement::getAsJsonObject)
                    .filter(predicate)
                    .map(vertex -> vertex.get("id").getAsString())
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read vertex ids", e);
        }
    }

    /** Returns the job id, resolving and caching it on first use. */
    private String jobId() {
        if (this.jobId == null) {
            this.jobId = getJobId(this.jobName);
        }
        return this.jobId;
    }

    /** Returns the tracked vertex ids, resolving and caching them on first use. */
    private List<String> vertices() {
        if (this.vertices == null) {
            this.vertices = getVertices();
        }
        return this.vertices;
    }

    /**
     * Does nothing; this tracker only reads watermarks from the REST endpoint.
     *
     * @param map ignored
     */
    public void initWatermarks(Map<String, Long> map) {
        // intentionally empty
    }

    /**
     * Does nothing; this tracker only reads watermarks from the REST endpoint.
     *
     * @param str ignored
     * @param j ignored
     * @return an already completed future
     */
    public CompletableFuture<Void> update(String str, long j) {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Does nothing; this tracker only reads watermarks from the REST endpoint.
     *
     * @param str ignored
     */
    public void finished(String str) {
        // intentionally empty
    }

    /**
     * Returns the cached global watermark, refreshing it first when more than the configured update
     * interval has elapsed since the last refresh.
     *
     * @param str ignored
     * @param j ignored
     * @return the minimum watermark over all tracked vertices
     */
    public long getGlobalWatermark(@Nullable String str, long j) {
        if (System.currentTimeMillis() > this.updateInterval + this.lastUpdate) {
            updateGlobalWatermark();
        }
        return this.globalWatermark;
    }

    /** Recomputes the cached watermark and records the time of the refresh. */
    private void updateGlobalWatermark() {
        this.globalWatermark = getMinWatermarkFrom(vertices());
        this.lastUpdate = System.currentTimeMillis();
    }

    /**
     * Fetches {@code jobs/<jobId>/vertices/<vertexId>/watermarks} for every given vertex and returns
     * the minimum of all {@code value} entries found.
     *
     * @param list ids of the vertices to query
     * @return the minimum watermark, or {@code Long.MAX_VALUE} when no value was returned at all
     */
    @VisibleForTesting
    long getMinWatermarkFrom(List<String> list) {
        return list.stream()
                .map(vertexId -> getApiUrl("jobs/" + jobId() + "/vertices/" + vertexId + "/watermarks"))
                .map(this::getJsonUnchecked)
                .map(JsonElement::getAsJsonArray)
                .flatMapToLong(entries -> Streams.stream(entries)
                        .mapToLong(entry -> entry.getAsJsonObject().get("value").getAsLong()))
                .min()
                .orElse(Long.MAX_VALUE);
    }

    /** Same as {@link #getJsonForURL(URL)}, rethrowing failures via {@link ExceptionUtils}. */
    private JsonElement getJsonUnchecked(URL url) {
        return ExceptionUtils.uncheckedFactory(() -> getJsonForURL(url));
    }

    /**
     * Opens the given URL and parses the response body as JSON. The stream is always closed.
     *
     * @param url URL to read
     * @return the parsed response
     * @throws IOException when the connection cannot be opened or closed
     */
    @VisibleForTesting
    JsonElement getJsonForURL(URL url) throws IOException {
        try (InputStream inputStream = url.openConnection().getInputStream()) {
            return parseInputStreamToJson(inputStream);
        }
    }

    /**
     * Reads the whole stream as UTF-8 text and parses it as JSON. The stream is not closed here;
     * closing it is left to the caller.
     *
     * @param inputStream stream holding a JSON document
     * @return the parsed document
     */
    @VisibleForTesting
    static JsonElement parseInputStreamToJson(InputStream inputStream) {
        // JSON is exchanged as UTF-8; do not depend on the platform default charset.
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        return JsonParser.parseString(reader.lines().collect(Collectors.joining("\n")));
    }

    /** Resolves the given API path against the configured base URL. */
    private URL getApiUrl(String path) {
        return getApiUrlFor(this.flinkMasterRest, path);
    }

    /**
     * Resolves {@code str} against {@code url} using {@link URL#URL(URL, String)}.
     *
     * @param url base URL
     * @param str path to resolve
     * @return the resolved URL
     * @throws IllegalArgumentException when the result is not a valid URL
     */
    @VisibleForTesting
    static URL getApiUrlFor(URL url, String str) {
        try {
            return new URL(url, str);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(String.format("Cannot form valid URL from %s and %s", url.toString(), str), e);
        }
    }

    // The decompiled synthetic $deserializeLambda$ method is intentionally not reproduced here: the
    // compiler generates it for serializable lambdas, and the decompiled form was not valid Java.
}
