/*
 * Decompiled with CFR 0.152.
 */
package io.syndesis.server.logging.jsondb.controller;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.syndesis.common.util.DurationConverter;
import io.syndesis.common.util.Json;
import io.syndesis.common.util.KeyGenerator;
import io.syndesis.server.jsondb.GetOptions;
import io.syndesis.server.jsondb.JsonDB;
import io.syndesis.server.logging.jsondb.controller.BatchOperation;
import io.syndesis.server.logging.jsondb.controller.KubernetesSupport;
import io.syndesis.server.logging.jsondb.controller.PodLogMonitor;
import io.syndesis.server.logging.jsondb.controller.PodLogState;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

@Service
@ConditionalOnProperty(value={"controllers.dblogging.enabled"}, havingValue="true", matchIfMissing=true)
public class ActivityTrackingController
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);
    private final DBI dbi;
    private final KubernetesClient client;
    private final Map<String, PodLogMonitor> podHandlers = new HashMap<String, PodLogMonitor>();
    private final JsonDB jsonDB;
    private final KubernetesSupport kubernetesSupport;
    private ScheduledExecutorService scheduler;
    protected ExecutorService executor;
    protected final LinkedBlockingDeque<BatchOperation> eventQueue = new LinkedBlockingDeque(1000);
    protected final AtomicBoolean stopped = new AtomicBoolean();
    private Duration retention = Duration.ofDays(1L);
    private Duration cleanUpInterval = Duration.ofMinutes(15L);
    private Duration startupDelay = Duration.ofSeconds(15L);

    @Autowired
    public ActivityTrackingController(JsonDB jsonDB, DBI dbi, KubernetesClient client) {
        this.jsonDB = jsonDB;
        this.dbi = dbi;
        this.client = client;
        this.kubernetesSupport = new KubernetesSupport(client);
    }

    @PostConstruct
    public void open() {
        this.scheduler = Executors.newScheduledThreadPool(1, ActivityTrackingController.threadFactory("Logs Controller Scheduler"));
        this.executor = Executors.newCachedThreadPool(ActivityTrackingController.threadFactory("Logs Controller"));
        this.executor.execute(this::processEventQueue);
        this.scheduler.scheduleWithFixedDelay(() -> this.executor.execute(this::pollPods), this.startupDelay.getSeconds(), 5L, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> this.executor.execute(this::cleanupLogs), this.startupDelay.toMillis(), this.cleanUpInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    public void cleanupLogs() {
        try {
            LOG.info("Purging old controller");
            long until = System.currentTimeMillis() - this.retention.toMillis();
            String untilKey = KeyGenerator.recreateKey((long)until, (int)0, (long)0L);
            Map hashMap = this.dbGet(HashMap.class, "/activity/integrations");
            if (hashMap != null) {
                for (String integrationId : hashMap.keySet()) {
                    String integrationPath = "/activity/exchanges/" + integrationId + "/";
                    int count = this.deleteFieldsLT(integrationPath, untilKey);
                    LOG.info("deleted {} transactions for integration: {}", (Object)count, (Object)integrationId);
                }
            }
        }
        catch (Exception e) {
            LOG.error("Unexpected Error occurred.", (Throwable)e);
        }
    }

    private int deleteFieldsLT(String path, String field) {
        return (Integer)this.dbi.inTransaction((conn, status) -> {
            StringBuilder sql = new StringBuilder("DELETE from jsondb where path LIKE ? and path < ?");
            return conn.update(sql.toString(), new Object[]{path + "%", path + field});
        });
    }

    @Override
    @PreDestroy
    public void close() {
        this.stopped.set(true);
        this.scheduler.shutdownNow();
        this.executor.shutdown();
    }

    private static ThreadFactory threadFactory(String name) {
        return r -> new Thread(null, r, name, 1024L);
    }

    private void pollPods() {
        try {
            for (PodLogMonitor podLogMonitor : this.podHandlers.values()) {
                podLogMonitor.markInOpenshift.set(false);
            }
            PodList podList = this.listPods();
            for (Pod pod : podList.getItems()) {
                if (!"Running".equals(pod.getStatus().getPhase())) continue;
                String name = pod.getMetadata().getName();
                PodLogMonitor handler = this.podHandlers.get(name);
                if (handler == null) {
                    try {
                        handler = new PodLogMonitor(this, pod);
                        handler.start();
                        LOG.info("Created handler for pod: {}", (Object)handler.podName);
                        this.podHandlers.put(name, handler);
                    }
                    catch (IOException e) {
                        LOG.error("Unexpected Error", (Throwable)e);
                    }
                    continue;
                }
                handler.markInOpenshift.set(true);
            }
            Iterator<Map.Entry<String, PodLogMonitor>> iterator = this.podHandlers.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, PodLogMonitor> next = iterator.next();
                if (next.getValue().markInOpenshift.get()) continue;
                LOG.info("Pod not tracked by openshift anymore: {}", (Object)next.getValue().podName);
                next.getValue().keepTrying.set(false);
                iterator.remove();
            }
            Map pods = this.dbGet(HashMap.class, "/activity/pods");
            if (pods != null) {
                pods.keySet().removeAll(this.podHandlers.keySet());
                for (String o : pods.keySet()) {
                    this.jsonDB.delete("/activity/pods/" + o);
                    LOG.info("Pod state removed from db: {}", (Object)o);
                }
            }
        }
        catch (IOException | RuntimeException e) {
            LOG.error("Unexpected Error occurred.", (Throwable)e);
        }
    }

    protected PodList listPods() {
        return (PodList)((FilterWatchListDeletable)this.client.pods().withLabel("component", "integration")).list();
    }

    protected boolean isPodRunning(String name) {
        Pod pod = (Pod)((PodResource)this.client.pods().withName(name)).get();
        if (pod == null) {
            return false;
        }
        return "Running".equals(pod.getStatus().getPhase());
    }

    protected void watchLog(String podName, Consumer<InputStream> handler, String sinceTime) throws IOException {
        this.kubernetesSupport.watchLog(podName, handler, sinceTime, this.executor);
    }

    public void deletePodLogState(String podName) throws IOException {
        this.jsonDB.delete("/activity/pods/" + podName);
    }

    public void setPodLogState(String podName, PodLogState state) throws IOException {
        this.jsonDB.set("/activity/pods/" + podName, Json.writer().writeValueAsBytes((Object)state));
    }

    public PodLogState getPodLogState(String podName) throws IOException {
        return this.dbGet(PodLogState.class, "/activity/pods/" + podName);
    }

    private <T> T dbGet(Class<T> type, String path) throws IOException {
        return this.dbGet(type, path, null);
    }

    private <T> T dbGet(Class<T> type, String path, GetOptions options) throws IOException {
        byte[] data = this.jsonDB.getAsByteArray(path, options);
        if (data == null) {
            return null;
        }
        return (T)Json.reader().forType(type).readValue(data);
    }

    private void processEventQueue() {
        try {
            LOG.info("Batch ingestion work thread started.");
            while (!this.stopped.get()) {
                BatchOperation event = this.eventQueue.pollFirst(1L, TimeUnit.SECONDS);
                if (event == null) continue;
                HashMap<String, Object> batch = new HashMap<String, Object>();
                long batchStartTime = System.currentTimeMillis();
                int eventCounter = 0;
                try {
                    while (!this.stopped.get() && event != null) {
                        ++eventCounter;
                        event.apply(batch);
                        long remaining = 1000L - (System.currentTimeMillis() - batchStartTime);
                        if (batch.size() >= 1000 || remaining <= 0L) {
                            event = null;
                            continue;
                        }
                        event = this.eventQueue.poll(remaining, TimeUnit.MILLISECONDS);
                    }
                    this.jsonDB.update("/activity", Json.writer().writeValueAsBytes(batch));
                    LOG.debug("Batch ingested {} log events", (Object)eventCounter);
                }
                catch (IOException e) {
                    LOG.error("Unexpected Error", (Throwable)e);
                }
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted", (Throwable)e);
        }
        LOG.info("Batch ingestion work thread done.");
    }

    protected void schedule(Runnable command, long delay, TimeUnit unit) {
        this.scheduler.schedule(() -> this.executor.execute(command), delay, unit);
    }

    @Value(value="${controllers.dblogging.retention:1 day}")
    public void setRetention(String retention) {
        this.retention = new DurationConverter().convert(retention);
    }

    @Value(value="${controllers.dblogging.cleanUpPeriod:15 minutes}")
    public void setCleanUpInterval(String cleanUpInterval) {
        this.cleanUpInterval = new DurationConverter().convert(cleanUpInterval);
    }

    @Value(value="${controllers.dblogging.startupDelay:15 seconds}")
    public void setStartupDelay(String startupDelay) {
        this.startupDelay = new DurationConverter().convert(startupDelay);
    }
}

