/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.test.integration.join;

import org.apache.samza.context.Context;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Watcher
implements StreamTask,
WindowableTask,
InitableTask {
    private static Logger logger = LoggerFactory.getLogger(Watcher.class);
    private boolean inError = false;
    private long lastEpochChange = System.currentTimeMillis();
    private long maxTimeBetweenEpochsMs;
    private int currentEpoch = 0;

    public void init(Context context) {
        this.maxTimeBetweenEpochsMs = context.getJobContext().getConfig().getLong("max.time.between.epochs.ms");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        int epoch = Integer.parseInt((String)envelope.getMessage());
        if (epoch > this.currentEpoch) {
            logger.info("Epoch changed to " + epoch + " from " + this.currentEpoch);
            this.currentEpoch = epoch;
            this.lastEpochChange = System.currentTimeMillis();
            this.inError = false;
        }
    }

    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        boolean isLagging;
        boolean bl = isLagging = System.currentTimeMillis() - this.lastEpochChange > this.maxTimeBetweenEpochsMs;
        if (!this.inError && isLagging) {
            this.inError = true;
            logger.info("Error state detected, alerting...");
            logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / 60000L));
        }
    }
}

