package edu.iu.dsc.tws.examples.streaming.wordcount.comms;

import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.Network;
import edu.iu.dsc.tws.comms.selectors.HashingSelector;
import edu.iu.dsc.tws.comms.stream.SPartition;
import edu.iu.dsc.tws.examples.utils.WordCountUtils;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/streaming/wordcount/comms/WordCountWorker.class */
public class WordCountWorker implements IWorker {
    private static final Logger LOG = Logger.getLogger(WordCountWorker.class.getName());
    private SPartition keyedPartition;
    private Communicator channel;
    private static final int NO_OF_TASKS = 8;
    private Config config;
    private int id;
    private int noOfTasksPerExecutor;
    private Set<Integer> sources;
    private Set<Integer> destinations;
    private LogicalPlan logicalPlan;

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        this.config = config;
        this.id = i;
        this.noOfTasksPerExecutor = NO_OF_TASKS / iWorkerController.getNumberOfWorkers();
        setupTasks(iWorkerController);
        setupNetwork(iWorkerController);
        this.keyedPartition = new SPartition(this.channel, this.logicalPlan, this.sources, this.destinations, MessageTypes.OBJECT, new WordAggregate(), new HashingSelector());
        scheduleTasks();
        progress();
    }

    private void setupTasks(IWorkerController iWorkerController) {
        try {
            this.logicalPlan = WordCountUtils.createWordCountPlan(this.config, this.id, iWorkerController.getAllWorkers(), NO_OF_TASKS);
            this.sources = new HashSet();
            for (int i = 0; i < 4; i++) {
                this.sources.add(Integer.valueOf(i));
            }
            this.destinations = new HashSet();
            for (int i2 = 0; i2 < 4; i2++) {
                this.destinations.add(Integer.valueOf(4 + i2));
            }
            LOG.fine(String.format("%d sources %s destinations %s", Integer.valueOf(this.logicalPlan.getThisWorker()), this.sources, this.destinations));
        } catch (TimeoutException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    private void setupNetwork(IWorkerController iWorkerController) {
        this.channel = new Communicator(this.config, Network.initializeChannel(this.config, iWorkerController));
    }

    private void scheduleTasks() {
        if (this.id < 2) {
            for (int i = 0; i < this.noOfTasksPerExecutor; i++) {
                new Thread(new StreamingWordSource(this.keyedPartition, 1000, (this.noOfTasksPerExecutor * this.id) + i, 10)).start();
            }
        }
    }

    private void progress() {
        while (true) {
            try {
                this.channel.getChannel().progress();
                this.keyedPartition.progress();
            } catch (Throwable th) {
                LOG.log(Level.SEVERE, "Error", th);
            }
        }
    }
}
