package org.apache.spark.network.shuffle;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.class */
public class ExternalShuffleIntegrationSuite {
    private static final String APP_ID = "app-id";
    private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
    static TestShuffleDataContext dataContext0;
    static ExternalShuffleBlockHandler handler;
    static TransportServer server;
    static TransportConf conf;
    static byte[][] exec0Blocks = {new byte[123], new byte[12345], new byte[1234567]};
    static byte[][] exec1Blocks = {new byte[321], new byte[54321]};

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite$FetchResult.class */
    public static class FetchResult {
        public Set<String> successBlocks;
        public Set<String> failedBlocks;
        public List<ManagedBuffer> buffers;

        FetchResult() {
        }

        public void releaseBuffers() {
            Iterator<ManagedBuffer> it = this.buffers.iterator();
            while (it.hasNext()) {
                it.next().release();
            }
        }
    }

    @BeforeClass
    public static void beforeAll() throws IOException {
        Random random = new Random();
        for (byte[] bArr : exec0Blocks) {
            random.nextBytes(bArr);
        }
        for (byte[] bArr2 : exec1Blocks) {
            random.nextBytes(bArr2);
        }
        dataContext0 = new TestShuffleDataContext(2, 5);
        dataContext0.create();
        dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
        conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
        handler = new ExternalShuffleBlockHandler(conf, (File) null);
        server = new TransportContext(conf, handler).createServer();
    }

    @AfterClass
    public static void afterAll() {
        dataContext0.cleanup();
        server.close();
    }

    @After
    public void afterEach() {
        handler.applicationRemoved(APP_ID, false);
    }

    private FetchResult fetchBlocks(String str, String[] strArr) throws Exception {
        return fetchBlocks(str, strArr, server.getPort());
    }

    private FetchResult fetchBlocks(String str, String[] strArr, int i) throws Exception {
        final FetchResult fetchResult = new FetchResult();
        fetchResult.successBlocks = Collections.synchronizedSet(new HashSet());
        fetchResult.failedBlocks = Collections.synchronizedSet(new HashSet());
        fetchResult.buffers = Collections.synchronizedList(new LinkedList());
        final Semaphore semaphore = new Semaphore(0);
        ExternalShuffleClient externalShuffleClient = new ExternalShuffleClient(conf, (SecretKeyHolder) null, false, false);
        externalShuffleClient.init(APP_ID);
        externalShuffleClient.fetchBlocks(TestUtils.getLocalHost(), i, str, strArr, new BlockFetchingListener() { // from class: org.apache.spark.network.shuffle.ExternalShuffleIntegrationSuite.1
            public void onBlockFetchSuccess(String str2, ManagedBuffer managedBuffer) {
                synchronized (this) {
                    if (!fetchResult.successBlocks.contains(str2) && !fetchResult.failedBlocks.contains(str2)) {
                        managedBuffer.retain();
                        fetchResult.successBlocks.add(str2);
                        fetchResult.buffers.add(managedBuffer);
                        semaphore.release();
                    }
                }
            }

            public void onBlockFetchFailure(String str2, Throwable th) {
                synchronized (this) {
                    if (!fetchResult.successBlocks.contains(str2) && !fetchResult.failedBlocks.contains(str2)) {
                        fetchResult.failedBlocks.add(str2);
                        semaphore.release();
                    }
                }
            }
        });
        if (!semaphore.tryAcquire(strArr.length, 5L, TimeUnit.SECONDS)) {
            Assert.fail("Timeout getting response from the server");
        }
        externalShuffleClient.close();
        return fetchResult;
    }

    /* JADX WARN: Type inference failed for: r2v6, types: [byte[], java.lang.Object[]] */
    @Test
    public void testFetchOneSort() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_0_0_0"});
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0"}), fetchBlocks.successBlocks);
        Assert.assertTrue(fetchBlocks.failedBlocks.isEmpty());
        assertBufferListsEqual(fetchBlocks.buffers, Lists.newArrayList((Object[]) new byte[]{exec0Blocks[0]}));
        fetchBlocks.releaseBuffers();
    }

    @Test
    public void testFetchThreeSort() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"});
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"}), fetchBlocks.successBlocks);
        Assert.assertTrue(fetchBlocks.failedBlocks.isEmpty());
        assertBufferListsEqual(fetchBlocks.buffers, Lists.newArrayList(exec0Blocks));
        fetchBlocks.releaseBuffers();
    }

    @Test(expected = RuntimeException.class)
    public void testRegisterInvalidExecutor() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
    }

    @Test
    public void testFetchWrongBlockId() throws Exception {
        registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-1", new String[]{"rdd_1_0_0"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"rdd_1_0_0"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchNonexistent() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_2_0_0"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_2_0_0"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchWrongExecutor() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_0_0_0", "shuffle_1_0_0"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0", "shuffle_1_0_0"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchUnregisteredExecutor() throws Exception {
        registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
        FetchResult fetchBlocks = fetchBlocks("exec-2", new String[]{"shuffle_0_0_0", "shuffle_1_0_0"});
        Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_0_0_0", "shuffle_1_0_0"}), fetchBlocks.failedBlocks);
    }

    @Test
    public void testFetchNoServer() throws Exception {
        System.setProperty("spark.shuffle.io.maxRetries", "0");
        try {
            registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
            FetchResult fetchBlocks = fetchBlocks("exec-0", new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1);
            Assert.assertTrue(fetchBlocks.successBlocks.isEmpty());
            Assert.assertEquals(Sets.newHashSet(new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}), fetchBlocks.failedBlocks);
            System.clearProperty("spark.shuffle.io.maxRetries");
        } catch (Throwable th) {
            System.clearProperty("spark.shuffle.io.maxRetries");
            throw th;
        }
    }

    private void registerExecutor(String str, ExecutorShuffleInfo executorShuffleInfo) throws IOException, InterruptedException {
        ExternalShuffleClient externalShuffleClient = new ExternalShuffleClient(conf, (SecretKeyHolder) null, false, false);
        externalShuffleClient.init(APP_ID);
        externalShuffleClient.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), str, executorShuffleInfo);
    }

    private void assertBufferListsEqual(List<ManagedBuffer> list, List<byte[]> list2) throws Exception {
        Assert.assertEquals(list.size(), list2.size());
        for (int i = 0; i < list.size(); i++) {
            assertBuffersEqual(list.get(i), new NioManagedBuffer(ByteBuffer.wrap(list2.get(i))));
        }
    }

    private void assertBuffersEqual(ManagedBuffer managedBuffer, ManagedBuffer managedBuffer2) throws Exception {
        ByteBuffer nioByteBuffer = managedBuffer.nioByteBuffer();
        ByteBuffer nioByteBuffer2 = managedBuffer2.nioByteBuffer();
        int remaining = nioByteBuffer.remaining();
        Assert.assertEquals(nioByteBuffer.remaining(), nioByteBuffer2.remaining());
        for (int i = 0; i < remaining; i++) {
            Assert.assertEquals(nioByteBuffer.get(), nioByteBuffer2.get());
        }
    }
}
