package org.glassfish.grizzly;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import junit.framework.Assert;
import org.apache.camel.management.DefaultManagementAgent;
import org.glassfish.grizzly.asyncqueue.AsyncQueueWriter;
import org.glassfish.grizzly.asyncqueue.AsyncWriteQueueRecord;
import org.glassfish.grizzly.asyncqueue.TaskQueue;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.filterchain.TransportFilter;
import org.glassfish.grizzly.impl.SafeFutureImpl;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.nio.NIOConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.streams.StreamReader;
import org.glassfish.grizzly.utils.EchoFilter;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/* loaded from: input_file:WEB-INF/lib/grizzly-framework-2.0.1-b2-tests.jar:org/glassfish/grizzly/AsyncWriteQueueTest.class */
public class AsyncWriteQueueTest extends GrizzlyTestCase {
    public static final int PORT = 7781;
    private static final Logger LOGGER = Grizzly.logger(AsyncWriteQueueTest.class);

    /* loaded from: input_file:WEB-INF/lib/grizzly-framework-2.0.1-b2-tests.jar:org/glassfish/grizzly/AsyncWriteQueueTest$WriteQueueFreeSpaceMonitor.class */
    private static class WriteQueueFreeSpaceMonitor extends TaskQueue.QueueMonitor {
        private final TaskQueue writeQueue;
        private final int freeSpaceAvailable;
        private final int maxSpace;
        private final Transport transport;
        private final Thread current = Thread.currentThread();

        public WriteQueueFreeSpaceMonitor(Connection connection, int i) {
            this.freeSpaceAvailable = i;
            this.writeQueue = ((NIOConnection) connection).getAsyncWriteQueue();
            this.transport = connection.getTransport();
            this.maxSpace = ((TCPNIOTransport) this.transport).getAsyncQueueIO().getWriter().getMaxPendingBytesPerConnection();
        }

        @Override // org.glassfish.grizzly.asyncqueue.TaskQueue.QueueMonitor
        public boolean shouldNotify() {
            return this.maxSpace - this.writeQueue.spaceInBytes() > this.freeSpaceAvailable;
        }

        @Override // org.glassfish.grizzly.asyncqueue.TaskQueue.QueueMonitor
        public void onNotify() {
            try {
                this.transport.pause();
            } catch (IOException e) {
                e.printStackTrace();
            }
            this.current.interrupt();
        }
    }

    /* JADX WARN: Finally extract failed */
    public void testAsyncWriteQueueEcho() throws Exception {
        final Connection connection = null;
        final AtomicInteger atomicInteger = new AtomicInteger();
        FilterChainBuilder stateless = FilterChainBuilder.stateless();
        stateless.add(new TransportFilter());
        stateless.add(new EchoFilter() { // from class: org.glassfish.grizzly.AsyncWriteQueueTest.1
            @Override // org.glassfish.grizzly.utils.EchoFilter, org.glassfish.grizzly.filterchain.BaseFilter, org.glassfish.grizzly.filterchain.Filter
            public NextAction handleRead(FilterChainContext filterChainContext) throws IOException {
                atomicInteger.addAndGet(((Buffer) filterChainContext.getMessage()).remaining());
                return super.handleRead(filterChainContext);
            }
        });
        TCPNIOTransport build = TCPNIOTransportBuilder.newInstance().build();
        build.setProcessor(stateless.build());
        try {
            build.bind(PORT);
            build.start();
            connection = build.connect(DefaultManagementAgent.DEFAULT_HOST, PORT).get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);
            StreamReader streamReader = ((StandaloneProcessor) connection.getProcessor()).getStreamReader(connection);
            final AsyncQueueWriter<SocketAddress> writer = build.getAsyncQueueIO().getWriter();
            final MemoryManager memoryManager = build.getMemoryManager();
            final CountDownLatch countDownLatch = new CountDownLatch(127);
            final EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>> emptyCompletionHandler = new EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>>() { // from class: org.glassfish.grizzly.AsyncWriteQueueTest.2
                @Override // org.glassfish.grizzly.EmptyCompletionHandler, org.glassfish.grizzly.CompletionHandler
                public void completed(WriteResult<Buffer, SocketAddress> writeResult) {
                    countDownLatch.countDown();
                }
            };
            ArrayList arrayList = new ArrayList(128);
            for (int i = 0; i < 127; i++) {
                final byte b = (byte) i;
                arrayList.add(new Callable<Object>() { // from class: org.glassfish.grizzly.AsyncWriteQueueTest.3
                    @Override // java.util.concurrent.Callable
                    public Object call() throws Exception {
                        byte[] bArr = new byte[128000];
                        Arrays.fill(bArr, b);
                        try {
                            writer.write(connection, Buffers.wrap(memoryManager, bArr), emptyCompletionHandler);
                            return null;
                        } catch (IOException e) {
                            Assert.assertTrue("IOException occurred", false);
                            return null;
                        }
                    }
                });
            }
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(12);
            try {
                newFixedThreadPool.invokeAll(arrayList);
                if (!countDownLatch.await(10L, TimeUnit.SECONDS)) {
                    assertTrue("Send timeout!", false);
                }
                newFixedThreadPool.shutdown();
                Integer num = null;
                try {
                    num = streamReader.notifyAvailable(16256000).get(10L, TimeUnit.SECONDS);
                } catch (Exception e) {
                    LOGGER.log(Level.WARNING, "read error", (Throwable) e);
                }
                assertTrue("Read timeout. Server received: " + atomicInteger.get() + " bytes. Expected: 16256000", num != null);
                byte[] bArr = new byte[16256000];
                streamReader.readByteArray(bArr);
                boolean[] zArr = new boolean[127];
                int i2 = 0;
                for (int i3 = 0; i3 < 127; i3++) {
                    byte b2 = bArr[i2];
                    assertEquals("Pattern: " + ((int) b2) + " was already used", false, zArr[b2]);
                    zArr[b2] = true;
                    for (int i4 = 0; i4 < 128000; i4++) {
                        int i5 = i2;
                        i2++;
                        byte b3 = bArr[i5];
                        assertEquals("Echo doesn't match. Offset: " + i2 + " pattern: " + ((int) b2) + " found: " + ((int) b3), b2, b3);
                    }
                }
                if (connection != null) {
                    connection.close();
                }
                build.stop();
            } catch (Throwable th) {
                newFixedThreadPool.shutdown();
                throw th;
            }
        } catch (Throwable th2) {
            if (connection != null) {
                connection.close();
            }
            build.stop();
            throw th2;
        }
    }

    public void testAsyncWriteQueueLimits() throws Exception {
        final Connection connection = null;
        FilterChainBuilder stateless = FilterChainBuilder.stateless();
        stateless.add(new TransportFilter());
        TCPNIOTransport build = TCPNIOTransportBuilder.newInstance().build();
        build.setProcessor(stateless.build());
        try {
            build.bind(PORT);
            build.start();
            connection = build.connect(DefaultManagementAgent.DEFAULT_HOST, PORT).get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);
            AsyncQueueWriter<SocketAddress> writer = build.getAsyncQueueIO().getWriter();
            writer.setMaxPendingBytesPerConnection(512001);
            MemoryManager memoryManager = build.getMemoryManager();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            build.pause();
            int i = 0;
            int i2 = 0;
            final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
            final AtomicInteger atomicInteger = new AtomicInteger();
            while (!atomicBoolean.get() && i2 < 4) {
                final int i3 = i2;
                byte[] bArr = new byte[256000];
                Arrays.fill(bArr, (byte) i);
                Buffer wrap = Buffers.wrap(memoryManager, bArr);
                try {
                    if (writer.canWrite(connection, wrap.remaining())) {
                        writer.write(connection, wrap);
                    } else if (i2 == 3) {
                        writer.write(connection, wrap, new EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>>() { // from class: org.glassfish.grizzly.AsyncWriteQueueTest.4
                            @Override // org.glassfish.grizzly.EmptyCompletionHandler, org.glassfish.grizzly.CompletionHandler
                            public void failed(Throwable th) {
                                if (th instanceof PendingWriteQueueLimitExceededException) {
                                    atomicBoolean2.compareAndSet(false, true);
                                    atomicInteger.set(i3);
                                    Assert.assertTrue(((NIOConnection) connection).getAsyncWriteQueue().spaceInBytes() + 256000 > 512001);
                                }
                                atomicBoolean.compareAndSet(false, true);
                            }
                        });
                    } else {
                        i2++;
                        build.resume();
                        Thread.sleep(DefaultMessageListenerContainer.DEFAULT_RECOVERY_INTERVAL);
                        build.pause();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    assertTrue("IOException occurred: " + e.toString(), false);
                }
                i++;
            }
            if (!atomicBoolean2.get()) {
                fail("No Exception thrown when queue write limit exceeded");
            }
            if (atomicInteger.get() != 3) {
                fail("Expected exception to occur at 4th iteration of test loop.  Occurred at: " + atomicInteger);
            }
            if (connection != null) {
                connection.close();
            }
            if (build.isPaused()) {
                build.resume();
            }
            build.stop();
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            if (build.isPaused()) {
                build.resume();
            }
            build.stop();
            throw th;
        }
    }

    public void testQueueNotification() throws Exception {
        Connection connection = null;
        FilterChainBuilder stateless = FilterChainBuilder.stateless();
        stateless.add(new TransportFilter());
        TCPNIOTransport build = TCPNIOTransportBuilder.newInstance().build();
        build.setProcessor(stateless.build());
        try {
            build.bind(PORT);
            build.start();
            connection = build.connect(DefaultManagementAgent.DEFAULT_HOST, PORT).get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);
            AsyncQueueWriter<SocketAddress> writer = build.getAsyncQueueIO().getWriter();
            writer.setMaxPendingBytesPerConnection(2560000);
            System.out.println("Max Space: " + writer.getMaxPendingBytesPerConnection());
            MemoryManager memoryManager = build.getMemoryManager();
            build.pause();
            TaskQueue<AsyncWriteQueueRecord> asyncWriteQueue = ((NIOConnection) connection).getAsyncWriteQueue();
            do {
                byte[] bArr = new byte[256000];
                Arrays.fill(bArr, (byte) 1);
                Buffer wrap = Buffers.wrap(memoryManager, bArr);
                try {
                    if (writer.canWrite(connection, 256000)) {
                        writer.write(connection, wrap);
                    }
                } catch (IOException e) {
                    assertTrue("IOException occurred: " + e.toString(), false);
                }
            } while (writer.canWrite(connection, 256000));
            asyncWriteQueue.addQueueMonitor(new WriteQueueFreeSpaceMonitor(connection, 1024000));
            build.resume();
            long j = 0;
            try {
                System.out.println("Waiting for free space notification.  Max wait time is 10000ms.");
                j = System.currentTimeMillis();
                Thread.sleep(10000L);
                fail("Thread not interrupted within 10 seconds.");
            } catch (InterruptedException e2) {
                System.out.println("Notified in " + (System.currentTimeMillis() - j) + "ms");
            }
            assertTrue(writer.getMaxPendingBytesPerConnection() - asyncWriteQueue.spaceInBytes() >= 1024000);
            System.out.println("Queue Space: " + asyncWriteQueue.spaceInBytes());
            if (connection != null) {
                connection.close();
            }
            if (build.isPaused()) {
                build.resume();
            }
            build.stop();
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            if (build.isPaused()) {
                build.resume();
            }
            build.stop();
            throw th;
        }
    }

    public void testAsyncWriteQueueReenterants() throws Exception {
        final Connection connection = null;
        final AtomicInteger atomicInteger = new AtomicInteger();
        FilterChainBuilder stateless = FilterChainBuilder.stateless();
        stateless.add(new TransportFilter());
        stateless.add(new BaseFilter() { // from class: org.glassfish.grizzly.AsyncWriteQueueTest.5
            @Override // org.glassfish.grizzly.filterchain.BaseFilter, org.glassfish.grizzly.filterchain.Filter
            public NextAction handleRead(FilterChainContext filterChainContext) throws IOException {
                atomicInteger.addAndGet(((Buffer) filterChainContext.getMessage()).remaining());
                return filterChainContext.getStopAction();
            }
        });
        TCPNIOTransport build = TCPNIOTransportBuilder.newInstance().build();
        build.setProcessor(stateless.build());
        try {
            build.bind(PORT);
            build.start();
            connection = build.connect(DefaultManagementAgent.DEFAULT_HOST, PORT).get(10L, TimeUnit.SECONDS);
            assertTrue(connection != null);
            connection.configureStandalone(true);
            final AsyncQueueWriter<SocketAddress> writer = build.getAsyncQueueIO().getWriter();
            final MemoryManager memoryManager = build.getMemoryManager();
            writer.setMaxWriteReenterants(10);
            final AtomicInteger atomicInteger2 = new AtomicInteger();
            final SafeFutureImpl create = SafeFutureImpl.create();
            final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            Thread currentThread = Thread.currentThread();
            concurrentLinkedQueue.add(currentThread);
            writer.write(connection, Buffers.wrap(memoryManager, "" + ((char) (65 + atomicInteger2.getAndIncrement()))), new EmptyCompletionHandler<WriteResult<Buffer, SocketAddress>>() { // from class: org.glassfish.grizzly.AsyncWriteQueueTest.6
                @Override // org.glassfish.grizzly.EmptyCompletionHandler, org.glassfish.grizzly.CompletionHandler
                public void completed(WriteResult<Buffer, SocketAddress> writeResult) {
                    int incrementAndGet = atomicInteger2.incrementAndGet();
                    if (incrementAndGet > 11) {
                        create.result(Boolean.TRUE);
                        return;
                    }
                    concurrentLinkedQueue.add(Thread.currentThread());
                    try {
                        writer.write(connection, Buffers.wrap(memoryManager, "" + ((char) (65 + incrementAndGet))), this);
                    } catch (IOException e) {
                        create.failure(e);
                    }
                }
            });
            assertTrue(((Boolean) create.get(10L, TimeUnit.SECONDS)).booleanValue());
            while (!concurrentLinkedQueue.isEmpty()) {
                Thread thread = (Thread) concurrentLinkedQueue.poll();
                if (concurrentLinkedQueue.isEmpty()) {
                    assertNotSame(currentThread, thread);
                } else {
                    assertSame(currentThread, thread);
                }
            }
            if (connection != null) {
                connection.close();
            }
            build.stop();
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            build.stop();
            throw th;
        }
    }
}
