/*
 * Decompiled with CFR 0.152.
 */
package io.syndesis.server.logging.jsondb.controller;

import io.fabric8.kubernetes.api.model.Pod;
import io.syndesis.common.util.Json;
import io.syndesis.server.jsondb.JsonDBException;
import io.syndesis.server.jsondb.impl.JsonRecordSupport;
import io.syndesis.server.logging.jsondb.controller.ActivityTrackingController;
import io.syndesis.server.logging.jsondb.controller.PodLogState;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PodLogMonitor
implements Consumer<InputStream> {
    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);
    private final ActivityTrackingController logsController;
    protected final AtomicBoolean markInOpenshift = new AtomicBoolean(true);
    protected final AtomicBoolean keepTrying = new AtomicBoolean(true);
    protected final String podName;
    protected final String integrationId;
    protected final String deploymentVersion;
    protected PodLogState state;

    public PodLogMonitor(ActivityTrackingController logsController, Pod pod) throws IOException {
        this.logsController = logsController;
        this.podName = pod.getMetadata().getName();
        if (this.podName == null) {
            throw new IOException("Could not determine the pod name");
        }
        Map labels = pod.getMetadata().getAnnotations();
        this.integrationId = (String)labels.get("syndesis.io/integration-id");
        if (this.integrationId == null) {
            throw new IOException("Could not determine the integration id that is being run on the pod: " + this.podName);
        }
        this.deploymentVersion = (String)labels.get("syndesis.io/deployment-version");
        if (this.deploymentVersion == null) {
            throw new IOException("Could not determine the deployment version that is being run on the pod: " + this.podName);
        }
    }

    public void start() throws IOException {
        this.state = this.logsController.getPodLogState(this.podName);
        if (this.state == null) {
            this.state = new PodLogState();
            this.logsController.setPodLogState(this.podName, this.state);
        }
        LOG.info("Recovered state: {}", (Object)this.state);
        this.logsController.executor.execute(this::run);
    }

    public void run() {
        if (this.logsController.stopped.get() || !this.keepTrying.get() || !this.logsController.isPodRunning(this.podName)) {
            return;
        }
        LOG.info("Getting controller for pod: {}", (Object)this.podName);
        try {
            this.logsController.watchLog(this.podName, this, this.state.time);
        }
        catch (IOException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", (Object)this.podName, (Object)e);
            this.logsController.schedule(this::run, 5L, TimeUnit.SECONDS);
        }
    }

    @Override
    public void accept(InputStream is) {
        if (is != null) {
            try {
                try {
                    this.processLogStream(is);
                }
                finally {
                    is.close();
                }
            }
            catch (IOException | InterruptedException e) {
                LOG.info("Failure occurred while processing controller for pod: {}", (Object)this.podName, (Object)e);
                this.logsController.schedule(this::run, 5L, TimeUnit.SECONDS);
            }
        } else {
            this.logsController.schedule(this::run, 5L, TimeUnit.SECONDS);
        }
    }

    private void processLogStream(InputStream is) throws IOException, InterruptedException {
        int c;
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        while (!this.logsController.stopped.get() && (c = is.read()) >= 0) {
            line.write(c);
            if (c == 10) {
                this.processLine(line.toByteArray());
                line.reset();
            }
            if (line.size() <= 4096) continue;
            line.reset();
        }
        if (!this.logsController.stopped.get()) {
            if (this.logsController.isPodRunning(this.podName)) {
                LOG.info("End of Log stream for running pod: {}", (Object)this.podName);
                this.logsController.schedule(this::run, 5L, TimeUnit.SECONDS);
            } else {
                LOG.info("End of Log stream for terminated pod: {}", (Object)this.podName);
            }
        }
    }

    private void processLine(byte[] line) throws IOException {
        if (line.length < 32 || line[30] != 32 || line[31] != 123) {
            return;
        }
        String time = new String(line, 0, 30, StandardCharsets.US_ASCII);
        try {
            Map json = (Map)Json.reader().forType(HashMap.class).readValue(line, 31, line.length - 31);
            String exchange = this.validate((String)json.get("exchange"));
            String id = this.validate((String)json.get("id"));
            String step = this.validate((String)json.get("step"));
            String status = this.validate((String)json.get("status"));
            if (id != null && exchange != null && step != null) {
                String stepPath = String.format("/exchanges/%s/%s/steps/%s/%s", this.integrationId, exchange, step, id);
                json.put("logts", time);
                json.remove("exchange");
                json.remove("id");
                json.remove("step");
                this.logsController.eventQueue.put(batch -> {
                    batch.put(stepPath, json);
                    this.trackState(time, batch);
                });
            } else if (exchange != null && status != null) {
                String transactionPath = String.format("/exchanges/%s/%s", this.integrationId, exchange);
                json.put("pod", this.podName);
                json.put("ver", this.deploymentVersion);
                if ("begin".equals(status)) {
                    json.put("logts", time);
                }
                json.remove("exchange");
                json.remove("steps");
                this.logsController.eventQueue.put(batch -> {
                    for (Map.Entry entry : json.entrySet()) {
                        batch.put(transactionPath + "/" + (String)entry.getKey(), entry.getValue());
                    }
                    this.trackState(time, batch);
                });
            }
        }
        catch (JsonDBException | IOException | ClassCastException json) {
        }
        catch (InterruptedException e) {
            InterruptedIOException rethrow = new InterruptedIOException(e.getMessage());
            rethrow.initCause(e);
            throw rethrow;
        }
    }

    private void trackState(String time, Map<String, Object> batch) {
        this.state.time = time;
        String podStatPath = "/pods/" + this.podName;
        batch.put(podStatPath, this.state);
        batch.put("/integrations/" + this.integrationId, Boolean.TRUE);
    }

    private String validate(String value) {
        if (value == null) {
            return null;
        }
        return JsonRecordSupport.validateKey((String)value);
    }
}

