/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.test.integration.join;

import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Checker
implements StreamTask,
WindowableTask,
InitableTask {
    private static Logger logger = LoggerFactory.getLogger(Checker.class);
    private static final String CURRENT_EPOCH = "current-epoch";
    private KeyValueStore<String, String> store;
    private int expectedKeys;
    private int numPartitions;

    public void init(Context context) {
        this.store = context.getTaskContext().getStore("checker-state");
        this.expectedKeys = context.getJobContext().getConfig().getInt("expected.keys");
        this.numPartitions = context.getJobContext().getConfig().getInt("num.partitions");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String key = (String)envelope.getKey();
        String epoch = (String)envelope.getMessage();
        logger.info("Got key=" + key + ", epoch = " + epoch + " in checker...");
        this.checkEpoch(epoch);
        this.store.put((Object)key, (Object)epoch);
    }

    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        String currentEpoch = (String)this.store.get((Object)CURRENT_EPOCH);
        logger.info("Checking if epoch " + currentEpoch + " is complete.");
        int count = 0;
        KeyValueIterator iter = this.store.all();
        while (iter.hasNext()) {
            Entry entry = (Entry)iter.next();
            String foundEpoch = (String)entry.getValue();
            if (foundEpoch.equals(currentEpoch)) {
                ++count;
                continue;
            }
            logger.info("####### Found a different epoch! - " + foundEpoch + " Current epoch is " + currentEpoch);
        }
        iter.close();
        if (count == this.expectedKeys + 1) {
            logger.info("Epoch " + currentEpoch + " is complete.");
            int nextEpoch = Integer.parseInt(currentEpoch) + 1;
            for (int i = 0; i < this.numPartitions; ++i) {
                logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch));
                collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), (Object)Integer.toString(i), (Object)Integer.toString(nextEpoch)));
            }
            this.store.put((Object)CURRENT_EPOCH, (Object)Integer.toString(nextEpoch));
        } else {
            if (count > this.expectedKeys + 1) {
                throw new IllegalStateException("Got " + count + " keys, which is more than the expected " + (this.expectedKeys + 1));
            }
            logger.info("Only found " + count + " valid keys, try again later.");
        }
    }

    private void checkEpoch(String epoch) {
        String curr = (String)this.store.get((Object)CURRENT_EPOCH);
        if (curr == null) {
            this.store.put((Object)CURRENT_EPOCH, (Object)epoch);
        } else {
            int currentEpochInStore = Integer.parseInt(curr);
            int currentEpochInMsg = Integer.parseInt(epoch);
            if (currentEpochInMsg <= currentEpochInStore) {
                if (currentEpochInMsg < currentEpochInStore) {
                    logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
                }
            } else {
                throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
            }
        }
    }
}

