/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.test.integration.join;

import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream task that sends an increasing counter to the {@code kafka}/{@code emitted} stream,
 * one message per {@link #window} call, until the configured {@code count} is reached.
 *
 * <p>The current epoch and the next counter value are persisted as decimal strings in the
 * {@code emitter-state} store under the keys {@code the-epoch} and {@code the-count}. A message
 * on the {@code epoch} stream that carries a larger epoch than the stored one restarts the
 * counter at zero.
 */
public class Emitter implements StreamTask, InitableTask, WindowableTask {
    private static final Logger logger = LoggerFactory.getLogger(Emitter.class);
    private static final String EPOCH = "the-epoch";
    private static final String COUNT = "the-count";

    private KeyValueStore<String, String> state;
    private int max;
    private TaskName taskName;

    /**
     * Looks up the {@code emitter-state} store, this task's name, and the {@code count} job
     * config value (the exclusive upper bound for emitted counters).
     *
     * @param context context supplying the task store, task model and job config
     */
    public void init(Context context) {
        // The store lookup is not typed per store, so the <String, String> view is asserted
        // here; this class only ever writes and reads String keys and values.
        @SuppressWarnings("unchecked")
        KeyValueStore<String, String> store =
            (KeyValueStore<String, String>) context.getTaskContext().getStore("emitter-state");
        this.state = store;
        this.taskName = context.getTaskContext().getTaskModel().getTaskName();
        this.max = context.getJobContext().getConfig().getInt("count");
    }

    /**
     * Handles messages from the {@code epoch} stream; messages from any other stream are ignored.
     *
     * <p>The message is parsed as an integer epoch. Nothing happens if no epoch has been stored
     * yet or if the epoch is unchanged. A larger epoch is stored, the counter is reset to zero,
     * and a commit is requested for all tasks in the container.
     *
     * @param envelope    incoming message; for the {@code epoch} stream its payload must be a
     *                    {@code String} holding a decimal integer
     * @param collector   unused
     * @param coordinator used to request the commit after an epoch change
     * @throws IllegalArgumentException if the received epoch is smaller than the stored epoch
     * @throws NumberFormatException    if the payload is not a parsable integer
     */
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        if (!envelope.getSystemStreamPartition().getStream().equals("epoch")) {
            return;
        }
        int newEpoch = Integer.parseInt((String) envelope.getMessage());
        logger.info("New epoch in message - {}", newEpoch);

        Integer epoch = this.getInt(EPOCH);
        // Until window() has seeded the state there is no epoch to compare against.
        if (epoch == null || newEpoch == epoch) {
            return;
        }
        if (newEpoch < epoch) {
            throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch);
        }

        // A newer epoch: record it and restart the counter.
        logger.info("Epoch: {}", newEpoch);
        this.state.put(EPOCH, Integer.toString(newEpoch));
        this.state.put(COUNT, "0");
        coordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
    }

    /**
     * Emits the next counter value for the current epoch, if the limit has not been reached.
     *
     * <p>If no epoch is stored yet, the state is seeded with epoch 0 / count 0 and nothing is
     * sent on this call. Otherwise, while the stored count is below the configured maximum, one
     * message is sent with the count as key and {@code <epoch>-<taskName>} as payload, and the
     * stored count is incremented.
     *
     * @param collector   used to send the outgoing message
     * @param coordinator unused
     * @throws IllegalStateException if an epoch is stored but the count is missing
     */
    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        Integer epoch = this.getInt(EPOCH);
        if (epoch == null) {
            this.resetEpoch();
            return;
        }

        Integer storedCount = this.getInt(COUNT);
        if (storedCount == null) {
            // Epoch and count are always written together by this class; fail with a clear
            // message instead of an unboxing NullPointerException if that invariant is broken.
            throw new IllegalStateException("Store has '" + EPOCH + "' = " + epoch + " but no '" + COUNT
                + "' entry for task " + this.taskName);
        }

        int counter = storedCount;
        if (counter < this.max) {
            logger.info("Emitting: {}, epoch = {}, task = {}", counter, epoch, this.taskName);
            OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(
                new SystemStream("kafka", "emitted"),
                Integer.toString(counter),
                epoch + "-" + this.taskName.toString());
            collector.send(envelope);
            // Nothing above writes to the store, so the value read earlier is still current.
            this.state.put(COUNT, Integer.toString(counter + 1));
        }
    }

    /** Seeds the store with epoch 0 and count 0. */
    private void resetEpoch() {
        logger.info("Resetting epoch to 0");
        this.state.put(EPOCH, "0");
        this.state.put(COUNT, "0");
    }

    /**
     * Reads a stored decimal string and parses it.
     *
     * @param key store key to read
     * @return the parsed value, or {@code null} if the store has no entry for {@code key}
     * @throws NumberFormatException if the stored value is not a parsable integer
     */
    private Integer getInt(String key) {
        String value = this.state.get(key);
        return value == null ? null : Integer.valueOf(value);
    }
}

