/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.test.integration.join;

import java.util.HashSet;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Joiner
implements StreamTask,
InitableTask {
    private static Logger logger = LoggerFactory.getLogger(Joiner.class);
    private KeyValueStore<String, String> store;
    private int expected;
    private TaskName taskName;

    public void init(Context context) {
        this.store = context.getTaskContext().getStore("joiner-state");
        this.expected = context.getJobContext().getConfig().getInt("num.partitions");
        this.taskName = context.getTaskContext().getTaskModel().getTaskName();
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String key = (String)envelope.getKey();
        String value = (String)envelope.getMessage();
        String[] pieces = value.split("-");
        int epoch = Integer.parseInt(pieces[0]);
        int partition = Integer.parseInt(pieces[1].split(" ")[1]);
        Partitions partitions = this.loadPartitions(epoch, key);
        logger.info("Joiner got epoch = " + epoch + ", partition = " + partition + ", parts = " + partitions);
        if (partitions.epoch < epoch) {
            if (partitions.partitions.size() != this.expected) {
                throw new IllegalArgumentException("Should have " + this.expected + " partitions when new epoch starts.");
            }
            logger.info("Reseting epoch to " + epoch);
            this.store.delete((Object)key);
            partitions.epoch = epoch;
            partitions.partitions.clear();
            partitions.partitions.add(partition);
        } else if (partitions.epoch > epoch) {
            logger.info("Ignoring message for epoch " + epoch);
        } else {
            partitions.partitions.add(partition);
            if (partitions.partitions.size() == this.expected) {
                logger.info("Completed: " + key + " -> " + Integer.toString(epoch));
                collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), (Object)key, (Object)Integer.toString(epoch)));
            }
        }
        this.store.put((Object)key, (Object)partitions.toString());
        logger.info("Join store in Task " + this.taskName + " " + key + " -> " + partitions.toString());
    }

    private Partitions loadPartitions(int epoch, String key) {
        String current = (String)this.store.get((Object)key);
        Partitions partitions = current == null ? new Partitions(epoch, new HashSet<Integer>()) : Partitions.parse(current);
        return partitions;
    }

    private static class Partitions {
        int epoch;
        Set<Integer> partitions;

        public Partitions(int epoch, Set<Integer> partitions) {
            this.epoch = epoch;
            this.partitions = partitions;
        }

        public static Partitions parse(String s) {
            String[] pieces = s.split("\\|", -1);
            int epoch = Integer.parseInt(pieces[1]);
            HashSet<Integer> set = new HashSet<Integer>(pieces.length);
            for (int i = 2; i < pieces.length - 1; ++i) {
                set.add(Integer.parseInt(pieces[i]));
            }
            return new Partitions(epoch, set);
        }

        public String toString() {
            StringBuilder b = new StringBuilder("|");
            b.append(this.epoch);
            b.append("|");
            for (int p : this.partitions) {
                b.append(p);
                b.append("|");
            }
            return b.toString();
        }
    }
}

