/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import com.google.common.primitives.Ints;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import org.apache.spark.network.TestManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.MessageDecoder;
import org.apache.spark.network.protocol.MessageEncoder;
import org.apache.spark.network.protocol.OneWayMessage;
import org.apache.spark.network.protocol.RpcFailure;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;
import org.junit.Assert;
import org.junit.Test;

public class ProtocolSuite {
    private void testServerToClient(Message msg) {
        EmbeddedChannel serverChannel = new EmbeddedChannel(new ChannelHandler[]{new FileRegionEncoder(), MessageEncoder.INSTANCE});
        serverChannel.writeOutbound(new Object[]{msg});
        EmbeddedChannel clientChannel = new EmbeddedChannel(new ChannelHandler[]{NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE});
        while (!serverChannel.outboundMessages().isEmpty()) {
            clientChannel.writeInbound(new Object[]{serverChannel.readOutbound()});
        }
        Assert.assertEquals((long)1L, (long)clientChannel.inboundMessages().size());
        Assert.assertEquals((Object)msg, (Object)clientChannel.readInbound());
    }

    private void testClientToServer(Message msg) {
        EmbeddedChannel clientChannel = new EmbeddedChannel(new ChannelHandler[]{new FileRegionEncoder(), MessageEncoder.INSTANCE});
        clientChannel.writeOutbound(new Object[]{msg});
        EmbeddedChannel serverChannel = new EmbeddedChannel(new ChannelHandler[]{NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE});
        while (!clientChannel.outboundMessages().isEmpty()) {
            serverChannel.writeInbound(new Object[]{clientChannel.readOutbound()});
        }
        Assert.assertEquals((long)1L, (long)serverChannel.inboundMessages().size());
        Assert.assertEquals((Object)msg, (Object)serverChannel.readInbound());
    }

    @Test
    public void requests() {
        this.testClientToServer((Message)new ChunkFetchRequest(new StreamChunkId(1L, 2)));
        this.testClientToServer((Message)new RpcRequest(12345L, (ManagedBuffer)new TestManagedBuffer(0)));
        this.testClientToServer((Message)new RpcRequest(12345L, (ManagedBuffer)new TestManagedBuffer(10)));
        this.testClientToServer((Message)new StreamRequest("abcde"));
        this.testClientToServer((Message)new OneWayMessage((ManagedBuffer)new TestManagedBuffer(10)));
    }

    @Test
    public void responses() {
        this.testServerToClient((Message)new ChunkFetchSuccess(new StreamChunkId(1L, 2), (ManagedBuffer)new TestManagedBuffer(10)));
        this.testServerToClient((Message)new ChunkFetchSuccess(new StreamChunkId(1L, 2), (ManagedBuffer)new TestManagedBuffer(0)));
        this.testServerToClient((Message)new ChunkFetchFailure(new StreamChunkId(1L, 2), "this is an error"));
        this.testServerToClient((Message)new ChunkFetchFailure(new StreamChunkId(1L, 2), ""));
        this.testServerToClient((Message)new RpcResponse(12345L, (ManagedBuffer)new TestManagedBuffer(0)));
        this.testServerToClient((Message)new RpcResponse(12345L, (ManagedBuffer)new TestManagedBuffer(100)));
        this.testServerToClient((Message)new RpcFailure(0L, "this is an error"));
        this.testServerToClient((Message)new RpcFailure(0L, ""));
        this.testServerToClient((Message)new StreamResponse("anId", 12345L, (ManagedBuffer)new TestManagedBuffer(0)));
        this.testServerToClient((Message)new StreamFailure("anId", "this is an error"));
    }

    private static class FileRegionEncoder
    extends MessageToMessageEncoder<FileRegion> {
        private FileRegionEncoder() {
        }

        public void encode(ChannelHandlerContext ctx, FileRegion in, List<Object> out) throws Exception {
            ByteArrayWritableChannel channel = new ByteArrayWritableChannel(Ints.checkedCast((long)in.count()));
            while (in.transfered() < in.count()) {
                in.transferTo((WritableByteChannel)channel, in.transfered());
            }
            out.add(Unpooled.wrappedBuffer((byte[])channel.getData()));
        }
    }
}

