/*
 * Decompiled with CFR 0.152.
 */
package io.syndesis.server.logging.jsondb.controller;

import com.fasterxml.jackson.databind.JsonNode;
import io.fabric8.kubernetes.api.model.Pod;
import io.syndesis.common.util.Json;
import io.syndesis.common.util.KeyGenerator;
import io.syndesis.server.endpoint.v1.handler.activity.Activity;
import io.syndesis.server.endpoint.v1.handler.activity.ActivityStep;
import io.syndesis.server.jsondb.JsonDBException;
import io.syndesis.server.jsondb.impl.JsonRecordSupport;
import io.syndesis.server.logging.jsondb.controller.ActivityTrackingController;
import io.syndesis.server.logging.jsondb.controller.PodLogState;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PodLogMonitor
implements Consumer<InputStream> {
    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);
    private static final Pattern LOG_LINE_REGEX = Pattern.compile("^(\\d\\d\\d\\d\\-\\d\\d\\-\\d\\dT\\d\\d:\\d\\d:\\d\\d\\.\\d+Z) (\\{.*\\})\\s*");
    private AtomicBoolean running = new AtomicBoolean(false);
    private final ActivityTrackingController logsController;
    protected final AtomicBoolean markInOpenshift = new AtomicBoolean(true);
    protected final AtomicBoolean keepTrying = new AtomicBoolean(true);
    protected final String podName;
    protected final String integrationId;
    protected final String deploymentVersion;
    protected PodLogState state;
    protected HashMap<String, InflightData> inflightActivities = new HashMap();

    PodLogMonitor(ActivityTrackingController logsController, Pod pod) {
        this.logsController = logsController;
        this.podName = pod.getMetadata().getName();
        if (this.podName == null) {
            throw new IllegalStateException("Could not determine the pod name");
        }
        Map labels = pod.getMetadata().getLabels();
        this.integrationId = (String)labels.get("syndesis.io/integration-id");
        if (this.integrationId == null) {
            throw new IllegalStateException("Could not determine the integration id that is being run on the pod: " + this.podName);
        }
        this.deploymentVersion = (String)labels.get("syndesis.io/deployment-version");
        if (this.deploymentVersion == null) {
            throw new IllegalStateException("Could not determine the deployment version that is being run on the pod: " + this.podName);
        }
    }

    public void start() throws IOException {
        if (this.running.get()) {
            return;
        }
        this.state = this.logsController.getPodLogState(this.podName);
        if (this.state == null) {
            this.state = new PodLogState();
            this.logsController.setPodLogState(this.podName, this.state);
        }
        LOG.info("Recovered state: {}", (Object)this.state);
        this.logsController.execute(this.podName, this::run);
    }

    public void run() {
        if (this.logsController.stopped.get() || !this.keepTrying.get() || !this.logsController.isPodRunning(this.podName)) {
            return;
        }
        if (this.running.compareAndSet(false, true)) {
            LOG.info("Getting controller for pod: {}", (Object)this.podName);
            try {
                this.logsController.watchLog(this.podName, this, this.state.time);
            }
            catch (IOException e) {
                LOG.info("Failure occurred while processing controller for pod: {}", (Object)this.podName, (Object)e);
            }
        }
    }

    @Override
    public void accept(InputStream is) {
        try {
            if (is == null) {
                return;
            }
            this.processLogStream(is);
        }
        catch (EOFException | SocketTimeoutException e) {
            LOG.info("Streaming ended for pod {} due to: {}", (Object)this.podName, (Object)PodLogMonitor.message(e));
            LOG.debug("Streaming ended for pod {}", (Object)this.podName, (Object)e);
        }
        catch (IOException | InterruptedException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", (Object)this.podName, (Object)e);
        }
        finally {
            this.running.set(false);
            IOUtils.closeQuietly((InputStream)is);
        }
    }

    void processLogStream(InputStream is) throws IOException, InterruptedException {
        int c;
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        while (!this.logsController.stopped.get() && (c = is.read()) >= 0) {
            line.write(c);
            if (c == 10) {
                this.processLine(new String(line.toByteArray(), StandardCharsets.UTF_8));
                line.reset();
            }
            if (line.size() <= 10240) continue;
            line.reset();
        }
        if (!this.logsController.stopped.get()) {
            if (this.logsController.isPodRunning(this.podName)) {
                LOG.info("End of Log stream for running pod: {}", (Object)this.podName);
            } else {
                LOG.info("End of Log stream for terminated pod: {}", (Object)this.podName);
            }
        }
    }

    InflightData getInflightData(String exchangeId, String logts) throws IOException {
        InflightData data = this.inflightActivities.get(exchangeId);
        if (data == null) {
            data = new InflightData();
            data.activity.setPod(this.podName);
            data.activity.setVer(this.deploymentVersion);
            data.activity.setId(exchangeId);
            data.activity.setAt(Long.valueOf(KeyGenerator.getKeyTimeMillis((String)exchangeId)));
            data.activity.setLogts(logts);
            this.inflightActivities.put(exchangeId, data);
        }
        return data;
    }

    void processLine(String line) throws IOException {
        Matcher matcher = LOG_LINE_REGEX.matcher(line);
        if (!matcher.matches()) {
            return;
        }
        String time = matcher.group(1);
        String data = matcher.group(2);
        try {
            Map json = (Map)Json.reader().forType(HashMap.class).readValue(data);
            String exchange = PodLogMonitor.validate((String)json.remove("exchange"));
            long keyTimeMillis = KeyGenerator.getKeyTimeMillis((String)exchange);
            long until = this.now() - this.logsController.getRetentionTime().toMillis();
            if (keyTimeMillis < until) {
                return;
            }
            String id = PodLogMonitor.validate((String)json.remove("id"));
            InflightData inflightData = this.getInflightData(exchange, time);
            String step = (String)json.remove("step");
            if (step == null) {
                Boolean failed = (Boolean)json.remove("failed");
                if (failed != null) {
                    inflightData.activity.setFailed(failed);
                }
                String status = (String)json.remove("status");
                inflightData.metadata.putAll(json);
                if (status != null) {
                    inflightData.activity.setStatus(status);
                    if ("done".equals(status)) {
                        inflightData.activity.setSteps(inflightData.doneSteps);
                        if (!inflightData.metadata.isEmpty()) {
                            inflightData.activity.setMetadata(PodLogMonitor.toJsonNode(inflightData.metadata));
                        }
                        String activityAsString = Json.writer().writeValueAsString((Object)inflightData.activity);
                        String transactionPath = String.format("/exchanges/%s/%s", this.integrationId, exchange);
                        this.inflightActivities.remove(exchange);
                        this.logsController.eventQueue.put(batch -> {
                            batch.put(transactionPath, activityAsString);
                            this.trackState(time, batch);
                        });
                    }
                }
            } else {
                Number duration;
                String failure;
                ActivityStep as = inflightData.getStep(step, id);
                String message = (String)json.remove("message");
                if (message != null) {
                    if (as.getMessages() == null) {
                        as.setMessages(new ArrayList());
                    }
                    as.getMessages().add(message);
                }
                if ((failure = (String)json.remove("failure")) != null) {
                    as.setFailure(failure);
                }
                if ((duration = (Number)json.remove("duration")) != null) {
                    as.setDuration(Long.valueOf(duration.longValue()));
                }
                if (!json.isEmpty()) {
                    if (as.getEvents() == null) {
                        as.setEvents(new ArrayList());
                    }
                    as.getEvents().add(PodLogMonitor.toJsonNode(json));
                }
                if (duration != null) {
                    inflightData.activeSteps.remove(step);
                    if (inflightData.doneSteps.size() == 50) {
                        ActivityStep truncated = new ActivityStep();
                        truncated.addMessage("Max activity tracking steps reached.  No further steps will be recorded.");
                        inflightData.doneSteps.add(truncated);
                    }
                    if (inflightData.doneSteps.size() < 50) {
                        inflightData.doneSteps.add(as);
                    }
                }
            }
        }
        catch (JsonDBException | IOException | ClassCastException json) {
        }
        catch (InterruptedException e) {
            InterruptedIOException rethrow = new InterruptedIOException(e.getMessage());
            rethrow.initCause(e);
            throw rethrow;
        }
    }

    long now() {
        return System.currentTimeMillis();
    }

    private static JsonNode toJsonNode(Map<String, Object> json) throws IOException {
        return Json.reader().readTree(Json.writer().writeValueAsString(json));
    }

    private void trackState(String time, Map<String, Object> batch) {
        this.state.time = time;
        String podStatPath = "/pods/" + this.podName;
        batch.put(podStatPath, this.state);
        batch.put("/integrations/" + this.integrationId, Boolean.TRUE);
    }

    private static String validate(String value) {
        if (value == null) {
            return null;
        }
        return JsonRecordSupport.validateKey((String)value);
    }

    private static String message(Throwable e) {
        StringBuilder buffy = new StringBuilder().append(e.getClass().getName()).append(": ").append(e.getMessage());
        if (e.getCause() != null && e.getCause() != e) {
            buffy.append(", caused by: ");
            buffy.append(PodLogMonitor.message(e.getCause()));
        }
        return buffy.toString();
    }

    private static class InflightData {
        Activity activity = new Activity();
        ArrayList<ActivityStep> doneSteps = new ArrayList();
        Map<String, ActivityStep> activeSteps = new LinkedHashMap<String, ActivityStep>();
        Map<String, Object> metadata = new HashMap<String, Object>();

        private InflightData() {
        }

        public ActivityStep getStep(String step, String id) throws IOException {
            ActivityStep rc = this.activeSteps.get(step);
            if (rc == null) {
                rc = new ActivityStep();
                rc.setId(step);
                rc.setAt(Long.valueOf(KeyGenerator.getKeyTimeMillis((String)id)));
                this.activeSteps.put(step, rc);
            }
            return rc;
        }
    }
}

