package io.yupiik.kubernetes.operator.base.impl;

import io.yupiik.fusion.framework.api.main.Awaiter;
import io.yupiik.fusion.json.JsonMapper;
import io.yupiik.fusion.kubernetes.client.KubernetesClient;
import io.yupiik.kubernetes.operator.base.impl.ObjectLike;
import io.yupiik.kubernetes.operator.base.spi.Operator;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:io/yupiik/kubernetes/operator/base/impl/OperatorRuntime.class */
/**
 * Runs an {@link Operator}: for every configured namespace it lists the existing resources, triggers the
 * controller initialization and then opens a watch on the Kubernetes API, forwarding each received line to
 * the {@code onEvent} implementation inherited from {@link SimpleController}.
 *
 * <p>The most recent resource version seen (from the initial listing or from bookmarks) is kept in
 * {@code lastResource} and appended to the watch URI so a re-opened watch resumes from that version.</p>
 *
 * @param <T> the resource type handled by the operator.
 */
public class OperatorRuntime<T extends ObjectLike> extends SimpleController<T> implements Awaiter {
    private final Logger logger;
    private final OperatorConfiguration configuration;
    private final KubernetesClient kubernetes;
    private final ExecutorService threads;
    private final JsonMapper jsonMapper;
    private final Operator<T> spec;
    /** One "list" endpoint per namespace returned by the operator. */
    private final List<URI> findAllUris;
    /** Guards the compare-and-set done on {@link #lastResource}. */
    private final ReentrantLock lock;
    /** Released when the runtime stops or when the initial list/watch chain fails. */
    private final CountDownLatch awaiter;
    /**
     * Last known resource version; written under {@link #lock}, read without it by {@link #watch(URI)},
     * hence volatile to guarantee visibility across the executor threads.
     */
    private volatile String lastResource;

    /**
     * Creates the runtime and precomputes the list endpoints of the operator resources.
     *
     * @param operatorConfiguration    runtime configuration (event thread count, bookmarks usage, await flag).
     * @param kubernetesClient         client used to send the list and watch requests.
     * @param scheduledExecutorService executor used for all asynchronous continuations.
     * @param jsonMapper               mapper used to parse list responses and materialize resources.
     * @param operator                 the operator specification (API version, plural name, namespaces, callbacks).
     */
    public OperatorRuntime(OperatorConfiguration operatorConfiguration, KubernetesClient kubernetesClient, ScheduledExecutorService scheduledExecutorService, JsonMapper jsonMapper, Operator<T> operator) {
        super(scheduledExecutorService, jsonMapper, operatorConfiguration.eventThreadCount(), operator);
        this.logger = Logger.getLogger(getClass().getName());
        this.lock = new ReentrantLock();
        this.awaiter = new CountDownLatch(1);
        this.configuration = operatorConfiguration;
        this.kubernetes = kubernetesClient;
        this.threads = scheduledExecutorService;
        this.jsonMapper = jsonMapper;
        this.spec = operator;
        // NOTE(review): "kubernetes.api" is presumably a placeholder host rewritten by KubernetesClient - confirm.
        this.findAllUris = operator.namespaces().stream()
                .map(namespace -> URI.create("https://kubernetes.api/apis/" + operator.apiVersion() + "/"
                        + (operator.namespaced() ? "namespaces/" + namespace + "/" : "")
                        + operator.pluralName()))
                .toList();
    }

    /**
     * Invokes the operator {@code onStart} callback then starts listing/watching resources on the executor.
     *
     * @return the stage completing once the listening has been triggered.
     */
    public CompletionStage<?> doStart() {
        return this.spec.onStart().thenRunAsync(this::startListening, this.threads);
    }

    /**
     * Stops the parent controller, then calls the operator {@code onStop} callback. The awaiter latch is
     * always released, even if one of these calls throws, so {@link #await()} can return.
     */
    public void doStop() {
        try {
            super.stop();
            this.spec.onStop();
        } finally {
            // countDown() is a no-op once the latch reached zero, no need to test the count first
            this.awaiter.countDown();
        }
    }

    /**
     * For each list endpoint: fetches all resources, runs {@code init} and opens the watch.
     * Any failure in that chain is logged, releases the awaiter latch and is rethrown to fail the stage.
     */
    private void startListening() {
        if (this.stopping) {
            return;
        }
        for (final URI uri : this.findAllUris) {
            findAll(uri)
                    .thenRunAsync(this::init, this.threads)
                    .thenRunAsync(() -> watch(uri), this.threads)
                    .exceptionally(error -> {
                        this.logger.log(Level.SEVERE, error, () -> "Can't watch events: " + error.getMessage() + ", exiting");
                        this.awaiter.countDown();
                        if (error instanceof RuntimeException runtimeException) {
                            throw runtimeException;
                        }
                        throw new IllegalStateException(error);
                    });
        }
    }

    /**
     * Records a new resource version unless the stored one is numerically higher.
     *
     * <p>The comparison and the write happen under the same lock so two concurrent bookmarks can not
     * make an older version overwrite a newer one. A {@code null} value carries no information and is
     * ignored instead of erasing the known version.</p>
     *
     * @param str the resource version to record, ignored when {@code null}.
     */
    @Override
    protected void onBookmark(String str) {
        if (str == null) {
            return;
        }
        this.lock.lock();
        try {
            final String current = this.lastResource;
            if (current == null || !isHigher(current, str)) {
                this.lastResource = str;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Compares two resource versions as {@code long} values.
     *
     * <p>NOTE(review): Kubernetes documents resource versions as opaque strings, so this ordering is
     * best effort; when one side is not a number the method answers {@code false}, which makes
     * {@link #onBookmark(String)} keep the newest received value.</p>
     *
     * @param current   the stored version.
     * @param candidate the received version.
     * @return {@code true} only when both parse as numbers and {@code current} is strictly greater.
     */
    private boolean isHigher(String current, String candidate) {
        try {
            return Long.parseLong(current) > Long.parseLong(candidate);
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /**
     * Opens a watch on {@code uri}, resuming from the last known resource version when there is one,
     * and feeds every received line to the parent {@code onEvent}.
     *
     * <p>When the request fails and the runtime is not stopping, the watch is opened again.</p>
     *
     * @param uri the list endpoint to watch.
     */
    private void watch(URI uri) {
        final String resourceVersion = this.lastResource;
        final URI watchUri = URI.create(uri.toASCIIString() + "?watch=true"
                + (this.configuration.useBookmarks() ? "&allowWatchBookmarks=true" : "")
                + (resourceVersion != null ? "&resourceVersion=" + resourceVersion : ""));
        this.logger.info(() -> "Starting to watch '" + watchUri + "'");

        final Flow.Subscriber<String> lineSubscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                OperatorRuntime.this.logger.info(() -> "Starting to watch resources");
                this.subscription.request(1L);
            }

            @Override
            public void onNext(String line) {
                try {
                    OperatorRuntime.super.onEvent(line);
                } finally {
                    // always ask for the next line, even when the event handling failed
                    this.subscription.request(1L);
                }
            }

            @Override
            public void onError(Throwable error) {
                OperatorRuntime.this.logger.log(Level.SEVERE, error, error::getMessage);
            }

            @Override
            public void onComplete() {
                // NOTE(review): nothing re-opens the watch when the stream ends without error - confirm
                // this is intended.
            }
        };

        get(watchUri, HttpResponse.BodyHandlers.fromLineSubscriber(lineSubscriber)).exceptionally(error -> {
            this.logger.log(Level.SEVERE, error, () -> "Can't watch events: " + error.getMessage());
            if (this.stopping) {
                this.logger.info(() -> "Application is shutting down, exiting");
                return null;
            }
            this.logger.info(() -> "Re-watching events after a failure");
            // NOTE(review): the retry is immediate, there is no back-off between attempts.
            watch(uri);
            return null;
        });
    }

    /**
     * Lists the resources behind {@code uri} and records the resource version of the listing.
     *
     * @param uri the list endpoint.
     * @return a stage holding the parsed resources, empty when the response has no metadata or no items.
     */
    private CompletionStage<List<T>> findAll(URI uri) {
        return get(uri, HttpResponse.BodyHandlers.ofString()).thenApply(this::parseItems);
    }

    /**
     * Parses a list response body.
     *
     * @param body the JSON payload returned by the list endpoint.
     * @return the resources found in {@code items}, never {@code null}.
     * @throws IllegalStateException when the payload is a {@code Status} object whose code is not 200.
     */
    private List<T> parseItems(String body) {
        final Map<?, ?> payload = (Map<?, ?>) this.jsonMapper.fromString(Object.class, body);
        if ("Status".equals(payload.get("kind"))
                && payload.get("code") instanceof Number code
                && code.intValue() != 200) {
            throw new IllegalStateException("Can't find all items: " + body);
        }

        final Map<?, ?> metadata = (Map<?, ?>) payload.get("metadata");
        if (metadata == null) {
            return List.of();
        }
        Optional.ofNullable(metadata.get("resourceVersion"))
                .map(String::valueOf)
                .ifPresent(this::onBookmark);

        if (!(payload.get("items") instanceof Collection<?> items)) {
            return List.of();
        }
        return items.stream().map(this::toResource).toList();
    }

    /**
     * Converts a generic JSON item to the operator resource type by serializing it back to JSON and
     * parsing it with the type returned by the operator.
     *
     * @param item an element of the {@code items} array.
     * @return the typed resource.
     */
    private T toResource(Object item) {
        return this.jsonMapper.fromString(this.spec.resourceType(), this.jsonMapper.toString(item));
    }

    /**
     * Sends a JSON GET request and unwraps the body.
     *
     * @param uri         the target.
     * @param bodyHandler how to read the response body.
     * @param <O>         the body type.
     * @return a stage holding the body, failed with {@link IllegalStateException} when the status is not 200.
     */
    private <O> CompletionStage<O> get(URI uri, HttpResponse.BodyHandler<O> bodyHandler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .GET()
                .uri(uri)
                .header("accept", "application/json")
                .build();
        return this.kubernetes.sendAsync(request, bodyHandler).thenApplyAsync(response -> {
            if (response.statusCode() != 200) {
                throw new IllegalStateException("Invalid response: " + response + "\n" + response.body());
            }
            return response.body();
        }, this.threads);
    }

    /**
     * Blocks the caller until the runtime is stopped (or the startup chain failed) when the configuration
     * enables awaiting; returns immediately otherwise. An interruption restores the interrupt flag and returns.
     */
    public void await() {
        if (!this.configuration.await()) {
            return;
        }
        try {
            this.awaiter.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
