package org.apache.spark.network;

import com.google.common.io.Files;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/StreamSuite.class */
public class StreamSuite {
    private static final String[] STREAMS = StreamTestHelper.STREAMS;
    private static StreamTestHelper testData;
    private static TransportContext context;
    private static TransportServer server;
    private static TransportClientFactory clientFactory;

    /* loaded from: input_file:org/apache/spark/network/StreamSuite$StreamTask.class */
    private static class StreamTask implements Runnable {
        private final TransportClient client;
        private final String streamId;
        private final long timeoutMs;
        private Throwable error;

        StreamTask(TransportClient transportClient, String str, long j) {
            this.client = transportClient;
            this.streamId = str;
            this.timeoutMs = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            OutputStream outputStream;
            ByteBuffer duplicate;
            ByteBuffer byteBuffer = null;
            OutputStream outputStream2 = null;
            File file = null;
            try {
                try {
                    ByteArrayOutputStream byteArrayOutputStream = null;
                    String str = this.streamId;
                    boolean z = -1;
                    switch (str.hashCode()) {
                        case -2005425317:
                            if (str.equals("largeBuffer")) {
                                z = false;
                                break;
                            }
                            break;
                        case -1353714649:
                            if (str.equals("smallBuffer")) {
                                z = true;
                                break;
                            }
                            break;
                        case 3143036:
                            if (str.equals("file")) {
                                z = 2;
                                break;
                            }
                            break;
                        case 696981485:
                            if (str.equals("emptyBuffer")) {
                                z = 3;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            byteArrayOutputStream = new ByteArrayOutputStream();
                            outputStream = byteArrayOutputStream;
                            byteBuffer = StreamSuite.testData.largeBuffer;
                            break;
                        case true:
                            byteArrayOutputStream = new ByteArrayOutputStream();
                            outputStream = byteArrayOutputStream;
                            byteBuffer = StreamSuite.testData.smallBuffer;
                            break;
                        case true:
                            file = File.createTempFile("data", ".tmp", StreamSuite.testData.tempDir);
                            outputStream = new FileOutputStream(file);
                            break;
                        case true:
                            byteArrayOutputStream = new ByteArrayOutputStream();
                            outputStream = byteArrayOutputStream;
                            byteBuffer = StreamSuite.testData.emptyBuffer;
                            break;
                        default:
                            throw new IllegalArgumentException(this.streamId);
                    }
                    TestCallback testCallback = new TestCallback(outputStream);
                    this.client.stream(this.streamId, testCallback);
                    testCallback.waitForCompletion(this.timeoutMs);
                    if (byteBuffer == null) {
                        Assert.assertTrue("File stream did not match.", Files.equal(StreamSuite.testData.testFile, file));
                    } else {
                        synchronized (byteBuffer) {
                            duplicate = byteBuffer.duplicate();
                        }
                        byte[] byteArray = byteArrayOutputStream.toByteArray();
                        byte[] bArr = new byte[duplicate.remaining()];
                        duplicate.get(bArr);
                        Assert.assertEquals(bArr.length, byteArray.length);
                        Assert.assertArrayEquals("buffers don't match", bArr, byteArray);
                    }
                    if (outputStream != null) {
                        try {
                            outputStream.close();
                        } catch (Exception e) {
                        }
                    }
                    if (file != null) {
                        file.delete();
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        try {
                            outputStream2.close();
                        } catch (Exception e2) {
                        }
                    }
                    if (0 != 0) {
                        file.delete();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                this.error = th2;
                if (0 != 0) {
                    try {
                        outputStream2.close();
                    } catch (Exception e3) {
                    }
                }
                if (0 != 0) {
                    file.delete();
                }
            }
        }

        public void check() throws Throwable {
            if (this.error != null) {
                throw this.error;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/network/StreamSuite$TestCallback.class */
    public static class TestCallback implements StreamCallback {
        private final OutputStream out;
        public volatile boolean completed = false;
        public volatile Throwable error;

        /* JADX INFO: Access modifiers changed from: package-private */
        public TestCallback(OutputStream outputStream) {
            this.out = outputStream;
        }

        public void onData(String str, ByteBuffer byteBuffer) throws IOException {
            byte[] bArr = new byte[byteBuffer.remaining()];
            byteBuffer.get(bArr);
            this.out.write(bArr);
        }

        public void onComplete(String str) throws IOException {
            this.out.close();
            synchronized (this) {
                this.completed = true;
                notifyAll();
            }
        }

        public void onFailure(String str, Throwable th) {
            this.error = th;
            synchronized (this) {
                this.completed = true;
                notifyAll();
            }
        }

        void waitForCompletion(long j) {
            long currentTimeMillis = System.currentTimeMillis();
            long j2 = currentTimeMillis + j;
            synchronized (this) {
                while (!this.completed && currentTimeMillis < j2) {
                    try {
                        wait(j2 - currentTimeMillis);
                        currentTimeMillis = System.currentTimeMillis();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            Assert.assertTrue("Timed out waiting for stream.", this.completed);
            Assert.assertNull(this.error);
        }
    }

    private static ByteBuffer createBuffer(int i) {
        ByteBuffer allocate = ByteBuffer.allocate(i);
        for (int i2 = 0; i2 < i; i2++) {
            allocate.put((byte) i2);
        }
        allocate.flip();
        return allocate;
    }

    @BeforeClass
    public static void setUp() throws Exception {
        testData = new StreamTestHelper();
        final TransportConf transportConf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
        final StreamManager streamManager = new StreamManager() { // from class: org.apache.spark.network.StreamSuite.1
            public ManagedBuffer getChunk(long j, int i) {
                throw new UnsupportedOperationException();
            }

            public ManagedBuffer openStream(String str) {
                return StreamSuite.testData.openStream(transportConf, str);
            }
        };
        context = new TransportContext(transportConf, new RpcHandler() { // from class: org.apache.spark.network.StreamSuite.2
            public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
                throw new UnsupportedOperationException();
            }

            public StreamManager getStreamManager() {
                return streamManager;
            }
        });
        server = context.createServer();
        clientFactory = context.createClientFactory();
    }

    @AfterClass
    public static void tearDown() {
        server.close();
        clientFactory.close();
        testData.cleanup();
        context.close();
    }

    @Test
    public void testZeroLengthStream() throws Throwable {
        TransportClient createClient = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        Throwable th = null;
        try {
            StreamTask streamTask = new StreamTask(createClient, "emptyBuffer", TimeUnit.SECONDS.toMillis(5L));
            streamTask.run();
            streamTask.check();
            if (createClient != null) {
                if (0 == 0) {
                    createClient.close();
                    return;
                }
                try {
                    createClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (0 != 0) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSingleStream() throws Throwable {
        TransportClient createClient = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        Throwable th = null;
        try {
            StreamTask streamTask = new StreamTask(createClient, "largeBuffer", TimeUnit.SECONDS.toMillis(5L));
            streamTask.run();
            streamTask.check();
            if (createClient != null) {
                if (0 == 0) {
                    createClient.close();
                    return;
                }
                try {
                    createClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (0 != 0) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testMultipleStreams() throws Throwable {
        TransportClient createClient = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        Throwable th = null;
        try {
            for (int i = 0; i < 20; i++) {
                StreamTask streamTask = new StreamTask(createClient, STREAMS[i % STREAMS.length], TimeUnit.SECONDS.toMillis(5L));
                streamTask.run();
                streamTask.check();
            }
            if (createClient != null) {
                if (0 == 0) {
                    createClient.close();
                    return;
                }
                try {
                    createClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (0 != 0) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testConcurrentStreams() throws Throwable {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(20);
        try {
            TransportClient createClient = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
            Throwable th = null;
            try {
                try {
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < 20; i++) {
                        StreamTask streamTask = new StreamTask(createClient, STREAMS[i % STREAMS.length], TimeUnit.SECONDS.toMillis(20L));
                        arrayList.add(streamTask);
                        newFixedThreadPool.submit(streamTask);
                    }
                    newFixedThreadPool.shutdown();
                    Assert.assertTrue("Timed out waiting for tasks.", newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS));
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((StreamTask) it.next()).check();
                    }
                    if (createClient != null) {
                        if (0 != 0) {
                            try {
                                createClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createClient.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }
}
