/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.TestShuffleDataContext;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExternalShuffleIntegrationSuite {
    private static final String APP_ID = "app-id";
    private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
    static TestShuffleDataContext dataContext0;
    static ExternalShuffleBlockHandler handler;
    static TransportServer server;
    static TransportConf conf;
    static byte[][] exec0Blocks;
    static byte[][] exec1Blocks;

    @BeforeClass
    public static void beforeAll() throws IOException {
        Random rand = new Random();
        for (byte[] block : exec0Blocks) {
            rand.nextBytes(block);
        }
        for (byte[] block : exec1Blocks) {
            rand.nextBytes(block);
        }
        dataContext0 = new TestShuffleDataContext(2, 5);
        dataContext0.create();
        dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
        conf = new TransportConf("shuffle", (ConfigProvider)MapConfigProvider.EMPTY);
        handler = new ExternalShuffleBlockHandler(conf, null);
        TransportContext transportContext = new TransportContext(conf, (RpcHandler)handler);
        server = transportContext.createServer();
    }

    @AfterClass
    public static void afterAll() {
        dataContext0.cleanup();
        server.close();
    }

    @After
    public void afterEach() {
        handler.applicationRemoved(APP_ID, false);
    }

    private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
        return this.fetchBlocks(execId, blockIds, conf, server.getPort());
    }

    private FetchResult fetchBlocks(String execId, String[] blockIds, TransportConf clientConf, int port) throws Exception {
        final FetchResult res = new FetchResult();
        res.successBlocks = Collections.synchronizedSet(new HashSet());
        res.failedBlocks = Collections.synchronizedSet(new HashSet());
        res.buffers = Collections.synchronizedList(new LinkedList());
        final Semaphore requestsRemaining = new Semaphore(0);
        ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false);
        client.init(APP_ID);
        client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, new BlockFetchingListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
                1 var3_3 = this;
                synchronized (var3_3) {
                    if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
                        data.retain();
                        res.successBlocks.add(blockId);
                        res.buffers.add(data);
                        requestsRemaining.release();
                    }
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onBlockFetchFailure(String blockId, Throwable exception) {
                1 var3_3 = this;
                synchronized (var3_3) {
                    if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
                        res.failedBlocks.add(blockId);
                        requestsRemaining.release();
                    }
                }
            }
        }, null);
        if (!requestsRemaining.tryAcquire(blockIds.length, 5L, TimeUnit.SECONDS)) {
            Assert.fail((String)"Timeout getting response from the server");
        }
        client.close();
        return res;
    }

    @Test
    public void testFetchOneSort() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult exec0Fetch = this.fetchBlocks("exec-0", new String[]{"shuffle_0_0_0"});
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_0_0_0"}), exec0Fetch.successBlocks);
        Assert.assertTrue((boolean)exec0Fetch.failedBlocks.isEmpty());
        ExternalShuffleIntegrationSuite.assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(new byte[][]{exec0Blocks[0]}));
        exec0Fetch.releaseBuffers();
    }

    @Test
    public void testFetchThreeSort() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult exec0Fetch = this.fetchBlocks("exec-0", new String[]{"shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"});
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"}), exec0Fetch.successBlocks);
        Assert.assertTrue((boolean)exec0Fetch.failedBlocks.isEmpty());
        ExternalShuffleIntegrationSuite.assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks));
        exec0Fetch.releaseBuffers();
    }

    @Test(expected=RuntimeException.class)
    public void testRegisterInvalidExecutor() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
    }

    @Test
    public void testFetchWrongBlockId() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult execFetch = this.fetchBlocks("exec-1", new String[]{"rdd_1_0_0"});
        Assert.assertTrue((boolean)execFetch.successBlocks.isEmpty());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"rdd_1_0_0"}), execFetch.failedBlocks);
    }

    @Test
    public void testFetchNonexistent() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult execFetch = this.fetchBlocks("exec-0", new String[]{"shuffle_2_0_0"});
        Assert.assertTrue((boolean)execFetch.successBlocks.isEmpty());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_2_0_0"}), execFetch.failedBlocks);
    }

    @Test
    public void testFetchWrongExecutor() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult execFetch = this.fetchBlocks("exec-0", new String[]{"shuffle_0_0_0", "shuffle_1_0_0"});
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_0_0_0"}), execFetch.successBlocks);
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_1_0_0"}), execFetch.failedBlocks);
    }

    @Test
    public void testFetchUnregisteredExecutor() throws Exception {
        ExternalShuffleIntegrationSuite.registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult execFetch = this.fetchBlocks("exec-2", new String[]{"shuffle_0_0_0", "shuffle_1_0_0"});
        Assert.assertTrue((boolean)execFetch.successBlocks.isEmpty());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_0_0_0", "shuffle_1_0_0"}), execFetch.failedBlocks);
    }

    @Test
    public void testFetchNoServer() throws Exception {
        TransportConf clientConf = new TransportConf("shuffle", (ConfigProvider)new MapConfigProvider((Map)ImmutableMap.of((Object)"spark.shuffle.io.maxRetries", (Object)"0")));
        ExternalShuffleIntegrationSuite.registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult execFetch = this.fetchBlocks("exec-0", new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, clientConf, 1);
        Assert.assertTrue((boolean)execFetch.successBlocks.isEmpty());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}), execFetch.failedBlocks);
    }

    private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
        ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
        client.init(APP_ID);
        client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), executorId, executorInfo);
    }

    private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1) throws Exception {
        Assert.assertEquals((long)list0.size(), (long)list1.size());
        for (int i = 0; i < list0.size(); ++i) {
            ExternalShuffleIntegrationSuite.assertBuffersEqual(list0.get(i), (ManagedBuffer)new NioManagedBuffer(ByteBuffer.wrap(list1.get(i))));
        }
    }

    private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
        ByteBuffer nio0 = buffer0.nioByteBuffer();
        ByteBuffer nio1 = buffer1.nioByteBuffer();
        int len = nio0.remaining();
        Assert.assertEquals((long)nio0.remaining(), (long)nio1.remaining());
        for (int i = 0; i < len; ++i) {
            Assert.assertEquals((long)nio0.get(), (long)nio1.get());
        }
    }

    static {
        exec0Blocks = new byte[][]{new byte[123], new byte[12345], new byte[1234567]};
        exec1Blocks = new byte[][]{new byte[321], new byte[54321]};
    }

    static class FetchResult {
        public Set<String> successBlocks;
        public Set<String> failedBlocks;
        public List<ManagedBuffer> buffers;

        FetchResult() {
        }

        public void releaseBuffers() {
            for (ManagedBuffer buffer : this.buffers) {
                buffer.release();
            }
        }
    }
}

