package io.syndesis.server.logging.jsondb.controller;

import io.fabric8.kubernetes.api.model.Pod;
import io.syndesis.common.util.Json;
import io.syndesis.server.jsondb.JsonDBException;
import io.syndesis.server.jsondb.impl.JsonRecordSupport;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/syndesis/server/logging/jsondb/controller/PodLogMonitor.class */
/**
 * Follows the log stream of a single integration pod and turns the structured
 * activity lines it finds into batched JSON DB updates.
 *
 * <p>An instance is handed to {@code ActivityTrackingController#watchLog} as the
 * {@link Consumer} of the pod's log {@link InputStream}. Every log line that has
 * the expected shape (a 30 byte prefix, a space, then a JSON object) is parsed
 * and queued on the controller's {@code eventQueue}. Whenever the stream ends or
 * fails while the pod is still running, a new attempt is scheduled after
 * {@value #RETRY_DELAY_SECONDS} seconds.
 */
public class PodLogMonitor implements Consumer<InputStream> {
    // NOTE(review): the logger is registered under ActivityTrackingController's name, not
    // PodLogMonitor's. Kept as-is so the log category of existing messages does not change.
    private static final Logger LOG = LoggerFactory.getLogger(ActivityTrackingController.class);

    /** Pod annotation holding the id of the integration that runs on the pod. */
    private static final String INTEGRATION_ID_ANNOTATION = "syndesis.io/integration-id";
    /** Pod annotation holding the deployment version that runs on the pod. */
    private static final String DEPLOYMENT_VERSION_ANNOTATION = "syndesis.io/deployment-version";
    /** Delay before the log watch is retried after a failure or an unexpected end of stream. */
    private static final long RETRY_DELAY_SECONDS = 5L;
    /** Lines with more bytes than this (excluding the line feed) are dropped to bound memory use. */
    private static final int MAX_LINE_LENGTH = 4096;
    /** Number of leading bytes of a log line that precede the separating space. */
    private static final int LOG_PREFIX_LENGTH = 30;
    /** Offset of the opening brace of the JSON payload inside a log line. */
    private static final int JSON_OFFSET = LOG_PREFIX_LENGTH + 1;

    private final ActivityTrackingController logsController;
    protected final AtomicBoolean markInOpenshift = new AtomicBoolean(true);
    protected final AtomicBoolean keepTrying = new AtomicBoolean(true);
    protected final String podName;
    protected final String integrationId;
    protected final String deploymentVersion;
    protected PodLogState state;

    /**
     * Creates a monitor for the given pod.
     *
     * @param activityTrackingController controller used to watch logs, schedule retries and queue events
     * @param pod the pod whose log should be followed
     * @throws IOException if the pod has no name, or lacks the integration id or
     *         deployment version annotation (including when it has no annotations at all)
     */
    public PodLogMonitor(ActivityTrackingController activityTrackingController, Pod pod) throws IOException {
        this.logsController = activityTrackingController;
        // A pod without metadata has no name either; report it the same way instead of failing with an NPE.
        this.podName = pod.getMetadata() == null ? null : pod.getMetadata().getName();
        if (this.podName == null) {
            throw new IOException("Could not determine the pod name");
        }
        // getAnnotations() may be null for a pod that carries no annotations.
        final Map<String, String> annotations = pod.getMetadata().getAnnotations();
        this.integrationId = annotations == null ? null : annotations.get(INTEGRATION_ID_ANNOTATION);
        if (this.integrationId == null) {
            throw new IOException("Could not determine the integration id that is being run on the pod: " + this.podName);
        }
        this.deploymentVersion = annotations.get(DEPLOYMENT_VERSION_ANNOTATION);
        if (this.deploymentVersion == null) {
            throw new IOException("Could not determine the deployment version that is being run on the pod: " + this.podName);
        }
    }

    /**
     * Loads the stored log state of the pod (creating and registering a fresh one
     * when none exists) and submits {@link #run()} to the controller's executor.
     *
     * @throws IOException declared for callers; propagated from the controller's state accessors if they throw it
     */
    public void start() throws IOException {
        this.state = this.logsController.getPodLogState(this.podName);
        if (this.state == null) {
            this.state = new PodLogState();
            this.logsController.setPodLogState(this.podName, this.state);
        }
        LOG.info("Recovered state: {}", this.state);
        this.logsController.executor.execute(this::run);
    }

    /**
     * Starts watching the pod log from the last recorded position, provided the
     * controller is not stopped, this monitor is still enabled and the pod is
     * running. A failure to start the watch schedules a retry.
     */
    public void run() {
        final boolean active = !this.logsController.stopped.get()
            && this.keepTrying.get()
            && this.logsController.isPodRunning(this.podName);
        if (!active) {
            return;
        }
        LOG.info("Getting controller for pod: {}", this.podName);
        try {
            this.logsController.watchLog(this.podName, this, this.state.time);
        } catch (IOException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", this.podName, e);
            scheduleRetry();
        }
    }

    /**
     * Consumes the log stream of the pod. A {@code null} stream schedules a retry.
     * Otherwise the stream is processed and always closed afterwards; an I/O
     * failure is logged and a retry is scheduled.
     *
     * @param inputStream the pod's log stream, or {@code null} if none could be opened
     */
    @Override // java.util.function.Consumer
    public void accept(InputStream inputStream) {
        if (inputStream == null) {
            scheduleRetry();
            return;
        }
        // try-with-resources closes the stream on every path and keeps the primary
        // exception when close() fails as well (recorded as a suppressed exception).
        try (InputStream in = inputStream) {
            processLogStream(in);
        } catch (IOException e) {
            LOG.info("Failure occurred while processing controller for pod: {}", this.podName, e);
            scheduleRetry();
        }
    }

    /** Schedules another {@link #run()} attempt after the retry delay. */
    private void scheduleRetry() {
        this.logsController.schedule(this::run, RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Reads the stream byte by byte, hands every complete line (including its
     * trailing line feed) to {@link #processLine(byte[])} and stops when the
     * stream ends or the controller is stopped. If the stream ends while the pod
     * is still running, a retry is scheduled.
     *
     * @param inputStream the pod's log stream
     * @throws IOException if reading fails or line processing is interrupted
     */
    private void processLogStream(InputStream inputStream) throws IOException {
        final ByteArrayOutputStream line = new ByteArrayOutputStream();
        // Set once a line outgrows MAX_LINE_LENGTH; the rest of that line is discarded
        // up to its line feed so the tail is never mistaken for a line of its own.
        boolean skippingLongLine = false;
        int read;
        while (!this.logsController.stopped.get() && (read = inputStream.read()) >= 0) {
            if (read == '\n') {
                if (!skippingLongLine) {
                    line.write(read);
                    processLine(line.toByteArray());
                }
                line.reset();
                skippingLongLine = false;
            } else if (!skippingLongLine) {
                line.write(read);
                if (line.size() > MAX_LINE_LENGTH) {
                    line.reset();
                    skippingLongLine = true;
                }
            }
        }
        if (this.logsController.stopped.get()) {
            return;
        }
        if (!this.logsController.isPodRunning(this.podName)) {
            LOG.info("End of Log stream for terminated pod: {}", this.podName);
        } else {
            LOG.info("End of Log stream for running pod: {}", this.podName);
            scheduleRetry();
        }
    }

    /**
     * Parses one log line and queues the matching batch operation.
     *
     * <p>Only lines of the form {@code <30 byte prefix> <space> {json...}} are
     * considered. The prefix is decoded as US-ASCII and stored as {@code logts}
     * (presumably the log timestamp; it is also recorded as the pod's resume
     * position). Two kinds of records are recognised:
     * <ul>
     * <li>step records ({@code id}, {@code exchange} and {@code step} present), stored at
     *     {@code /exchanges/<integrationId>/<exchange>/steps/<step>/<id>};</li>
     * <li>exchange records ({@code exchange} and {@code status} present), whose entries are
     *     stored individually below {@code /exchanges/<integrationId>/<exchange>/}.</li>
     * </ul>
     * Lines that cannot be parsed or contain invalid keys are skipped.
     *
     * @param bArr the raw bytes of the line, including the trailing line feed
     * @throws IOException as {@link InterruptedIOException} if queueing the event was interrupted
     */
    private void processLine(byte[] bArr) throws IOException {
        final boolean candidate = bArr.length > JSON_OFFSET
            && bArr[LOG_PREFIX_LENGTH] == ' '
            && bArr[JSON_OFFSET] == '{';
        if (!candidate) {
            return;
        }
        final String logTimestamp = new String(bArr, 0, LOG_PREFIX_LENGTH, StandardCharsets.US_ASCII);
        try {
            final Map<String, Object> record =
                Json.reader().forType(HashMap.class).readValue(bArr, JSON_OFFSET, bArr.length - JSON_OFFSET);
            // All four keys are validated up front: a non-string or invalid value in any
            // of them rejects the whole line (via the catch at the bottom).
            final String exchange = validate((String) record.get("exchange"));
            final String id = validate((String) record.get("id"));
            final String step = validate((String) record.get("step"));
            final String status = validate((String) record.get("status"));

            if (id != null && exchange != null && step != null) {
                final String stepPath =
                    String.format("/exchanges/%s/%s/steps/%s/%s", this.integrationId, exchange, step, id);
                record.put("logts", logTimestamp);
                // These values are already encoded in the path.
                record.remove("exchange");
                record.remove("id");
                record.remove("step");
                this.logsController.eventQueue.put(batch -> {
                    batch.put(stepPath, record);
                    trackState(logTimestamp, batch);
                });
            } else if (exchange != null && status != null) {
                final String exchangePath = String.format("/exchanges/%s/%s", this.integrationId, exchange);
                record.put("pod", this.podName);
                record.put("ver", this.deploymentVersion);
                if ("begin".equals(status)) {
                    record.put("logts", logTimestamp);
                }
                record.remove("exchange");
                record.remove("steps");
                this.logsController.eventQueue.put(batch -> {
                    for (Map.Entry<String, Object> entry : record.entrySet()) {
                        batch.put(exchangePath + "/" + entry.getKey(), entry.getValue());
                    }
                    trackState(logTimestamp, batch);
                });
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage());
            interruptedIOException.initCause(e);
            throw interruptedIOException;
        } catch (JsonDBException | IOException | ClassCastException e) {
            // Best effort: the line merely looked like an activity record. Skip it, but leave a trace.
            LOG.debug("Skipping unparseable log line from pod {}: {}", this.podName, e.toString());
        }
    }

    /**
     * Records the given value as the pod's latest processed position and adds the
     * pod state and an integration marker to the batch.
     *
     * @param str the 30 character prefix of the line being processed
     * @param map the batch of JSON DB updates being assembled
     */
    private void trackState(String str, Map<String, Object> map) {
        this.state.time = str;
        map.put("/pods/" + this.podName, this.state);
        map.put("/integrations/" + this.integrationId, Boolean.TRUE);
    }

    /**
     * Null-tolerant wrapper around {@link JsonRecordSupport#validateKey(String)}.
     *
     * @param str the key to validate, may be {@code null}
     * @return {@code null} for a {@code null} input, otherwise the result of {@code validateKey}
     */
    private String validate(String str) {
        if (str == null) {
            return null;
        }
        return JsonRecordSupport.validateKey(str);
    }
}
