/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import com.google.common.io.Files;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class StreamSuite {
    private static final String[] STREAMS = new String[]{"largeBuffer", "smallBuffer", "emptyBuffer", "file"};
    private static TransportServer server;
    private static TransportClientFactory clientFactory;
    private static File testFile;
    private static File tempDir;
    private static ByteBuffer emptyBuffer;
    private static ByteBuffer smallBuffer;
    private static ByteBuffer largeBuffer;

    private static ByteBuffer createBuffer(int bufSize) {
        ByteBuffer buf = ByteBuffer.allocate(bufSize);
        for (int i = 0; i < bufSize; ++i) {
            buf.put((byte)i);
        }
        buf.flip();
        return buf;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        tempDir = Files.createTempDir();
        emptyBuffer = StreamSuite.createBuffer(0);
        smallBuffer = StreamSuite.createBuffer(100);
        largeBuffer = StreamSuite.createBuffer(100000);
        testFile = File.createTempFile("stream-test-file", "txt", tempDir);
        try (FileOutputStream fp = new FileOutputStream(testFile);){
            Random rnd = new Random();
            for (int i = 0; i < 512; ++i) {
                byte[] fileContent = new byte[1024];
                rnd.nextBytes(fileContent);
                fp.write(fileContent);
            }
        }
        final TransportConf conf = new TransportConf("shuffle", (ConfigProvider)new SystemPropertyConfigProvider());
        final StreamManager streamManager = new StreamManager(){

            public ManagedBuffer getChunk(long streamId, int chunkIndex) {
                throw new UnsupportedOperationException();
            }

            public ManagedBuffer openStream(String streamId) {
                switch (streamId) {
                    case "largeBuffer": {
                        return new NioManagedBuffer(largeBuffer);
                    }
                    case "smallBuffer": {
                        return new NioManagedBuffer(smallBuffer);
                    }
                    case "emptyBuffer": {
                        return new NioManagedBuffer(emptyBuffer);
                    }
                    case "file": {
                        return new FileSegmentManagedBuffer(conf, testFile, 0L, testFile.length());
                    }
                }
                throw new IllegalArgumentException("Invalid stream: " + streamId);
            }
        };
        RpcHandler handler = new RpcHandler(){

            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                throw new UnsupportedOperationException();
            }

            public StreamManager getStreamManager() {
                return streamManager;
            }
        };
        TransportContext context = new TransportContext(conf, handler);
        server = context.createServer();
        clientFactory = context.createClientFactory();
    }

    @AfterClass
    public static void tearDown() {
        server.close();
        clientFactory.close();
        if (tempDir != null) {
            for (File f : tempDir.listFiles()) {
                f.delete();
            }
            tempDir.delete();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testZeroLengthStream() throws Throwable {
        try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());){
            StreamTask task = new StreamTask(client, "emptyBuffer", TimeUnit.SECONDS.toMillis(5L));
            task.run();
            task.check();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSingleStream() throws Throwable {
        try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());){
            StreamTask task = new StreamTask(client, "largeBuffer", TimeUnit.SECONDS.toMillis(5L));
            task.run();
            task.check();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleStreams() throws Throwable {
        try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());){
            for (int i = 0; i < 20; ++i) {
                StreamTask task = new StreamTask(client, STREAMS[i % STREAMS.length], TimeUnit.SECONDS.toMillis(5L));
                task.run();
                task.check();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentStreams() throws Throwable {
        ExecutorService executor = Executors.newFixedThreadPool(20);
        TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        try {
            ArrayList<StreamTask> tasks = new ArrayList<StreamTask>();
            for (int i = 0; i < 20; ++i) {
                StreamTask task = new StreamTask(client, STREAMS[i % STREAMS.length], TimeUnit.SECONDS.toMillis(20L));
                tasks.add(task);
                executor.submit(task);
            }
            executor.shutdown();
            Assert.assertTrue((String)"Timed out waiting for tasks.", (boolean)executor.awaitTermination(30L, TimeUnit.SECONDS));
            for (StreamTask task : tasks) {
                task.check();
            }
        }
        finally {
            executor.shutdownNow();
            client.close();
        }
    }

    private static class TestCallback
    implements StreamCallback {
        private final OutputStream out;
        public volatile boolean completed;
        public volatile Throwable error;

        TestCallback(OutputStream out) {
            this.out = out;
            this.completed = false;
        }

        public void onData(String streamId, ByteBuffer buf) throws IOException {
            byte[] tmp = new byte[buf.remaining()];
            buf.get(tmp);
            this.out.write(tmp);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onComplete(String streamId) throws IOException {
            this.out.close();
            TestCallback testCallback = this;
            synchronized (testCallback) {
                this.completed = true;
                this.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onFailure(String streamId, Throwable cause) {
            this.error = cause;
            TestCallback testCallback = this;
            synchronized (testCallback) {
                this.completed = true;
                this.notifyAll();
            }
        }
    }

    private static class StreamTask
    implements Runnable {
        private final TransportClient client;
        private final String streamId;
        private final long timeoutMs;
        private Throwable error;

        StreamTask(TransportClient client, String streamId, long timeoutMs) {
            this.client = client;
            this.streamId = streamId;
            this.timeoutMs = timeoutMs;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block29: {
                ByteBuffer srcBuffer = null;
                OutputStream out = null;
                File outFile = null;
                try {
                    ByteBuffer base;
                    ByteArrayOutputStream baos = null;
                    switch (this.streamId) {
                        case "largeBuffer": {
                            baos = new ByteArrayOutputStream();
                            out = baos;
                            srcBuffer = largeBuffer;
                            break;
                        }
                        case "smallBuffer": {
                            baos = new ByteArrayOutputStream();
                            out = baos;
                            srcBuffer = smallBuffer;
                            break;
                        }
                        case "file": {
                            outFile = File.createTempFile("data", ".tmp", tempDir);
                            out = new FileOutputStream(outFile);
                            break;
                        }
                        case "emptyBuffer": {
                            baos = new ByteArrayOutputStream();
                            out = baos;
                            srcBuffer = emptyBuffer;
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException(this.streamId);
                        }
                    }
                    TestCallback callback = new TestCallback(out);
                    this.client.stream(this.streamId, (StreamCallback)callback);
                    this.waitForCompletion(callback);
                    if (srcBuffer == null) {
                        Assert.assertTrue((String)"File stream did not match.", (boolean)Files.equal((File)testFile, (File)outFile));
                        break block29;
                    }
                    ByteBuffer byteBuffer = srcBuffer;
                    synchronized (byteBuffer) {
                        base = srcBuffer.duplicate();
                    }
                    byte[] result = baos.toByteArray();
                    byte[] expected = new byte[base.remaining()];
                    base.get(expected);
                    Assert.assertEquals((long)expected.length, (long)result.length);
                    Assert.assertTrue((String)"buffers don't match", (boolean)Arrays.equals(expected, result));
                }
                catch (Throwable t) {
                    this.error = t;
                }
                finally {
                    if (out != null) {
                        try {
                            out.close();
                        }
                        catch (Exception e) {}
                    }
                    if (outFile != null) {
                        outFile.delete();
                    }
                }
            }
        }

        public void check() throws Throwable {
            if (this.error != null) {
                throw this.error;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void waitForCompletion(TestCallback callback) throws Exception {
            long now = System.currentTimeMillis();
            long deadline = now + this.timeoutMs;
            TestCallback testCallback = callback;
            synchronized (testCallback) {
                while (!callback.completed && now < deadline) {
                    callback.wait(deadline - now);
                    now = System.currentTimeMillis();
                }
            }
            Assert.assertTrue((String)"Timed out waiting for stream.", (boolean)callback.completed);
            Assert.assertNull((Object)callback.error);
        }
    }
}

