/*
 * Decompiled with CFR 0.152.
 */
package io.yupiik.kubernetes.operator.base.impl;

import io.yupiik.fusion.framework.api.main.Awaiter;
import io.yupiik.fusion.json.JsonMapper;
import io.yupiik.fusion.kubernetes.client.KubernetesClient;
import io.yupiik.kubernetes.operator.base.impl.ObjectLike;
import io.yupiik.kubernetes.operator.base.impl.OperatorConfiguration;
import io.yupiik.kubernetes.operator.base.impl.SimpleController;
import io.yupiik.kubernetes.operator.base.spi.Operator;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Controller runtime for an {@link Operator}: it lists the operator resources, then opens a watch on
 * each configured URI and forwards every received line to the {@link SimpleController} event handling.
 *
 * <p>It also acts as an {@link Awaiter}: when enabled in the configuration, {@link #await()} blocks until
 * {@link #doStop()} is called or the initial list/watch setup fails.
 *
 * @param <T> the resource type handled by the operator.
 */
public class OperatorRuntime<T extends ObjectLike> extends SimpleController<T> implements Awaiter {
    private final Logger logger = Logger.getLogger(this.getClass().getName());
    private final OperatorConfiguration configuration;
    private final KubernetesClient kubernetes;
    private final ExecutorService threads;
    private final JsonMapper jsonMapper;
    private final Operator<T> spec;
    private final List<URI> findAllUris;

    /** Guards the compare-then-set done on {@link #lastResource}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Released when the runtime stops or when the initial list/watch setup fails. */
    private final CountDownLatch awaiter = new CountDownLatch(1);

    /**
     * Last resource version seen, {@code null} until one is known.
     * Written under {@link #lock}; volatile so that {@link #watch(URI)} can read it without locking.
     */
    private volatile String lastResource;

    /**
     * Creates the runtime and precomputes one "find all" URI per namespace returned by the operator.
     *
     * @param configuration runtime configuration (event thread count, bookmarks usage, await flag).
     * @param kubernetes    client used to send the list and watch requests.
     * @param threads       executor used for the asynchronous stages of this runtime.
     * @param jsonMapper    mapper used to parse list responses and to convert items to {@code T}.
     * @param spec          the operator definition.
     */
    public OperatorRuntime(OperatorConfiguration configuration, KubernetesClient kubernetes, ScheduledExecutorService threads, JsonMapper jsonMapper, Operator<T> spec) {
        super(threads, jsonMapper, configuration.eventThreadCount(), spec);
        this.configuration = configuration;
        this.kubernetes = kubernetes;
        this.threads = threads;
        this.jsonMapper = jsonMapper;
        this.spec = spec;
        this.findAllUris = spec.namespaces().stream()
                .map(namespace -> URI.create(
                        "https://kubernetes.api/apis/" + spec.apiVersion() + "/"
                                + (spec.namespaced() ? "namespaces/" + namespace + "/" : "")
                                + spec.pluralName()))
                .toList();
    }

    /**
     * Calls the operator start hook then, asynchronously, starts listing and watching resources.
     *
     * @return the stage completed once the listening has been triggered.
     */
    public CompletionStage<?> doStart() {
        return this.spec.onStart().thenRunAsync(this::startListening, this.threads);
    }

    /**
     * Stops the controller, calls the operator stop hook and releases any thread blocked in {@link #await()}.
     */
    public void doStop() {
        try {
            super.stop();
            this.spec.onStop();
        } finally {
            // countDown() is a no-op once the latch reached zero, no need to test the count first
            this.awaiter.countDown();
        }
    }

    /**
     * For each precomputed URI: lists the resources, runs {@code init}, then starts the watch.
     * Any failure in that chain is logged, releases {@link #await()} and is rethrown as a runtime exception.
     */
    private void startListening() {
        if (this.stopping) {
            return;
        }
        for (final URI uri : this.findAllUris) {
            this.findAll(uri)
                    .thenRunAsync(this::init, this.threads)
                    .thenRunAsync(() -> this.watch(uri), this.threads)
                    .exceptionally(error -> {
                        this.logger.log(Level.SEVERE, error, () -> "Can't watch events: " + error.getMessage() + ", exiting");
                        this.awaiter.countDown();
                        throw error instanceof RuntimeException re ? re : new IllegalStateException(error);
                    });
        }
    }

    /**
     * Records {@code resourceVersion} as the last seen version unless the stored one is numerically higher.
     *
     * <p>The comparison and the assignment are done under the same lock so that concurrent calls
     * can not replace a newer version by an older one.
     *
     * @param resourceVersion the resource version to record.
     */
    @Override
    protected void onBookmark(String resourceVersion) {
        this.lock.lock();
        try {
            final String current = this.lastResource;
            if (current != null && this.isHigher(current, resourceVersion)) {
                return;
            }
            this.lastResource = resourceVersion;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Compares two resource versions as {@code long} values.
     *
     * @param current         the currently stored version.
     * @param resourceVersion the candidate version.
     * @return {@code true} when {@code current} is strictly greater than {@code resourceVersion},
     *         {@code false} otherwise, including when one of them can not be parsed as a {@code long}.
     */
    private boolean isHigher(String current, String resourceVersion) {
        try {
            return Long.parseLong(current) > Long.parseLong(resourceVersion);
        } catch (NumberFormatException nfe) {
            return false;
        }
    }

    /**
     * Opens a watch request on {@code uri}, starting from the last known resource version when there is one,
     * and forwards each received line to the controller event handling.
     * When the request fails and the runtime is not stopping, a new watch is started on the same URI.
     *
     * @param uri the "find all" URI to watch.
     */
    private void watch(URI uri) {
        final String usedLastResource = this.lastResource;
        final URI watchUri = URI.create(uri.toASCIIString() + "?watch=true"
                + (this.configuration.useBookmarks() ? "&allowWatchBookmarks=true" : "")
                + (usedLastResource != null ? "&resourceVersion=" + usedLastResource : ""));
        this.logger.info(() -> "Starting to watch '" + watchUri + "'");
        this.get(watchUri, HttpResponse.BodyHandlers.fromLineSubscriber(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                OperatorRuntime.this.logger.info(() -> "Starting to watch resources");
                this.subscription.request(1L);
            }

            @Override
            public void onNext(String item) {
                try {
                    OperatorRuntime.super.onEvent(item);
                } finally {
                    // lines are requested one by one, always ask for the next one even if the handling failed
                    this.subscription.request(1L);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                OperatorRuntime.this.logger.log(Level.SEVERE, throwable, throwable::getMessage);
            }

            @Override
            public void onComplete() {
                // NOTE(review): nothing re-establishes the watch when the stream ends without error,
                // only failures trigger a re-watch below - confirm this is intended.
            }
        })).exceptionally(error -> {
            this.logger.log(Level.SEVERE, error, () -> "Can't watch events: " + error.getMessage());
            if (!this.stopping) {
                this.logger.info(() -> "Re-watching events after a failure");
                this.watch(uri);
            } else {
                this.logger.info(() -> "Application is shutting down, exiting");
            }
            return null;
        });
    }

    /**
     * Lists the resources exposed by {@code findAllUri}.
     *
     * @param findAllUri the URI to list.
     * @return a stage providing the listed resources, never completed with {@code null}.
     */
    private CompletionStage<List<T>> findAll(URI findAllUri) {
        return this.get(findAllUri, HttpResponse.BodyHandlers.ofString()).thenApply(this::toResources);
    }

    /**
     * Parses a list response: fails on a {@code Status} payload with a non 200 code, records the list
     * resource version when present and converts the {@code items} to the operator resource type.
     *
     * @param payload the JSON body of the list response.
     * @return the listed resources, empty when the payload has no {@code metadata} or no {@code items} collection.
     * @throws IllegalStateException when the payload is a {@code Status} whose code is not 200.
     */
    @SuppressWarnings("unchecked") // the payload is read as a generic JSON tree, casts mirror the expected structure
    private List<T> toResources(String payload) {
        final Map<String, Object> model = (Map<String, Object>) this.jsonMapper.fromString(Object.class, payload);
        if ("Status".equals(model.get("kind"))
                && model.get("code") instanceof Number code
                && code.intValue() != 200) {
            throw new IllegalStateException("Can't find all items: " + payload);
        }

        final Map<String, Object> metadata = (Map<String, Object>) model.get("metadata");
        if (metadata == null) {
            return List.of();
        }

        // a list without resource version must not erase the version already recorded
        final Object resourceVersion = metadata.get("resourceVersion");
        if (resourceVersion != null) {
            this.onBookmark(String.valueOf(resourceVersion));
        }

        if (!(model.get("items") instanceof Collection<?> items)) {
            return List.of();
        }
        return items.stream()
                .map(item -> (T) this.jsonMapper.fromString(this.spec.resourceType(), this.jsonMapper.toString(item)))
                .toList();
    }

    /**
     * Sends a JSON {@code GET} request and returns its body, failing when the status is not 200.
     *
     * @param findAllUri the URI to call.
     * @param handler    the body handler to use to read the response.
     * @param <O>        the body type.
     * @return a stage providing the response body, completed exceptionally with an
     *         {@link IllegalStateException} when the status code is not 200.
     */
    private <O> CompletionStage<O> get(URI findAllUri, HttpResponse.BodyHandler<O> handler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .GET()
                .uri(findAllUri)
                .header("accept", "application/json")
                .build();
        return this.kubernetes.sendAsync(request, handler).thenApplyAsync(response -> {
            if (response.statusCode() != 200) {
                throw new IllegalStateException("Invalid response: " + response + "\n" + response.body());
            }
            return response.body();
        }, this.threads);
    }

    /**
     * Blocks until the runtime is stopped or its setup failed, only when awaiting is enabled in the configuration.
     * If the waiting thread is interrupted, its interrupt flag is restored and the method returns.
     */
    public void await() {
        if (!this.configuration.await()) {
            return;
        }
        try {
            this.awaiter.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

