/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system.mock;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;

public class MockSystemConsumer
extends BlockingEnvelopeMap {
    private final int messagesPerBatch;
    private final int threadCount;
    private final int brokerSleepMs;
    private final Set<SystemStreamPartition> ssps;
    private List<Thread> threads;

    public MockSystemConsumer(int messagesPerBatch, int threadCount, int brokerSleepMs) {
        super((MetricsRegistry)new MetricsRegistryMap("test-container-performance"), new Clock(){

            public long currentTimeMillis() {
                return System.currentTimeMillis();
            }
        });
        this.messagesPerBatch = messagesPerBatch;
        this.threadCount = threadCount;
        this.brokerSleepMs = brokerSleepMs;
        this.ssps = new HashSet<SystemStreamPartition>();
        this.threads = new ArrayList<Thread>(threadCount);
    }

    public void start() {
        for (int i = 0; i < this.threadCount; ++i) {
            HashSet<SystemStreamPartition> threadSsps = new HashSet<SystemStreamPartition>();
            for (SystemStreamPartition ssp : this.ssps) {
                if (Math.abs(ssp.hashCode()) % this.threadCount != i) continue;
                threadSsps.add(ssp);
            }
            Thread thread = new Thread((Runnable)new MockSystemConsumerRunnable(threadSsps), "MockSystemConsumer-" + i);
            thread.setDaemon(true);
            this.threads.add(thread);
            thread.start();
        }
    }

    public void stop() {
        for (Thread thread : this.threads) {
            thread.interrupt();
        }
        try {
            for (Thread thread : this.threads) {
                thread.join();
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        super.register(systemStreamPartition, offset);
        this.ssps.add(systemStreamPartition);
        this.setIsAtHead(systemStreamPartition, true);
    }

    public class MockSystemConsumerRunnable
    implements Runnable {
        private final Set<SystemStreamPartition> ssps;

        public MockSystemConsumerRunnable(Set<SystemStreamPartition> ssps) {
            this.ssps = ssps;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted() && this.ssps.size() > 0) {
                    HashSet<SystemStreamPartition> sspsToFetch = new HashSet<SystemStreamPartition>();
                    for (SystemStreamPartition ssp : this.ssps) {
                        if (MockSystemConsumer.this.getNumMessagesInQueue(ssp) > 0) continue;
                        sspsToFetch.add(ssp);
                    }
                    Thread.sleep(MockSystemConsumer.this.brokerSleepMs);
                    for (SystemStreamPartition ssp : sspsToFetch) {
                        for (int i = 0; i < MockSystemConsumer.this.messagesPerBatch; ++i) {
                            MockSystemConsumer.this.put(ssp, new IncomingMessageEnvelope(ssp, "0", (Object)"key", (Object)"value"));
                        }
                    }
                }
            }
            catch (InterruptedException e) {
                System.out.println("Got interrupt. Shutting down.");
            }
        }
    }
}

