/*
 * Decompiled with CFR 0.152.
 */
package io.syndesis.server.logging.jsondb.controller;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.syndesis.common.util.DurationConverter;
import io.syndesis.common.util.Json;
import io.syndesis.common.util.backend.BackendController;
import io.syndesis.server.jsondb.GetOptions;
import io.syndesis.server.jsondb.JsonDB;
import io.syndesis.server.jsondb.impl.SqlJsonDB;
import io.syndesis.server.logging.jsondb.controller.BatchOperation;
import io.syndesis.server.logging.jsondb.controller.KubernetesSupport;
import io.syndesis.server.logging.jsondb.controller.PodLogMonitor;
import io.syndesis.server.logging.jsondb.controller.PodLogState;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

/**
 * Backend controller that tracks integration activity by following the logs of integration pods.
 *
 * <p>Once opened it runs three pieces of background work:
 * <ul>
 *   <li>{@link #pollPods()} every 5 seconds: lists the pods labelled
 *       {@code syndesis.io/component=integration}, keeps one {@link PodLogMonitor} per running pod
 *       and removes monitors and stored state for pods that are gone;</li>
 *   <li>{@link #cleanupLogs()} every clean-up interval: deletes old {@code /activity/exchanges}
 *       rows, keeping the newest {@code retention} rows per integration;</li>
 *   <li>{@link #processEventQueue()}: drains {@link #eventQueue} and writes the collected
 *       {@link BatchOperation}s to the {@code jsondb} table in batches.</li>
 * </ul>
 *
 * <p>{@link #open()} and {@link #close()} are not synchronized; callers are expected to invoke them
 * from a single lifecycle thread.
 */
@Service
@ConditionalOnProperty(value={"controllers.dblogging.enabled"}, havingValue="true", matchIfMissing=true)
public class ActivityTrackingController implements BackendController, Closeable {
    /** Name given to executor threads while they are not running a task of this controller. */
    static final String IDLE_THREAD_NAME = "Logs Controller [idle]";

    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);

    /** Name given to the scheduler thread between two scheduled runs. */
    private static final String SCHEDULER_IDLE_THREAD_NAME = "Logs Controller Scheduler [idle]";

    /** Deletes every row matching a path pattern except the first N rows in descending path order. */
    private static final String DELETE_KEEPING_RETENTION_SQL =
        "DELETE FROM jsondb WHERE path LIKE ? AND path NOT IN (SELECT path FROM jsondb WHERE path LIKE ? ORDER BY path DESC FETCH FIRST (?) ROWS ONLY)";

    private static final String POSTGRESQL_UPSERT_SQL =
        "INSERT into jsondb (path, value, ovalue) values (:path, :value, :ovalue) ON CONFLICT (path) DO UPDATE SET value = :value, ovalue = :ovalue";

    private static final String H2_MERGE_SQL =
        "MERGE INTO jsondb (path, value, ovalue) VALUES (:path, :value, :ovalue)";

    private static final String PLAIN_INSERT_SQL =
        "INSERT into jsondb (path, value, ovalue) values (:path, :value, :ovalue)";

    private final DBI dbi;
    private final KubernetesClient client;

    /** Log monitors keyed by pod name. */
    private final Map<String, PodLogMonitor> podHandlers = new ConcurrentHashMap<>();
    private final JsonDB jsondb;
    private ScheduledExecutorService scheduler;
    private ExecutorService executor;
    final KubernetesSupport kubernetesSupport;

    /** Pending batch operations, bounded to 1000 entries. */
    protected final LinkedBlockingDeque<BatchOperation> eventQueue = new LinkedBlockingDeque<>(1000);

    /** Set to {@code true} by {@link #close()}; checked by the worker loops to know when to exit. */
    protected final AtomicBoolean stopped = new AtomicBoolean();

    /** Number of rows kept per integration by {@link #deleteKeepingRetention(String)}. */
    private int retention = 50;

    // NOTE(review): retentionTime is stored and exposed through its getter but is not used by
    // the clean-up logic in this class.
    private Duration retentionTime = Duration.ofDays(1L);
    private Duration cleanUpInterval = Duration.ofMinutes(15L);
    private Duration startupDelay = Duration.ofSeconds(15L);
    private SqlJsonDB.DatabaseKind databaseKind;

    /**
     * Creates the controller.
     *
     * @param jsondb JSON database used to read and delete activity and pod state
     * @param dbi    JDBI entry point used for direct SQL against the {@code jsondb} table
     * @param client Kubernetes client used to list and inspect pods
     */
    @Autowired
    public ActivityTrackingController(JsonDB jsondb, DBI dbi, KubernetesClient client) {
        this.jsondb = jsondb;
        this.dbi = dbi;
        this.client = client;
        this.kubernetesSupport = new KubernetesSupport(client);
    }

    /** Starts the controller; delegates to {@link #open()}. */
    public void start() {
        this.open();
    }

    /**
     * Detects the database kind, then starts the executors, the batch ingestion worker and the
     * scheduled pod polling and log clean-up tasks.
     *
     * <p>Calling this method while the controller is already running is a no-op, so that being
     * invoked both as {@code @PostConstruct} callback and through {@link #start()} does not
     * create a second set of threads.
     *
     * @throws IllegalStateException if the database kind cannot be determined
     */
    @PostConstruct
    public void open() {
        if (this.scheduler != null && !this.scheduler.isShutdown()) {
            LOG.debug("Activity tracking is already running, ignoring repeated open()");
            return;
        }

        // Detect the database before any thread is started: a failure here must not leave
        // background threads behind, and writeBatch needs the kind to pick its SQL.
        this.databaseKind = this.detectDatabaseKind();

        this.scheduler = Executors.newScheduledThreadPool(1, ActivityTrackingController.threadFactory("Logs Controller Scheduler"));
        this.executor = Executors.newCachedThreadPool(ActivityTrackingController.threadFactory("Logs Controller"));
        this.stopped.set(false);
        this.executor.execute(this::processEventQueue);
        this.scheduler.scheduleWithFixedDelay(this::pollPods, this.startupDelay.getSeconds(), 5L, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(this::cleanupLogs, this.startupDelay.toMillis(), this.cleanUpInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Determines the database kind from the JDBC product name. A database reporting itself as
     * PostgreSQL whose {@code SELECT VERSION()} output starts with {@code CockroachDB} is
     * classified as CockroachDB.
     *
     * @return the detected database kind
     * @throws IllegalStateException if the lookup fails or the product name is not a known kind
     */
    private SqlJsonDB.DatabaseKind detectDatabaseKind() {
        return this.dbi.inTransaction((handle, status) -> {
            try {
                String dbName = handle.getConnection().getMetaData().getDatabaseProductName();
                SqlJsonDB.DatabaseKind kind = SqlJsonDB.DatabaseKind.valueOf(dbName);
                if (kind == SqlJsonDB.DatabaseKind.PostgreSQL) {
                    String version = handle.createQuery("SELECT VERSION()").mapTo(String.class).first();
                    if (version.startsWith("CockroachDB")) {
                        kind = SqlJsonDB.DatabaseKind.CockroachDB;
                    }
                }
                return kind;
            }
            catch (Exception e) {
                throw new IllegalStateException("Could not determine the database type", e);
            }
        });
    }

    /**
     * Purges old exchange records: for every integration id found under
     * {@code /activity/integrations}, deletes the rows under
     * {@code /activity/exchanges/<id>/} beyond the configured retention.
     *
     * <p>Any exception is logged and swallowed so that the scheduled task keeps running.
     */
    public void cleanupLogs() {
        Thread.currentThread().setName("Logs Controller Scheduler [running]: cleanupLogs");
        try {
            LOG.info("Purging old activity logs");
            @SuppressWarnings("unchecked")
            Map<String, Object> integrations = this.dbGet(HashMap.class, "/activity/integrations");
            if (integrations != null) {
                for (String integrationId : integrations.keySet()) {
                    String integrationPath = "/activity/exchanges/" + integrationId + "/%";
                    int count = this.deleteKeepingRetention(integrationPath);
                    LOG.info("deleted {} transactions for integration: {}", count, integrationId);
                }
            }
        }
        catch (Exception e) {
            LOG.error("Unexpected Error occurred.", e);
        }
        finally {
            Thread.currentThread().setName(SCHEDULER_IDLE_THREAD_NAME);
        }
    }

    /**
     * Deletes the rows whose path matches the given SQL {@code LIKE} pattern, except the first
     * {@code retention} matching rows in descending path order.
     *
     * @param path SQL {@code LIKE} pattern selecting the rows to consider
     * @return the number of deleted rows
     */
    int deleteKeepingRetention(String path) {
        return this.dbi.inTransaction((conn, status) ->
            conn.update(DELETE_KEEPING_RETENTION_SQL, path, path, this.retention));
    }

    /**
     * Writes one batch of activity entries to the {@code jsondb} table in a single transaction.
     *
     * <p>Every key is prefixed with {@code /activity} and suffixed with {@code /}. Exchange
     * entries store their string value prefixed with a backtick, integration entries store a
     * {@code true} marker, and pod entries store the {@link PodLogState#time} under an
     * additional {@code time/} path segment.
     *
     * @param batch entries to write, keyed by path relative to {@code /activity}
     */
    private void writeBatch(Map<String, Object> batch) {
        this.dbi.inTransaction((conn, status) -> {
            // NOTE(review): CockroachDB (detected in open()) falls through to the plain INSERT,
            // which has no conflict handling - confirm whether it should use the upsert form.
            String sql;
            if (this.databaseKind == SqlJsonDB.DatabaseKind.PostgreSQL) {
                sql = POSTGRESQL_UPSERT_SQL;
            } else if (this.databaseKind == SqlJsonDB.DatabaseKind.H2) {
                sql = H2_MERGE_SQL;
            } else {
                sql = PLAIN_INSERT_SQL;
            }

            PreparedBatch insert = conn.prepareBatch(sql);
            for (Map.Entry<String, Object> entry : batch.entrySet()) {
                String key = "/activity" + entry.getKey() + "/";
                String value = null;
                String ovalue = null;
                if (key.startsWith("/activity/exchanges")) {
                    value = "`" + (String) entry.getValue();
                } else if (key.startsWith("/activity/integrations")) {
                    ovalue = "true";
                    value = String.valueOf('\u0002');
                } else if (key.startsWith("/activity/pods")) {
                    PodLogState p = (PodLogState) entry.getValue();
                    key = key + "time/";
                    value = "`" + p.time;
                }
                insert.bind("path", key);
                insert.bind("value", value);
                insert.bind("ovalue", ovalue);
                insert.add();
            }
            return insert.execute();
        });
    }

    /** Stops the controller; delegates to {@link #close()}. */
    public void stop() {
        this.close();
    }

    /**
     * Stops all background work: cancels outstanding Kubernetes requests, shuts down the
     * scheduler and the executor and waits until both have terminated.
     *
     * <p>Only the first call after {@link #open()} has an effect. Calling it before
     * {@link #open()} is safe. If the calling thread is interrupted while waiting, the wait is
     * abandoned and the interrupt flag is restored.
     */
    @Override
    @PreDestroy
    public void close() {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        this.kubernetesSupport.cancelAllRequests();

        // Both are null when close() is called without a preceding open().
        final ScheduledExecutorService currentScheduler = this.scheduler;
        final ExecutorService currentExecutor = this.executor;
        if (currentScheduler != null) {
            currentScheduler.shutdownNow();
        }
        if (currentExecutor != null) {
            currentExecutor.shutdown();
        }

        try {
            boolean schedulerStopped;
            boolean executorStopped;
            do {
                schedulerStopped = currentScheduler == null || currentScheduler.awaitTermination(10L, TimeUnit.SECONDS);
                executorStopped = currentExecutor == null || currentExecutor.awaitTermination(10L, TimeUnit.SECONDS);
                if (!schedulerStopped || !executorStopped) {
                    LOG.info("Still waiting for activity tracking threads to stop");
                }
                // Keep waiting until BOTH pools have terminated, not just one of them.
            } while (!schedulerStopped || !executorStopped);
            this.scheduler = null;
            this.executor = null;
        }
        catch (InterruptedException e) {
            LOG.warn("Unable to cleanly stop: {}", e.getMessage());
            LOG.debug("Interrupted while stopping", e);
            // Preserve the interruption for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates a factory for threads with the given name that log uncaught exceptions.
     *
     * @param name name given to every created thread
     * @return the thread factory
     */
    private static ThreadFactory threadFactory(String name) {
        return r -> {
            // NOTE(review): the last argument is the requested stack size in bytes, which the
            // JVM treats as a hint; 1024 looks unusually small - confirm it is intended.
            Thread thread = new Thread(null, r, name, 1024L);
            thread.setUncaughtExceptionHandler((where, throwable) -> LOG.error("Failure running activity tracking task on thread: {}", where.getName(), throwable));
            return thread;
        };
    }

    /**
     * Synchronizes the log monitors with the pods currently known to the cluster:
     * <ol>
     *   <li>starts (creating if necessary) a monitor for every pod in phase {@code Running};</li>
     *   <li>stops and forgets monitors whose pod is no longer listed as running;</li>
     *   <li>deletes stored pod state under {@code /activity/pods} that has no monitor.</li>
     * </ol>
     *
     * <p>I/O and runtime exceptions are logged and swallowed so that the scheduled task keeps
     * running.
     */
    private void pollPods() {
        Thread.currentThread().setName("Logs Controller Scheduler [running]: pollPods");
        try {
            // Clear the marks; pods that are still running get re-marked below.
            for (PodLogMonitor podLogMonitor : this.podHandlers.values()) {
                podLogMonitor.markInOpenshift.set(false);
            }

            PodList podList = this.listPods();
            for (Pod pod : podList.getItems()) {
                if (!"Running".equals(pod.getStatus().getPhase())) {
                    continue;
                }
                String name = pod.getMetadata().getName();
                PodLogMonitor handler = this.podHandlers.computeIfAbsent(name, n -> this.createLogMonitor(pod));
                try {
                    handler.markInOpenshift.set(true);
                    handler.start();
                }
                catch (IOException e) {
                    LOG.error("Unexpected Error", e);
                }
            }

            // Whatever is still unmarked was not in the list of running pods.
            Iterator<Map.Entry<String, PodLogMonitor>> iterator = this.podHandlers.entrySet().iterator();
            while (iterator.hasNext()) {
                PodLogMonitor monitor = iterator.next().getValue();
                if (monitor.markInOpenshift.get()) {
                    continue;
                }
                LOG.info("Pod not tracked by openshift anymore: {}", monitor.podName);
                monitor.keepTrying.set(false);
                iterator.remove();
            }

            @SuppressWarnings("unchecked")
            Map<String, Object> pods = this.dbGet(HashMap.class, "/activity/pods");
            if (pods != null) {
                pods.keySet().removeAll(this.podHandlers.keySet());
                for (String podName : pods.keySet()) {
                    this.jsondb.delete("/activity/pods/" + podName);
                    LOG.info("Pod state removed from db: {}", podName);
                }
            }
        }
        catch (IOException | RuntimeException e) {
            LOG.error("Unexpected Error occurred.", e);
        }
        finally {
            Thread.currentThread().setName(SCHEDULER_IDLE_THREAD_NAME);
        }
    }

    /**
     * Creates the log monitor for a pod.
     *
     * @param pod pod to monitor
     * @return the new monitor
     */
    protected PodLogMonitor createLogMonitor(Pod pod) {
        PodLogMonitor monitor = new PodLogMonitor(this, pod);
        LOG.info("Created log monitor for pod: {}", monitor.podName);
        return monitor;
    }

    /**
     * Lists the pods labelled {@code syndesis.io/component=integration}.
     *
     * @return the matching pods
     */
    protected PodList listPods() {
        return (PodList)((FilterWatchListDeletable)this.client.pods().withLabel("syndesis.io/component", "integration")).list();
    }

    /**
     * Tells whether the named pod exists and is in phase {@code Running}.
     *
     * @param name pod name
     * @return {@code true} if the pod was found and its phase is {@code Running}
     */
    protected boolean isPodRunning(String name) {
        Pod pod = (Pod)((PodResource)this.client.pods().withName(name)).get();
        if (pod == null) {
            return false;
        }
        return "Running".equals(pod.getStatus().getPhase());
    }

    /**
     * Starts watching the log of a pod; delegates to {@link KubernetesSupport#watchLog} using
     * this controller's executor.
     *
     * @param podName   name of the pod
     * @param handler   consumer handed the log stream
     * @param sinceTime passed through to {@link KubernetesSupport#watchLog}
     * @throws IOException if the watch cannot be established
     */
    protected void watchLog(String podName, Consumer<InputStream> handler, String sinceTime) throws IOException {
        this.kubernetesSupport.watchLog(podName, handler, sinceTime, this.executor);
    }

    /**
     * Deletes the stored log state of a pod.
     *
     * @param podName name of the pod
     */
    public void deletePodLogState(String podName) {
        this.jsondb.delete("/activity/pods/" + podName);
    }

    /**
     * Stores the log state of a pod as JSON under {@code /activity/pods/<podName>}.
     *
     * @param podName name of the pod
     * @param state   state to store
     * @throws IOException if the state cannot be serialized
     */
    public void setPodLogState(String podName, PodLogState state) throws IOException {
        this.jsondb.set("/activity/pods/" + podName, Json.writer().writeValueAsBytes(state));
    }

    /**
     * Reads the stored log state of a pod.
     *
     * @param podName name of the pod
     * @return the stored state, or {@code null} if there is none
     * @throws IOException if the stored value cannot be deserialized
     */
    public PodLogState getPodLogState(String podName) throws IOException {
        return this.dbGet(PodLogState.class, "/activity/pods/" + podName);
    }

    /** Same as {@link #dbGet(Class, String, GetOptions)} with {@code null} options. */
    private <T> T dbGet(Class<T> type, String path) throws IOException {
        return this.dbGet(type, path, null);
    }

    /**
     * Reads the JSON stored at a path and deserializes it.
     *
     * @param type    target type
     * @param path    JSON database path
     * @param options read options, may be {@code null}
     * @return the deserialized value, or {@code null} if nothing is stored at the path
     * @throws IOException if the stored value cannot be deserialized
     */
    @SuppressWarnings("unchecked")
    private <T> T dbGet(Class<T> type, String path, GetOptions options) throws IOException {
        byte[] data = this.jsondb.getAsByteArray(path, options);
        if (data == null) {
            return null;
        }
        return (T)Json.reader().forType(type).readValue(data);
    }

    /**
     * Worker loop that drains {@link #eventQueue} until the controller is stopped.
     *
     * <p>After the first event arrives, further events are collected into the same batch until
     * the batch holds 1000 entries, one second has passed since the batch was started, or the
     * queue stays empty for the rest of that second. The batch is then written with
     * {@link #writeBatch(Map)}; write failures are logged and the loop continues.
     */
    private void processEventQueue() {
        Thread.currentThread().setName("Logs Controller [running]: processEventQueue");
        try {
            LOG.info("Batch ingestion work thread started.");
            while (!this.stopped.get()) {
                // Wake up at least once per second to re-check the stopped flag.
                BatchOperation event = this.eventQueue.pollFirst(1L, TimeUnit.SECONDS);
                if (event == null) {
                    continue;
                }
                Map<String, Object> batch = new TreeMap<>();
                long batchStartTime = System.currentTimeMillis();
                int eventCounter = 0;
                try {
                    while (!this.stopped.get() && event != null) {
                        ++eventCounter;
                        event.apply(batch);
                        long remaining = 1000L - (System.currentTimeMillis() - batchStartTime);
                        if (batch.size() >= 1000 || remaining <= 0L) {
                            // Batch is full or its time window is over: stop collecting.
                            event = null;
                        } else {
                            event = this.eventQueue.poll(remaining, TimeUnit.MILLISECONDS);
                        }
                    }
                    try {
                        this.writeBatch(batch);
                    }
                    catch (RuntimeException e) {
                        LOG.warn("Unable to write batch of events: {}", e.getMessage());
                        LOG.debug("Unable to write batch of events: ", e);
                    }
                    LOG.debug("Batch ingested {} log events", eventCounter);
                }
                catch (IOException e) {
                    LOG.error("Unexpected Error", e);
                }
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted", e);
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        }
        finally {
            Thread.currentThread().setName(IDLE_THREAD_NAME);
        }
        LOG.info("Batch ingestion work thread done.");
    }

    /**
     * Sets how many rows per integration are kept by the clean-up.
     *
     * @param retention number of rows to keep
     */
    @Value(value="${controllers.dblogging.retention:50}")
    public void setRetention(int retention) {
        this.retention = retention;
    }

    /**
     * Sets the retention time from its textual form, parsed with {@link DurationConverter}.
     *
     * @param retentionTime textual duration, e.g. {@code "1 day"}
     */
    @Value(value="${controllers.dblogging.retentionTime:1 day}")
    public void setRetentionTime(String retentionTime) {
        this.retentionTime = new DurationConverter().convert(retentionTime);
    }

    /**
     * Sets the delay between two clean-up runs, parsed with {@link DurationConverter}.
     * Takes effect on the next {@link #open()}.
     *
     * @param cleanUpInterval textual duration, e.g. {@code "15 minutes"}
     */
    @Value(value="${controllers.dblogging.cleanUpPeriod:15 minutes}")
    public void setCleanUpInterval(String cleanUpInterval) {
        this.cleanUpInterval = new DurationConverter().convert(cleanUpInterval);
    }

    /**
     * Sets the initial delay of the scheduled tasks, parsed with {@link DurationConverter}.
     * Takes effect on the next {@link #open()}.
     *
     * @param startupDelay textual duration, e.g. {@code "15 seconds"}
     */
    @Value(value="${controllers.dblogging.startupDelay:15 seconds}")
    public void setStartupDelay(String startupDelay) {
        this.startupDelay = new DurationConverter().convert(startupDelay);
    }

    /** @return the number of rows per integration kept by the clean-up */
    public int getRetention() {
        return this.retention;
    }

    /** @return the configured retention time */
    public Duration getRetentionTime() {
        return this.retentionTime;
    }

    /**
     * Runs a task for a pod on the controller's executor, naming the thread after the pod while
     * the task runs. The task is dropped with a warning if the controller is stopping.
     *
     * @param podName name of the pod the task belongs to
     * @param task    work to run
     */
    void execute(String podName, Runnable task) {
        if (this.stopped.get()) {
            LOG.warn("Not executing task: {}, for pod {}, the activity tracking is stopping", task, podName);
            return;
        }
        this.executor.execute(() -> {
            Thread.currentThread().setName("Logs Controller [running], pod: " + podName);
            try {
                task.run();
            }
            finally {
                Thread.currentThread().setName(IDLE_THREAD_NAME);
            }
        });
    }
}

