package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueOut;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Long-running stress test in which several writer threads append to one queue directory using
 * {@link RollCycles#TEST_SECONDLY} while several reader threads tail it concurrently.
 *
 * <p>Every message consists of {@link #NUMBER_OF_INTS} copies of one value taken from a counter
 * shared by all writers. Each reader asserts that the values it sees are consecutive, starting
 * at zero.
 *
 * <p>Tunable through system properties:
 * <ul>
 *   <li>{@code writeLatency} - target interval between writes in nanoseconds (default 10000)</li>
 *   <li>{@code testTime} - time budget for the writing phase in seconds (default 90)</li>
 * </ul>
 */
public class RollCycleMultiThreadStressTest {
    private static final Logger LOG = LoggerFactory.getLogger(RollCycleMultiThreadStressTest.class);

    /** Target interval between two writes of one writer, in nanoseconds. */
    private static final long SLEEP_PER_WRITE_NANOS = Long.getLong("writeLatency", 10_000L);

    /** Time budget for the writing phase, in seconds. */
    private static final int TEST_TIME = Integer.getInteger("testTime", 90);

    /** Number of identical 32-bit values stored in every message. */
    static final int NUMBER_OF_INTS = 8;

    /** A reader is considered stuck once it lags the write counter by more than this. */
    private static final int MAX_READER_LAG = 1_000_000;

    /** Pause between two progress reports of the main thread, in seconds. */
    private static final long POLL_INTERVAL_SECONDS = 10L;

    /** How long readers may take to catch up once the writing phase is over, in milliseconds. */
    private static final long READER_CATCH_UP_MILLIS = 60_000L;

    /**
     * Tails the queue and verifies that message values are consecutive.
     *
     * <p>{@link #call()} returns {@code null} when the loop ends normally (target reached or
     * thread interrupted) and the caught {@link Throwable} otherwise.
     */
    public static final class Reader implements Callable<Throwable> {
        private final File path;
        private final int expectedNumberOfMessages;

        /** Value of the last fully verified message; written by the reader thread only. */
        private volatile int lastRead = -1;

        Reader(File path, int expectedNumberOfMessages) {
            this.path = path;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /**
         * Spins on the tailer until the message carrying {@code expectedNumberOfMessages - 1}
         * has been verified, or until the thread is interrupted.
         *
         * @return {@code null} if the loop ended without an error, otherwise the failure
         */
        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                    .testBlockSize()
                    .rollCycle(RollCycles.TEST_SECONDLY)
                    .build()) {
                final ExcerptTailer tailer = queue.createTailer();
                // The interrupt check lets ExecutorService.shutdownNow() stop a reader that
                // would otherwise spin forever because the writers never reached the target.
                while (lastRead != expectedNumberOfMessages - 1
                        && !Thread.currentThread().isInterrupted()) {
                    try (DocumentContext dc = tailer.readingDocument()) {
                        if (dc.isPresent()) {
                            int value = -1;
                            for (int i = 0; i < NUMBER_OF_INTS; i++) {
                                value = dc.wire().getValueIn().int32();
                                Assert.assertEquals(lastRead + 1, value);
                            }
                            // Publish only after the whole message has been verified.
                            lastRead = value;
                        }
                    }
                }
                return null;
            } catch (Throwable t) {
                // Printed right away: the main thread only inspects the returned value
                // after all of its own assertions have passed.
                t.printStackTrace();
                return t;
            }
        }
    }

    /**
     * Appends messages whose payload is the next value of a counter shared by all writers.
     *
     * <p>{@link #call()} returns {@code null} on normal completion and the caught
     * {@link Exception} otherwise.
     */
    private static final class Writer implements Callable<Throwable> {
        private final File path;
        private final AtomicInteger wrote;
        private final int expectedNumberOfMessages;

        private Writer(File path, AtomicInteger wrote, int expectedNumberOfMessages) {
            this.path = path;
            this.wrote = wrote;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /**
         * Writes paced messages until the value it obtained from the shared counter is no longer
         * below {@code expectedNumberOfMessages}, or until the thread is interrupted.
         *
         * @return {@code null} if the loop ended without an error, otherwise the failure
         */
        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                    .testBlockSize()
                    .rollCycle(RollCycles.TEST_SECONDLY)
                    .build()) {
                final ExcerptAppender appender = queue.acquireAppender();
                long nextTime = System.nanoTime();
                int value;
                do {
                    try (DocumentContext dc = appender.writingDocument()) {
                        // The counter is advanced while the document is open.
                        // NOTE(review): presumably this relies on the open document excluding
                        // other appenders so values land in the queue in counter order - confirm.
                        value = wrote.getAndIncrement();
                        final ValueOut valueOut = dc.wire().getValueOut();
                        for (int i = 0; i < NUMBER_OF_INTS; i++) {
                            valueOut.int32(value);
                        }
                        // Pacing happens before the document is closed, as in the original.
                        final long delay = nextTime - System.nanoTime();
                        if (delay > 0) {
                            LockSupport.parkNanos(delay);
                        }
                        nextTime = (long) (nextTime + SLEEP_PER_WRITE_NANOS * 0.99d);
                    }
                } while (value < expectedNumberOfMessages
                        && !Thread.currentThread().isInterrupted());
                return null;
            } catch (Exception e) {
                return e;
            }
        }
    }

    /**
     * Runs {@code availableProcessors / 4 + 1} writers and uses the remaining processors for
     * readers, then checks that the writers reach the target count within {@code testTime}
     * seconds and that every reader catches up within a further minute.
     *
     * <p>Failures returned by the tasks themselves are printed, not asserted.
     *
     * @throws Exception declared for compatibility with the test signature
     */
    @Test
    @Ignore("long running")
    public void stress() throws Exception {
        final File path = DirectoryUtils.tempDir("rollCycleStress");
        LOG.warn("using path {} now is {}", path, LocalDateTime.now());

        final int numThreads = Runtime.getRuntime().availableProcessors();
        final int numWriters = numThreads / 4 + 1;
        final int numReaders = numThreads - numWriters;
        final AtomicInteger wrote = new AtomicInteger();
        final int expectedNumberOfMessages = (int) (TEST_TIME * 1e9 / SLEEP_PER_WRITE_NANOS);

        System.out.printf("Running test with %d writers and %d readers, sleep %dns%n",
                numWriters, numReaders, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Writing %d messages with %dns interval%n",
                expectedNumberOfMessages, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Should take ~%dsec%n",
                TimeUnit.NANOSECONDS.toSeconds(expectedNumberOfMessages * SLEEP_PER_WRITE_NANOS)
                        / numWriters);

        final List<Future<Throwable>> results = new ArrayList<>();
        final List<Reader> readers = new ArrayList<>();
        // One thread per task, so every submitted task starts immediately.
        final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < numWriters; i++) {
                results.add(executorService.submit(new Writer(path, wrote, expectedNumberOfMessages)));
            }
            for (int i = 0; i < numReaders; i++) {
                final Reader reader = new Reader(path, expectedNumberOfMessages);
                readers.add(reader);
                results.add(executorService.submit(reader));
            }

            awaitWriters(wrote, expectedNumberOfMessages, readers);
            Assert.assertTrue("Did not write " + expectedNumberOfMessages + " within timeout",
                    wrote.get() >= expectedNumberOfMessages);

            final long giveUpReadingAt = System.currentTimeMillis() + READER_CATCH_UP_MILLIS;
            while (System.currentTimeMillis() < giveUpReadingAt
                    && !areAllReadersComplete(expectedNumberOfMessages, readers)) {
                System.out.printf("Not all readers are complete. Waiting...%n");
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(POLL_INTERVAL_SECONDS));
            }
            Assert.assertTrue("Readers did not catch up",
                    areAllReadersComplete(expectedNumberOfMessages, readers));
        } finally {
            // Also runs when an assertion above fails, so worker threads are not leaked.
            executorService.shutdownNow();
        }

        for (Future<Throwable> result : results) {
            try {
                final Throwable failure = result.get();
                if (failure != null) {
                    failure.printStackTrace();
                }
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Test complete");
    }

    /**
     * Reports progress every {@link #POLL_INTERVAL_SECONDS} seconds until the shared counter
     * reaches the target or the {@link #TEST_TIME} budget is used up.
     *
     * @throws AssertionError if a reader lags the counter by more than {@link #MAX_READER_LAG}
     */
    private void awaitWriters(AtomicInteger wrote, int expectedNumberOfMessages, List<Reader> readers) {
        final long giveUpWritingAt = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_TIME);
        int polls = 0;
        while (System.currentTimeMillis() < giveUpWritingAt && wrote.get() < expectedNumberOfMessages) {
            final String readerPositions = readers.stream()
                    .map(reader -> Integer.toString(reader.lastRead))
                    .collect(Collectors.joining(","));
            System.out.printf("Writer has written %d of %d messages after %ds. Readers at %s. Waiting...%n",
                    wrote.get() + 1, expectedNumberOfMessages, polls * 10, readerPositions);
            for (Reader reader : readers) {
                if (wrote.get() - reader.lastRead > MAX_READER_LAG) {
                    throw new AssertionError("Reader is stuck");
                }
            }
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(POLL_INTERVAL_SECONDS));
            polls++;
        }
    }

    /**
     * Checks whether every reader has verified the message carrying
     * {@code expectedNumberOfMessages - 1}, printing the position of each reader that has not.
     *
     * @param expectedNumberOfMessages number of messages each reader must have verified
     * @param readers                  readers to inspect
     * @return {@code true} if no reader is behind
     */
    private boolean areAllReadersComplete(int expectedNumberOfMessages, List<Reader> readers) {
        boolean allComplete = true;
        int readerNumber = 0;
        for (Reader reader : readers) {
            readerNumber++;
            if (reader.lastRead < expectedNumberOfMessages - 1) {
                allComplete = false;
                System.out.printf("Reader #%d last read: %d%n", readerNumber, reader.lastRead);
            }
        }
        return allComplete;
    }
}
