/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class RpcIntegrationSuite {
    static TransportServer server;
    static TransportClientFactory clientFactory;
    static RpcHandler rpcHandler;

    @BeforeClass
    public static void setUp() throws Exception {
        TransportConf conf = new TransportConf((ConfigProvider)new SystemPropertyConfigProvider());
        rpcHandler = new RpcHandler(){

            public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
                String msg = new String(message, Charsets.UTF_8);
                String[] parts = msg.split("/");
                if (parts[0].equals("hello")) {
                    callback.onSuccess(("Hello, " + parts[1] + "!").getBytes(Charsets.UTF_8));
                } else if (parts[0].equals("return error")) {
                    callback.onFailure((Throwable)new RuntimeException("Returned: " + parts[1]));
                } else if (parts[0].equals("throw error")) {
                    throw new RuntimeException("Thrown: " + parts[1]);
                }
            }

            public StreamManager getStreamManager() {
                return new OneForOneStreamManager();
            }
        };
        TransportContext context = new TransportContext(conf, rpcHandler);
        server = context.createServer();
        clientFactory = context.createClientFactory();
    }

    @AfterClass
    public static void tearDown() {
        server.close();
        clientFactory.close();
    }

    private RpcResult sendRPC(String ... commands) throws Exception {
        TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        final Semaphore sem = new Semaphore(0);
        final RpcResult res = new RpcResult();
        res.successMessages = Collections.synchronizedSet(new HashSet());
        res.errorMessages = Collections.synchronizedSet(new HashSet());
        RpcResponseCallback callback = new RpcResponseCallback(){

            public void onSuccess(byte[] message) {
                res.successMessages.add(new String(message, Charsets.UTF_8));
                sem.release();
            }

            public void onFailure(Throwable e) {
                res.errorMessages.add(e.getMessage());
                sem.release();
            }
        };
        for (String command : commands) {
            client.sendRpc(command.getBytes(Charsets.UTF_8), callback);
        }
        if (!sem.tryAcquire(commands.length, 5L, TimeUnit.SECONDS)) {
            Assert.fail((String)"Timeout getting response from the server");
        }
        client.close();
        return res;
    }

    @Test
    public void singleRPC() throws Exception {
        RpcResult res = this.sendRPC("hello/Aaron");
        Assert.assertEquals(res.successMessages, (Object)Sets.newHashSet((Object[])new String[]{"Hello, Aaron!"}));
        Assert.assertTrue((boolean)res.errorMessages.isEmpty());
    }

    @Test
    public void doubleRPC() throws Exception {
        RpcResult res = this.sendRPC("hello/Aaron", "hello/Reynold");
        Assert.assertEquals(res.successMessages, (Object)Sets.newHashSet((Object[])new String[]{"Hello, Aaron!", "Hello, Reynold!"}));
        Assert.assertTrue((boolean)res.errorMessages.isEmpty());
    }

    @Test
    public void returnErrorRPC() throws Exception {
        RpcResult res = this.sendRPC("return error/OK");
        Assert.assertTrue((boolean)res.successMessages.isEmpty());
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Returned: OK"}));
    }

    @Test
    public void throwErrorRPC() throws Exception {
        RpcResult res = this.sendRPC("throw error/uh-oh");
        Assert.assertTrue((boolean)res.successMessages.isEmpty());
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Thrown: uh-oh"}));
    }

    @Test
    public void doubleTrouble() throws Exception {
        RpcResult res = this.sendRPC("return error/OK", "throw error/uh-oh");
        Assert.assertTrue((boolean)res.successMessages.isEmpty());
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Returned: OK", "Thrown: uh-oh"}));
    }

    @Test
    public void sendSuccessAndFailure() throws Exception {
        RpcResult res = this.sendRPC("hello/Bob", "throw error/the", "hello/Builder", "return error/!");
        Assert.assertEquals(res.successMessages, (Object)Sets.newHashSet((Object[])new String[]{"Hello, Bob!", "Hello, Builder!"}));
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Thrown: the", "Returned: !"}));
    }

    private void assertErrorsContain(Set<String> errors, Set<String> contains) {
        Assert.assertEquals((long)contains.size(), (long)errors.size());
        HashSet remainingErrors = Sets.newHashSet(errors);
        for (String contain : contains) {
            Iterator it = remainingErrors.iterator();
            boolean foundMatch = false;
            while (it.hasNext()) {
                if (!((String)it.next()).contains(contain)) continue;
                it.remove();
                foundMatch = true;
                break;
            }
            Assert.assertTrue((String)("Could not find error containing " + contain + "; errors: " + errors), (boolean)foundMatch);
        }
        Assert.assertTrue((boolean)remainingErrors.isEmpty());
    }

    class RpcResult {
        public Set<String> successMessages;
        public Set<String> errorMessages;

        RpcResult() {
        }
    }
}

