/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RequestTimeoutIntegrationSuite {
    private TransportServer server;
    private TransportClientFactory clientFactory;
    private StreamManager defaultManager;
    private TransportConf conf;
    private static final int FOREVER = 60000;

    @Before
    public void setUp() throws Exception {
        HashMap configMap = Maps.newHashMap();
        configMap.put("spark.shuffle.io.connectionTimeout", "10s");
        this.conf = new TransportConf("shuffle", (ConfigProvider)new MapConfigProvider((Map)configMap));
        this.defaultManager = new StreamManager(){

            public ManagedBuffer getChunk(long streamId, int chunkIndex) {
                throw new UnsupportedOperationException();
            }
        };
    }

    @After
    public void tearDown() {
        if (this.server != null) {
            this.server.close();
        }
        if (this.clientFactory != null) {
            this.clientFactory.close();
        }
    }

    @Test
    public void timeoutInactiveRequests() throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        int responseSize = 16;
        RpcHandler handler = new RpcHandler(){

            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                try {
                    semaphore.acquire();
                    callback.onSuccess(ByteBuffer.allocate(16));
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
            }

            public StreamManager getStreamManager() {
                return RequestTimeoutIntegrationSuite.this.defaultManager;
            }
        };
        TransportContext context = new TransportContext(this.conf, handler);
        this.server = context.createServer();
        this.clientFactory = context.createClientFactory();
        TransportClient client = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback callback0 = new TestCallback();
        client.sendRpc(ByteBuffer.allocate(0), (RpcResponseCallback)callback0);
        callback0.latch.await();
        Assert.assertEquals((long)16L, (long)callback0.successLength);
        TestCallback callback1 = new TestCallback();
        client.sendRpc(ByteBuffer.allocate(0), (RpcResponseCallback)callback1);
        callback1.latch.await(60L, TimeUnit.SECONDS);
        Assert.assertNotNull((Object)callback1.failure);
        Assert.assertTrue((boolean)(callback1.failure instanceof IOException));
        semaphore.release();
    }

    @Test
    public void timeoutCleanlyClosesClient() throws Exception {
        final Semaphore semaphore = new Semaphore(0);
        int responseSize = 16;
        RpcHandler handler = new RpcHandler(){

            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                try {
                    semaphore.acquire();
                    callback.onSuccess(ByteBuffer.allocate(16));
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
            }

            public StreamManager getStreamManager() {
                return RequestTimeoutIntegrationSuite.this.defaultManager;
            }
        };
        TransportContext context = new TransportContext(this.conf, handler);
        this.server = context.createServer();
        this.clientFactory = context.createClientFactory();
        TransportClient client0 = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback callback0 = new TestCallback();
        client0.sendRpc(ByteBuffer.allocate(0), (RpcResponseCallback)callback0);
        callback0.latch.await();
        Assert.assertTrue((boolean)(callback0.failure instanceof IOException));
        Assert.assertFalse((boolean)client0.isActive());
        semaphore.release(2);
        TransportClient client1 = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback callback1 = new TestCallback();
        client1.sendRpc(ByteBuffer.allocate(0), (RpcResponseCallback)callback1);
        callback1.latch.await();
        Assert.assertEquals((long)16L, (long)callback1.successLength);
        Assert.assertNull((Object)callback1.failure);
    }

    @Test
    public void furtherRequestsDelay() throws Exception {
        final byte[] response = new byte[16];
        final StreamManager manager = new StreamManager(){

            public ManagedBuffer getChunk(long streamId, int chunkIndex) {
                Uninterruptibles.sleepUninterruptibly((long)60000L, (TimeUnit)TimeUnit.MILLISECONDS);
                return new NioManagedBuffer(ByteBuffer.wrap(response));
            }
        };
        RpcHandler handler = new RpcHandler(){

            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                throw new UnsupportedOperationException();
            }

            public StreamManager getStreamManager() {
                return manager;
            }
        };
        TransportContext context = new TransportContext(this.conf, handler);
        this.server = context.createServer();
        this.clientFactory = context.createClientFactory();
        TransportClient client = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback callback0 = new TestCallback();
        client.fetchChunk(0L, 0, (ChunkReceivedCallback)callback0);
        Uninterruptibles.sleepUninterruptibly((long)1200L, (TimeUnit)TimeUnit.MILLISECONDS);
        TestCallback callback1 = new TestCallback();
        client.fetchChunk(0L, 1, (ChunkReceivedCallback)callback1);
        Uninterruptibles.sleepUninterruptibly((long)1200L, (TimeUnit)TimeUnit.MILLISECONDS);
        Assert.assertEquals((long)-1L, (long)callback0.successLength);
        Assert.assertNull((Object)callback0.failure);
        callback0.latch.await(60L, TimeUnit.SECONDS);
        Assert.assertTrue((boolean)(callback0.failure instanceof IOException));
        callback1.latch.await(60L, TimeUnit.SECONDS);
        Assert.assertTrue((boolean)(callback1.failure instanceof IOException));
    }

    static class TestCallback
    implements RpcResponseCallback,
    ChunkReceivedCallback {
        int successLength = -1;
        Throwable failure;
        final CountDownLatch latch = new CountDownLatch(1);

        TestCallback() {
        }

        public void onSuccess(ByteBuffer response) {
            this.successLength = response.remaining();
            this.latch.countDown();
        }

        public void onFailure(Throwable e) {
            this.failure = e;
            this.latch.countDown();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
            try {
                this.successLength = buffer.nioByteBuffer().remaining();
            }
            catch (IOException iOException) {
            }
            finally {
                this.latch.countDown();
            }
        }

        public void onFailure(int chunkIndex, Throwable e) {
            this.failure = e;
            this.latch.countDown();
        }
    }
}

