package com.uber.rss.tools;

import com.uber.rss.StreamServer;
import com.uber.rss.StreamServerConfig;
import com.uber.rss.clients.MultiServerAsyncWriteClient;
import com.uber.rss.clients.MultiServerSyncWriteClient;
import com.uber.rss.clients.MultiServerWriteClient;
import com.uber.rss.clients.PooledWriteClientFactory;
import com.uber.rss.clients.ServerReplicationGroupUtil;
import com.uber.rss.clients.ShuffleWriteConfig;
import com.uber.rss.common.AppMapId;
import com.uber.rss.common.AppShuffleId;
import com.uber.rss.common.AppTaskAttemptId;
import com.uber.rss.common.ServerDetail;
import com.uber.rss.common.ServerReplicationGroup;
import com.uber.rss.metadata.InMemoryServiceRegistry;
import com.uber.rss.metadata.ServiceRegistry;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.ScheduledMetricCollector;
import com.uber.rss.storage.ShuffleFileStorage;
import com.uber.rss.storage.ShuffleFileUtils;
import com.uber.rss.storage.ShuffleStorage;
import com.uber.rss.util.ExceptionUtils;
import com.uber.rss.util.RateCounter;
import java.lang.Thread;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uber/rss/tools/StreamServerStressTool.class */
public class StreamServerStressTool {
    /** Bytes accounted per written record on top of its payload. */
    private static final int SHUFFLE_RECORD_EXTRA_BYTES = 16;

    // Created by run(), released by cleanup().
    private ScheduledExecutorService scheduler;
    private ScheduledMetricCollector scheduledMetricCollector;
    private ShuffleStorage storage;
    private AppShuffleId appShuffleId;

    public static ConcurrentHashMap<Integer, ConcurrentHashMap<String, AtomicLong>> debug = new ConcurrentHashMap<>();
    private static final Logger logger = LoggerFactory.getLogger(StreamServerStressTool.class);

    // Locally started servers; an entry is set to null once that server has been shut down on purpose.
    private List<StreamServer> servers = new ArrayList<>();
    private final List<String> serverIdsToShutdownDuringShuffleWrite = new ArrayList<>();
    private final List<String> serverIdsToShutdownDuringShuffleRead = new ArrayList<>();

    // Server descriptions; filled by startNewServer() for local servers.
    private List<String> serverHosts = new ArrayList<>();
    private List<Integer> serverPorts = new ArrayList<>();
    private List<String> serverRootDirs = new ArrayList<>();
    private List<ServerDetail> serverDetails = new ArrayList<>();

    // Server settings.
    private boolean useEpoll = false;
    private String workDir = "/tmp/rss";
    private int numServers = 4;
    private int numServerThreads = 5;

    // Application / map task settings. endMapId must be initialized after numMaps.
    private String appId = "app_" + System.nanoTime();
    private String appAttempt = "exec1";
    private int numMaps = 10;
    private int numMapAttempts = 1;
    private int startMapId = 0;
    private int endMapId = this.numMaps - 1;
    private int numPartitions = 7;
    private int partitionFanout = 1;
    int numSplits = 3;
    int numReplicas = 1;
    private int mapDelay = 1000;
    private int mapSlowness = 0;
    private long maxWait = 30000L;

    // Write client settings.
    private boolean useConnectionPool = false;
    private long numBytes = 100L * 1024 * 1024;
    private int writeClientQueueSize = 100;
    private int writeClientThreads = 4;

    // Test data settings.
    private boolean deleteFiles = true;
    private int numTestValues = 1000;
    private int maxTestValueLen = ShuffleFileUtils.MAX_SPLITS;

    private ServiceRegistry serviceRegistry = new InMemoryServiceRegistry();
    private Random random = new Random();

    // Counters shared by all mapper threads.
    private AtomicLong totalShuffleWrittenBytes = new AtomicLong();
    private AtomicLong totalShuffleWrittenRecords = new AtomicLong();
    private AtomicLong successShuffleWrittenRecords = new AtomicLong();
    private AtomicLong totalSocketBytes = new AtomicLong();
    private List<Thread> allMapThreads = new ArrayList<>();
    private AtomicLong mapThreadErrors = new AtomicLong();
    private ConcurrentHashMap<Integer, Object> usedPartitionIds = new ConcurrentHashMap<>();

    /**
     * Replaces the list of server hosts. {@link #run()} starts local servers only when this list is empty.
     *
     * @param hosts server host names
     */
    public void setServerHosts(List<String> hosts) {
        serverHosts = hosts;
    }

    /**
     * Replaces the list of server ports.
     *
     * @param ports server ports
     */
    public void setServerPorts(List<Integer> ports) {
        serverPorts = ports;
    }

    /**
     * Replaces the list of server root directories. These directories are logged by {@link #run()} and are the
     * ones removed during cleanup when file deletion is enabled.
     *
     * @param rootDirs server root directories
     */
    public void setServerRootDirs(List<String> rootDirs) {
        serverRootDirs = rootDirs;
    }

    /**
     * Sets the parent directory under which each locally started server gets its own root directory.
     *
     * @param dir working directory
     */
    public void setWorkDir(String dir) {
        workDir = dir;
    }

    /**
     * Sets how many local servers {@link #run()} starts when no server hosts are configured.
     *
     * @param count number of servers
     */
    public void setNumServers(int count) {
        numServers = count;
    }

    /**
     * Sets the thread count applied to both the Netty accept and the Netty worker threads of each local server.
     *
     * @param count number of threads
     */
    public void setNumServerThreads(int count) {
        numServerThreads = count;
    }

    /**
     * Sets the application id used for the shuffle id and for the write clients.
     *
     * @param id application id
     */
    public void setAppId(String id) {
        appId = id;
    }

    /**
     * Sets the application attempt used for the shuffle id and for the write clients.
     *
     * @param attempt application attempt
     */
    public void setAppAttempt(String attempt) {
        appAttempt = attempt;
    }

    /**
     * Sets the number of maps and resets the end map id to {@code count - 1}. The start map id is left untouched.
     *
     * @param count number of maps
     */
    public void setNumMaps(int count) {
        numMaps = count;
        endMapId = count - 1;
    }

    /**
     * Sets the upper bound for the number of attempts per map; {@link #run()} picks a random value in
     * {@code [1, count]} for each map.
     *
     * @param count maximum attempts per map
     */
    public void setNumMapAttempts(int count) {
        numMapAttempts = count;
    }

    /**
     * Sets the first map id (inclusive) simulated by {@link #run()}.
     *
     * @param mapId first map id
     */
    public void setStartMapId(int mapId) {
        startMapId = mapId;
    }

    /**
     * Sets the last map id (inclusive) simulated by {@link #run()}.
     *
     * @param mapId last map id
     */
    public void setEndMapId(int mapId) {
        endMapId = mapId;
    }

    /**
     * Sets the number of partitions; mappers pick random partition ids in {@code [0, count)}.
     *
     * @param count number of partitions
     */
    public void setNumPartitions(int count) {
        numPartitions = count;
    }

    /**
     * Sets the number of splits handed to the shuffle write config (narrowed to {@code short} there).
     *
     * @param count number of splits
     */
    public void setNumSplits(int count) {
        numSplits = count;
    }

    /**
     * Sets the number of replicas used when building server replication groups. A value greater than one also
     * makes {@link #run()} select local servers to shut down during shuffle write and shuffle read.
     *
     * @param count number of replicas
     */
    public void setNumReplicas(int count) {
        numReplicas = count;
    }

    /**
     * Sets the partition fanout passed to the write clients and to the read verifier.
     *
     * @param fanout partition fanout
     */
    public void setPartitionFanout(int fanout) {
        partitionFanout = fanout;
    }

    /**
     * Sets the exclusive upper bound, in milliseconds, of the random delay applied before each map attempt.
     * A value of zero or less disables the delay.
     *
     * @param millis maximum delay in milliseconds
     */
    public void setMapDelay(int millis) {
        mapDelay = millis;
    }

    /**
     * Sets the pause, in milliseconds, a mapper takes after each written record. A value of zero or less
     * disables the pause.
     *
     * @param millis pause in milliseconds
     */
    public void setMapSlowness(int millis) {
        mapSlowness = millis;
    }

    /**
     * Stores the max wait value. NOTE(review): within this class the value is only reported by
     * {@link #toString()}; its unit and consumer are not visible here.
     *
     * @param wait max wait value
     */
    public void setMaxWait(long wait) {
        maxWait = wait;
    }

    /**
     * Sets the connection pool flag passed to the write clients.
     *
     * @param enabled flag value forwarded to the write clients
     */
    public void setUseConnectionPool(boolean enabled) {
        useConnectionPool = enabled;
    }

    /**
     * Sets the byte threshold: mappers keep writing until the total byte counter shared by all mappers reaches
     * this value.
     *
     * @param bytes total number of bytes to write
     */
    public void setNumBytes(long bytes) {
        numBytes = bytes;
    }

    /**
     * Sets the write client queue size. Zero selects the synchronous write client; any other value selects the
     * asynchronous client and is passed to it as queue size.
     *
     * @param size queue size
     */
    public void setWriteClientQueueSize(int size) {
        writeClientQueueSize = size;
    }

    /**
     * Sets the thread count passed to the asynchronous write client.
     *
     * @param count number of threads
     */
    public void setWriteClientThreads(int count) {
        writeClientThreads = count;
    }

    /**
     * Sets whether the server root directories are deleted once the run is over.
     *
     * @param enabled {@code true} to delete the directories
     */
    public void setDeleteFiles(boolean enabled) {
        deleteFiles = enabled;
    }

    /**
     * Sets the target size of the pool of test values generated by {@link #run()}. Three fixed values (a null
     * and two empty arrays) are always part of the pool.
     *
     * @param count number of test values
     */
    public void setNumTestValues(int count) {
        numTestValues = count;
    }

    /**
     * Sets the exclusive upper bound for the length of each randomly generated test value.
     *
     * @param length maximum test value length (exclusive)
     */
    public void setMaxTestValueLen(int length) {
        maxTestValueLen = length;
    }

    /**
     * Sets the storage field, which is used for directory existence checks and directory deletion and is handed
     * to locally started servers. NOTE(review): confirm how {@link #run()} initializes this field before relying
     * on a value set here.
     *
     * @param shuffleStorage storage implementation
     */
    public void setStorage(ShuffleStorage shuffleStorage) {
        storage = shuffleStorage;
    }

    /**
     * Sets the service registry passed to the metric collector and to locally started servers.
     *
     * @param registry service registry
     */
    public void setServiceRegistry(ServiceRegistry registry) {
        serviceRegistry = registry;
    }

    /**
     * Runs one stress-test round.
     *
     * <p>Steps: start metric collection, start local servers when no server hosts are configured, generate the
     * test values, run one mapper thread per map id in {@code [startMapId, endMapId]}, and finally (only when
     * local servers were started) verify the written data by reading it back. Local servers are always shut down
     * and, if enabled, their root directories deleted once verification has finished or failed.
     *
     * <p>A storage set through {@link #setStorage(ShuffleStorage)} is kept; a {@link ShuffleFileStorage} is only
     * created when none was set.
     *
     * @throws RuntimeException if any mapper thread failed, if the calling thread is interrupted while waiting
     *     for the mapper threads, or if read verification fails
     */
    public void run() {
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduledMetricCollector = new ScheduledMetricCollector(this.serviceRegistry);
        this.scheduledMetricCollector.scheduleCollectingMetrics(this.scheduler, "default", "default");
        this.appShuffleId = new AppShuffleId(this.appId, this.appAttempt, 1);
        if (this.storage == null) {
            // Do not discard a storage injected through setStorage().
            this.storage = new ShuffleFileStorage();
        }
        if (this.serverHosts.isEmpty()) {
            startLocalServers();
        }
        logger.info(String.format("Server root dirs: %s", StringUtils.join(this.serverRootDirs, ':')));

        List<byte[]> testValues = createTestValues();
        int mapCount = (this.endMapId + 1) - this.startMapId;

        // Up to two randomly chosen maps write no records at all.
        Collection<Integer> mapIdsWritingNothing = new HashSet<>();
        if (mapCount > 2) {
            mapIdsWritingNothing.add(this.startMapId + this.random.nextInt(mapCount));
            mapIdsWritingNothing.add(this.startMapId + this.random.nextInt(mapCount));
        }

        AtomicLong nextTaskAttemptId = new AtomicLong();
        // Task attempt id of the last attempt of every map; only those attempts are expected to be readable.
        List<Long> lastTaskAttemptIds = new ArrayList<>();
        ConcurrentHashMap<Integer, AtomicLong> recordsPerPartition = new ConcurrentHashMap<>();

        for (int mapId = this.startMapId; mapId <= this.endMapId; mapId++) {
            AppMapId appMapId = new AppMapId(
                    this.appShuffleId.getAppId(), this.appShuffleId.getAppAttempt(), this.appShuffleId.getShuffleId(), mapId);
            int attempts = this.random.nextInt(this.numMapAttempts) + 1;
            boolean writeNothing = mapIdsWritingNothing.contains(mapId);
            RateCounter rateCounter = new RateCounter(5000L);
            Thread mapThread = new Thread(() -> {
                for (int attempt = 1; attempt <= attempts; attempt++) {
                    long taskAttemptId = nextTaskAttemptId.getAndIncrement();
                    boolean isLastTaskAttempt = attempt == attempts;
                    if (isLastTaskAttempt) {
                        synchronized (lastTaskAttemptIds) {
                            lastTaskAttemptIds.add(taskAttemptId);
                        }
                    }
                    simulateMapperTask(testValues, appMapId, taskAttemptId, isLastTaskAttempt, writeNothing, rateCounter, recordsPerPartition);
                }
            });
            mapThread.setName(String.format("[Map Thread %s]", appMapId));
            mapThread.setUncaughtExceptionHandler((failedThread, error) -> {
                logger.error(String.format("Mapper thread %s got exception", failedThread.getName()), error);
                this.mapThreadErrors.incrementAndGet();
            });
            this.allMapThreads.add(mapThread);
        }

        long startMillis = System.currentTimeMillis();
        for (Thread mapThread : this.allMapThreads) {
            mapThread.start();
        }
        for (Thread mapThread : this.allMapThreads) {
            try {
                mapThread.join();
            } catch (InterruptedException e) {
                M3Stats.addException(e, getClass().getSimpleName());
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        long elapsedMillis = System.currentTimeMillis() - startMillis;

        // bytes per millisecond -> megabytes per second; computed in floating point to avoid truncation.
        double megabytesPerSecond = elapsedMillis == 0
                ? 0.0d
                : ((double) this.totalShuffleWrittenBytes.get() / elapsedMillis) * (1000.0d / (1024.0d * 1024.0d));
        logger.info(String.format("Total written bytes: %s, records: %s, throughput: %s mb/s, total socket bytes: %s",
                this.totalShuffleWrittenBytes, this.totalShuffleWrittenRecords, megabytesPerSecond, this.totalSocketBytes));

        // All mapper threads have terminated, so this counter cannot change any more after this check.
        if (this.mapThreadErrors.get() > 0) {
            throw new RuntimeException("Number of errors in map threads: " + this.mapThreadErrors);
        }
        if (this.servers.isEmpty()) {
            // No local servers were started: nothing to verify or tear down here.
            return;
        }
        try {
            verifyShuffleRead(lastTaskAttemptIds, recordsPerPartition);
        } catch (Throwable ex) {
            M3Stats.addException(ex, getClass().getSimpleName());
            logger.error(String.format("Failed to verify reading from servers: %s", StringUtils.join(this.serverDetails, ", ")), ex);
            throw ex;
        } finally {
            shutdownServersAndDeleteFiles();
        }
    }

    /** Starts the configured number of local servers and, with replication enabled, picks the servers to fail. */
    private void startLocalServers() {
        for (int i = 0; i < this.numServers; i++) {
            startNewServer();
        }
        if (this.numReplicas <= 1) {
            return;
        }
        List<ServerReplicationGroup> groups = ServerReplicationGroupUtil.createReplicationGroups(this.serverDetails, this.numReplicas);
        for (ServerReplicationGroup group : groups) {
            logger.info(String.format("Server replication group: %s", group));
        }
        // The first server of the first half (rounded up) of the groups is shut down during shuffle write,
        // the first server of the remaining groups during shuffle read.
        int groupsFailingDuringWrite = (int) Math.ceil(groups.size() / 2.0d);
        for (int i = 0; i < groups.size(); i++) {
            String firstServerId = groups.get(i).getServers().get(0).getServerId();
            if (i < groupsFailingDuringWrite) {
                this.serverIdsToShutdownDuringShuffleWrite.add(firstServerId);
            } else {
                this.serverIdsToShutdownDuringShuffleRead.add(firstServerId);
            }
        }
        logger.info(String.format("Servers to shutdown during shuffle write: %s, servers to shutdown during shuffle read: %s",
                this.serverIdsToShutdownDuringShuffleWrite, this.serverIdsToShutdownDuringShuffleRead));
    }

    /** Builds the pool of test values: a null, two empty arrays, then random single-letter strings. */
    private List<byte[]> createTestValues() {
        List<byte[]> testValues = new ArrayList<>();
        testValues.add(null);
        testValues.add(new byte[0]);
        testValues.add("".getBytes(StandardCharsets.UTF_8));
        while (testValues.size() < this.numTestValues) {
            char letter = (char) ('a' + this.random.nextInt(26));
            int length = this.random.nextInt(this.maxTestValueLen);
            testValues.add(StringUtils.repeat(letter, length).getBytes(StandardCharsets.UTF_8));
        }
        return testValues;
    }

    /** Reads the shuffle data back from the servers and checks it against the counters kept while writing. */
    private void verifyShuffleRead(List<Long> lastTaskAttemptIds, ConcurrentHashMap<Integer, AtomicLong> recordsPerPartition) {
        HashMap<Integer, Long> expectedRecordsPerPartition = new HashMap<>();
        recordsPerPartition.forEach((partitionId, count) -> expectedRecordsPerPartition.put(partitionId, count.get()));

        StreamReadClientVerify verifier = new StreamReadClientVerify();
        verifier.setRssServers(this.serverDetails, this.numReplicas);
        verifier.setAppShuffleId(this.appShuffleId);
        verifier.setNumPartitions(this.numPartitions);
        verifier.setPartitionFanout(this.partitionFanout);
        verifier.setExpectedTotalRecords(this.successShuffleWrittenRecords.get());
        verifier.setExpectedTotalRecordsForEachPartition(expectedRecordsPerPartition);
        verifier.setActionToSimulateBadServer(this::shutdownServersDuringShuffleRead);

        logger.info(String.format("Verifying reading from servers: %s", StringUtils.join(this.serverDetails, ", ")));
        verifier.verifyRecords(this.usedPartitionIds.keySet(), lastTaskAttemptIds);
        logger.info(String.format("Verified reading from servers: %s", StringUtils.join(this.serverDetails, ", ")));
    }

    /** Shuts down every server selected to fail during shuffle read; each server is shut down at most once. */
    private void shutdownServersDuringShuffleRead() {
        synchronized (this.servers) {
            synchronized (this.serverIdsToShutdownDuringShuffleRead) {
                for (String serverId : this.serverIdsToShutdownDuringShuffleRead) {
                    StreamServer server = this.servers.stream()
                            .filter(candidate -> candidate != null)
                            .filter(candidate -> candidate.getServerId().equals(serverId))
                            .findFirst()
                            .orElseThrow(() -> new IllegalStateException("No running server with id: " + serverId));
                    logger.info(String.format("Simulate bad server during shuffle read by shutting down server: %s", server));
                    shutdownServer(server);
                    // Keep the slot but mark the server as gone so it is not shut down again later.
                    this.servers.set(this.servers.indexOf(server), null);
                }
                this.serverIdsToShutdownDuringShuffleRead.clear();
            }
        }
    }

    /** Releases pooled write clients, stops all servers still running and optionally deletes their files. */
    private void shutdownServersAndDeleteFiles() {
        PooledWriteClientFactory.getInstance().shutdown();
        synchronized (this.servers) {
            for (StreamServer server : this.servers) {
                if (server != null) {
                    shutdownServer(server);
                }
            }
        }
        if (!this.deleteFiles) {
            return;
        }
        String rootDirs = StringUtils.join(this.serverRootDirs, ", ");
        try {
            logger.info(String.format("Deleting files: %s", rootDirs));
            deleteDirectories(this.serverRootDirs);
            logger.info(String.format("Deleted files: %s", rootDirs));
        } catch (Throwable ex) {
            // Best effort: failing to delete test files must not fail the run.
            M3Stats.addException(ex, getClass().getSimpleName());
            logger.info(String.format("Got some error when deleting files: %s, ignored them", rootDirs), ex);
        }
    }

    /**
     * Collects metrics one last time and stops the metric scheduler.
     *
     * <p>Safe to call when {@link #run()} never got far enough to create the metric collector or the
     * scheduler, and safe to call more than once.
     */
    public void cleanup() {
        if (this.scheduledMetricCollector != null) {
            this.scheduledMetricCollector.collectMetrics("default", "default");
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
            this.scheduler = null;
        }
    }

    /**
     * Renders the tool's configuration for logging.
     *
     * @return all configuration fields as {@code StreamServerStressTool{name=value, ...}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("StreamServerStressTool{");
        text.append("serverHosts=").append(this.serverHosts);
        text.append(", useEpoll=").append(this.useEpoll);
        text.append(", serverPorts=").append(this.serverPorts);
        text.append(", serverRootDirs=").append(this.serverRootDirs);
        text.append(", workDir='").append(this.workDir).append("'");
        text.append(", numServers=").append(this.numServers);
        text.append(", numServerThreads=").append(this.numServerThreads);
        text.append(", appId='").append(this.appId).append("'");
        text.append(", appAttempt='").append(this.appAttempt).append("'");
        text.append(", numBytes=").append(this.numBytes);
        text.append(", numMaps=").append(this.numMaps);
        text.append(", numMapAttempts=").append(this.numMapAttempts);
        text.append(", startMapId=").append(this.startMapId);
        text.append(", endMapId=").append(this.endMapId);
        text.append(", numPartitions=").append(this.numPartitions);
        text.append(", numSplits=").append(this.numSplits);
        text.append(", numReplicas=").append(this.numReplicas);
        text.append(", partitionFanout=").append(this.partitionFanout);
        text.append(", mapDelay=").append(this.mapDelay);
        text.append(", mapSlowness=").append(this.mapSlowness);
        text.append(", maxWait=").append(this.maxWait);
        text.append(", writeClientQueueSize=").append(this.writeClientQueueSize);
        text.append(", writeClientThreads=").append(this.writeClientThreads);
        text.append(", deleteFiles=").append(this.deleteFiles);
        text.append(", numTestValues=").append(this.numTestValues);
        text.append(", maxTestValueLen=").append(this.maxTestValueLen);
        text.append(", storage=").append(this.storage);
        text.append(", serviceRegistry=").append(this.serviceRegistry);
        text.append(", useConnectionPool=").append(this.useConnectionPool);
        text.append('}');
        return text.toString();
    }

    /**
     * Simulates one attempt of a map task: optionally waits a random delay, opens a write client, writes
     * records until the byte counter shared by all mappers reaches {@code numBytes}, then finishes the upload
     * and closes the client.
     *
     * <p>The write client is closed exactly once on every path, including failures while connecting or writing.
     *
     * @param testValues pool of record payloads to pick from; entries may be null
     * @param appMapId id of the simulated map task
     * @param taskAttemptId id of this attempt
     * @param isLastTaskAttempt whether this is the final attempt of the map; only records of final attempts are
     *     counted as expected for verification, and only final attempts propagate finish/close errors
     * @param writeNothing when true the attempt uploads no records
     * @param rateCounter per-map counter used to report upload rates
     * @param recordsPerPartition shared per-partition count of records written by final attempts
     * @throws RuntimeException if the thread is interrupted while sleeping
     */
    private void simulateMapperTask(List<byte[]> testValues, AppMapId appMapId, long taskAttemptId, boolean isLastTaskAttempt, boolean writeNothing, RateCounter rateCounter, ConcurrentHashMap<Integer, AtomicLong> recordsPerPartition) {
        if (this.mapDelay > 0) {
            int delayMillis = this.random.nextInt(this.mapDelay);
            logger.info(String.format("Delaying map %s: %s", appMapId, delayMillis));
            sleepMillis(delayMillis);
        }

        MultiServerWriteClient writeClient = createWriteClient();
        try {
            writeClient.connect();
            writeClient.startUpload(new AppTaskAttemptId(appMapId, taskAttemptId), this.numMaps, this.numPartitions);
            logger.info(String.format("Map %s attempt %s started, write client: %s", appMapId, taskAttemptId, writeClient));
            if (!writeNothing) {
                writeRecords(writeClient, testValues, isLastTaskAttempt, recordsPerPartition);
            }
        } catch (Throwable ex) {
            // Do not leak the client when connecting or writing fails; keep the original error as primary.
            try {
                writeClient.close();
            } catch (Throwable closeError) {
                ex.addSuppressed(closeError);
            }
            throw ex;
        }

        try {
            try {
                writeClient.finishUpload();
                Double rate = rateCounter.addValueAndGetRate(writeClient.getShuffleWriteBytes());
                if (rate != null) {
                    logger.info(String.format("Map %s uploaded bytes: %s, rate: %s mb/s",
                            appMapId, rateCounter.getOverallValue(), rate * 9.5367431640625E-4d));
                }
            } finally {
                logger.info(String.format("Closing write client: %s", writeClient));
                writeClient.close();
            }
        } catch (Throwable ex) {
            M3Stats.addException(ex, getClass().getSimpleName());
            if (isLastTaskAttempt) {
                throw ex;
            }
            // Errors of superseded attempts are expected and must not fail the test.
            logger.debug(String.format("Got ignorable error from stale map task: %s", ExceptionUtils.getSimpleMessage(ex)));
        }

        logger.info(String.format("Map %s attempt %s finished", appMapId, taskAttemptId));
        logger.info(String.format("Map %s total uploaded bytes: %s mb, rate: %s mb/s",
                appMapId, rateCounter.getOverallValue() / 1048576.0d, rateCounter.getOverallRate() * 9.5367431640625E-4d));
    }

    /** Creates a synchronous write client when the queue size is zero, otherwise an asynchronous one. */
    private MultiServerWriteClient createWriteClient() {
        ShuffleWriteConfig writeConfig = new ShuffleWriteConfig((short) this.numSplits);
        // NOTE(review): presumably the network timeout and the overall retry budget in milliseconds — confirm
        // against the write client constructors.
        int networkTimeoutMillis = 120000;
        long maxTryingMillis = networkTimeoutMillis * 3;
        List<ServerReplicationGroup> replicationGroups = ServerReplicationGroupUtil.createReplicationGroups(this.serverDetails, this.numReplicas);
        if (this.writeClientQueueSize == 0) {
            return new MultiServerSyncWriteClient(replicationGroups, this.partitionFanout, networkTimeoutMillis, maxTryingMillis, true, this.useConnectionPool, "user1", this.appId, this.appAttempt, writeConfig);
        }
        return new MultiServerAsyncWriteClient(replicationGroups, this.partitionFanout, networkTimeoutMillis, maxTryingMillis, true, this.useConnectionPool, this.writeClientQueueSize, this.writeClientThreads, "user1", this.appId, this.appAttempt, writeConfig);
    }

    /**
     * Writes a null record and an empty record to one random partition, then random records until the shared
     * byte counter reaches {@code numBytes}. The thread whose write moves the counter across half of
     * {@code numBytes} shuts down the servers selected to fail during shuffle write.
     */
    private void writeRecords(MultiServerWriteClient writeClient, List<byte[]> testValues, boolean isLastTaskAttempt, ConcurrentHashMap<Integer, AtomicLong> recordsPerPartition) {
        int firstPartitionId = this.random.nextInt(this.numPartitions);
        writeClient.writeDataBlock(firstPartitionId, null);
        countWrittenRecord(firstPartitionId, isLastTaskAttempt, recordsPerPartition);
        writeClient.writeDataBlock(firstPartitionId, ByteBuffer.wrap(new byte[0]));
        countWrittenRecord(firstPartitionId, isLastTaskAttempt, recordsPerPartition);

        long halfOfNumBytes = this.numBytes / 2;
        while (this.totalShuffleWrittenBytes.get() < this.numBytes) {
            long bytesBefore = this.totalShuffleWrittenBytes.get();
            int partitionId = this.random.nextInt(this.numPartitions);

            // The first random value only contributes its length to the byte counter; it is not written.
            byte[] countedOnly = testValues.get(this.random.nextInt(testValues.size()));
            if (countedOnly != null) {
                this.totalShuffleWrittenBytes.addAndGet(countedOnly.length);
            }
            byte[] value = testValues.get(this.random.nextInt(testValues.size()));
            if (value != null) {
                this.totalShuffleWrittenBytes.addAndGet(value.length);
            }
            countWrittenRecord(partitionId, isLastTaskAttempt, recordsPerPartition);
            writeClient.writeDataBlock(partitionId, value == null ? null : ByteBuffer.wrap(value));

            if (this.mapSlowness > 0) {
                sleepMillis(this.mapSlowness);
            }
            if (bytesBefore < halfOfNumBytes && this.totalShuffleWrittenBytes.get() >= halfOfNumBytes) {
                shutdownServersDuringShuffleWrite();
            }
        }
    }

    /** Updates the shared counters for one written record; verification counters only for final attempts. */
    private void countWrittenRecord(int partitionId, boolean isLastTaskAttempt, ConcurrentHashMap<Integer, AtomicLong> recordsPerPartition) {
        this.totalShuffleWrittenBytes.addAndGet(SHUFFLE_RECORD_EXTRA_BYTES);
        this.totalShuffleWrittenRecords.incrementAndGet();
        if (isLastTaskAttempt) {
            this.successShuffleWrittenRecords.incrementAndGet();
            this.usedPartitionIds.putIfAbsent(partitionId, partitionId);
            recordsPerPartition.computeIfAbsent(partitionId, key -> new AtomicLong()).incrementAndGet();
        }
    }

    /** Shuts down every server selected to fail during shuffle write; each server is shut down at most once. */
    private void shutdownServersDuringShuffleWrite() {
        synchronized (this.servers) {
            synchronized (this.serverIdsToShutdownDuringShuffleWrite) {
                for (String serverId : this.serverIdsToShutdownDuringShuffleWrite) {
                    StreamServer server = this.servers.stream()
                            .filter(candidate -> candidate != null)
                            .filter(candidate -> candidate.getServerId().equals(serverId))
                            .findFirst()
                            .orElseThrow(() -> new IllegalStateException("No running server with id: " + serverId));
                    logger.info(String.format("Simulate bad server during shuffle write by shutting down server: %s", server));
                    shutdownServer(server);
                    // Keep the slot but mark the server as gone so it is not shut down again later.
                    this.servers.set(this.servers.indexOf(server), null);
                }
                this.serverIdsToShutdownDuringShuffleWrite.clear();
            }
        }
    }

    /** Sleeps for the given time; on interruption restores the interrupt flag and fails the attempt. */
    private void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            M3Stats.addException(e, getClass().getSimpleName());
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts one local stream server on ephemeral ports with a fresh root directory below the work directory
     * and records its host, port, root directory and server detail.
     *
     * @throws RuntimeException wrapping any failure while configuring or starting the server
     */
    private void startNewServer() {
        try {
            // Keep generating names until one does not exist yet in the storage.
            String rootDir;
            do {
                rootDir = Paths.get(this.workDir, String.format("server_%s", System.nanoTime())).toString();
            } while (this.storage.exists(rootDir));

            StreamServerConfig config = new StreamServerConfig();
            config.setUseEpoll(this.useEpoll);
            config.setNettyAcceptThreads(this.numServerThreads);
            config.setNettyWorkerThreads(this.numServerThreads);
            config.setStorage(this.storage);
            // Port 0 for both the shuffle and the HTTP endpoint; the actual shuffle port is read back below.
            config.setShufflePort(0);
            config.setHttpPort(0);
            config.setRootDirectory(rootDir);
            config.setDataCenter("default");
            config.setCluster("default");
            config.setAppMemoryRetentionMillis(TimeUnit.HOURS.toMillis(1L));

            StreamServer server = new StreamServer(config, this.serviceRegistry);
            server.run();

            int shufflePort = server.getShufflePort();
            this.servers.add(server);
            this.serverHosts.add("localhost");
            this.serverPorts.add(shufflePort);
            this.serverRootDirs.add(rootDir);
            this.serverDetails.add(new ServerDetail(server.getServerId(), String.format("localhost:%s", shufflePort)));
            logger.info(String.format("Started server, port: %s, rootDir: %s, %s", shufflePort, rootDir, config));
        } catch (Throwable ex) {
            M3Stats.addException(ex, getClass().getSimpleName());
            throw new RuntimeException("Failed to start stream server", ex);
        }
    }

    /**
     * Logs and shuts down the given server, passing {@code true} to its shutdown call.
     *
     * @param server server to stop; must not be null
     */
    private void shutdownServer(StreamServer server) {
        String message = String.format("Shutting down server: %s", server);
        logger.info(message);
        server.shutdown(true);
    }

    /**
     * Deletes each of the given directories through the storage, skipping those that do not exist.
     *
     * @param directories paths of the directories to delete
     */
    private void deleteDirectories(List<String> directories) {
        for (String directory : directories) {
            logger.info("Deleting directory: " + directory);
            if (!this.storage.exists(directory)) {
                logger.info("Directory not exist: " + directory);
                continue;
            }
            this.storage.deleteDirectory(directory);
            logger.info("Deleted directory: " + directory);
        }
    }

    /**
     * Command line entry point. Arguments are {@code -option value} pairs; option names are matched
     * case-insensitively. Runs the tool and always performs {@link #cleanup()} afterwards.
     *
     * <p>Note that {@code -numMaps} also resets the start map id to 0 and the end map id to
     * {@code numMaps - 1}, so {@code -startMapId}/{@code -endMapId} must come after it to take effect.
     *
     * @param args command line arguments
     * @throws IllegalArgumentException if an option is unknown, has no value, or has a malformed value
     */
    public static void main(String[] args) {
        StreamServerStressTool tool = new StreamServerStressTool();
        int index = 0;
        while (index < args.length) {
            String option = args[index++];
            // null when the option is the last argument; reported by requireValue for known options.
            String value = index < args.length ? args[index++] : null;
            if (option.equalsIgnoreCase("-workDir")) {
                tool.workDir = requireValue(option, value);
            } else if (option.equalsIgnoreCase("-servers")) {
                for (String server : requireValue(option, value).split(",")) {
                    String[] hostAndPort = server.split(":");
                    if (hostAndPort.length < 2) {
                        throw new IllegalArgumentException("Expected host:port but got: " + server);
                    }
                    tool.serverHosts.add(hostAndPort[0]);
                    tool.serverPorts.add(Integer.parseInt(hostAndPort[1]));
                }
            } else if (option.equalsIgnoreCase("-epoll")) {
                tool.useEpoll = Boolean.parseBoolean(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-numServers")) {
                tool.numServers = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-numServerThreads")) {
                tool.numServerThreads = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-writeClientQueueSize")) {
                tool.writeClientQueueSize = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-writeClientThreads")) {
                tool.writeClientThreads = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-appId")) {
                tool.appId = requireValue(option, value);
            } else if (option.equalsIgnoreCase("-numBytes")) {
                tool.numBytes = com.uber.rss.util.StringUtils.getBytesValue(requireValue(option, value)).longValue();
            } else if (option.equalsIgnoreCase("-numMaps")) {
                tool.numMaps = Integer.parseInt(requireValue(option, value));
                tool.startMapId = 0;
                tool.endMapId = tool.numMaps - 1;
            } else if (option.equalsIgnoreCase("-numMapAttempts")) {
                tool.numMapAttempts = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-startMapId")) {
                tool.startMapId = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-endMapId")) {
                tool.endMapId = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-mapDelay")) {
                tool.mapDelay = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-mapSlowness")) {
                tool.mapSlowness = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-maxWait")) {
                tool.maxWait = Long.parseLong(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-numPartitions")) {
                tool.numPartitions = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-numSplits")) {
                tool.numSplits = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-numReplicas")) {
                tool.numReplicas = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-partitionFanout")) {
                tool.partitionFanout = Integer.parseInt(requireValue(option, value));
            } else if (option.equalsIgnoreCase("-deleteFiles")) {
                tool.deleteFiles = Boolean.parseBoolean(requireValue(option, value));
            } else {
                throw new IllegalArgumentException("Unsupported argument: " + option);
            }
        }

        try {
            tool.run();
        } finally {
            tool.cleanup();
        }
        M3Stats.closeDefaultScope();
        logger.info(String.format("%s finished", StreamServerStressTool.class.getSimpleName()));
    }

    /** Returns the option's value, or fails with a clear message when the command line ended before it. */
    private static String requireValue(String option, String value) {
        if (value == null) {
            throw new IllegalArgumentException("Missing value for argument: " + option);
        }
        return value;
    }
}
