/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NettyManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.OneForOneBlockFetcher;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link OneForOneBlockFetcher}, driven through a mocked
 * {@link TransportClient}.
 *
 * <p>Each test describes the blocks to serve as an insertion-ordered map from block id to
 * buffer. A {@code null} buffer means the chunk request for that block is answered with a
 * failure instead of data (see {@link #fetchBlocks(LinkedHashMap)}).
 */
public class OneForOneBlockFetcherSuite {

    /** A single (empty) block is delivered to the listener as a success. */
    @Test
    public void testFetchOne() {
        LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
        blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));

        BlockFetchingListener listener = fetchBlocks(blocks);

        Mockito.verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
    }

    /** Three blocks (NIO- and Netty-backed) are each reported as a success exactly once. */
    @Test
    public void testFetchThree() {
        LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
        blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
        blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
        blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));

        BlockFetchingListener listener = fetchBlocks(blocks);

        for (int i = 0; i < 3; ++i) {
            Mockito.verify(listener, Mockito.times(1))
                .onBlockFetchSuccess("b" + i, blocks.get("b" + i));
        }
    }

    /** One successful chunk followed by two failing chunks. */
    @Test
    public void testFailure() {
        LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
        blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
        blocks.put("b1", null);
        blocks.put("b2", null);

        BlockFetchingListener listener = fetchBlocks(blocks);

        Mockito.verify(listener, Mockito.times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
        Mockito.verify(listener, Mockito.times(1))
            .onBlockFetchFailure(Matchers.eq("b1"), (Throwable) Matchers.any());
        // b2 is expected to fail twice. NOTE(review): presumably once when b1's failure is
        // propagated to the remaining blocks and once for b2's own chunk failure -- confirm
        // against OneForOneBlockFetcher.
        Mockito.verify(listener, Mockito.times(2))
            .onBlockFetchFailure(Matchers.eq("b2"), (Throwable) Matchers.any());
    }

    /** A failing chunk in the middle, with successful chunks before and after it. */
    @Test
    public void testFailureAndSuccess() {
        LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
        blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
        blocks.put("b1", null);
        blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[21])));

        BlockFetchingListener listener = fetchBlocks(blocks);

        // The same block (b2) is expected to be reported both as a failure and as a success.
        Mockito.verify(listener, Mockito.times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
        Mockito.verify(listener, Mockito.times(1))
            .onBlockFetchFailure(Matchers.eq("b1"), (Throwable) Matchers.any());
        Mockito.verify(listener, Mockito.times(1)).onBlockFetchSuccess("b2", blocks.get("b2"));
        Mockito.verify(listener, Mockito.times(1))
            .onBlockFetchFailure(Matchers.eq("b2"), (Throwable) Matchers.any());
    }

    /** Fetching with no block ids must be rejected with an {@link IllegalArgumentException}. */
    @Test
    public void testEmptyBlockFetch() {
        try {
            // Explicit type witness: an argument-position call is not inferred from the
            // parameter type by pre-Java-8 compilers.
            fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap());
            Assert.fail();
        } catch (IllegalArgumentException e) {
            Assert.assertEquals("Zero-sized blockIds array", e.getMessage());
        }
    }

    /**
     * Starts a {@link OneForOneBlockFetcher} for the given blocks against a mocked
     * {@link TransportClient} and returns the mocked listener for verification.
     *
     * <p>The mocked client answers the RPC with a {@link StreamHandle} for stream 123 holding
     * one chunk per block, and answers the i-th chunk request with the i-th value of
     * {@code blocks}.
     *
     * @param blocks block id to buffer, in the order the chunks are served; a {@code null}
     *               value makes the corresponding chunk request fail
     * @return the mocked listener that was handed to the fetcher
     */
    private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) {
        TransportClient client = Mockito.mock(TransportClient.class);
        BlockFetchingListener listener = Mockito.mock(BlockFetchingListener.class);
        final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
        OneForOneBlockFetcher fetcher =
            new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);

        // Respond to the RPC with a stream handle (streamId 123, one chunk per block), then
        // check that the message the fetcher sent was the expected OpenBlocks request.
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(
                    (ByteBuffer) invocationOnMock.getArguments()[0]);
                RpcResponseCallback callback =
                    (RpcResponseCallback) invocationOnMock.getArguments()[1];
                callback.onSuccess(new StreamHandle(123L, blocks.size()).toByteBuffer());
                Assert.assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
                return null;
            }
        }).when(client).sendRpc(
            Matchers.any(ByteBuffer.class), Matchers.any(RpcResponseCallback.class));

        // Respond to each chunk request with the next buffer from the blocks map, checking that
        // chunks are requested for stream 123 in increasing index order starting at 0.
        final AtomicInteger expectedChunkIndex = new AtomicInteger(0);
        final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                try {
                    long streamId = (Long) invocation.getArguments()[0];
                    int myChunkIndex = (Integer) invocation.getArguments()[1];
                    Assert.assertEquals(123L, streamId);
                    Assert.assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);

                    ChunkReceivedCallback callback =
                        (ChunkReceivedCallback) invocation.getArguments()[2];
                    ManagedBuffer result = blockIterator.next();
                    if (result != null) {
                        callback.onSuccess(myChunkIndex, result);
                    } else {
                        callback.onFailure(
                            myChunkIndex, new RuntimeException("Failed " + myChunkIndex));
                    }
                } catch (Exception e) {
                    // Fail the test with the original exception attached as the cause, so the
                    // test report shows what went wrong instead of only a stderr stack trace.
                    AssertionError failure = new AssertionError("Unexpected failure");
                    failure.initCause(e);
                    throw failure;
                }
                return null;
            }
        }).when(client).fetchChunk(
            Matchers.anyLong(), Matchers.anyInt(), (ChunkReceivedCallback) Matchers.any());

        fetcher.start();
        return listener;
    }
}

