/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.base.Throwables;

public class ChunkFetchRequestHandler
extends SimpleChannelInboundHandler<ChunkFetchRequest> {
    private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
    private final TransportClient client;
    private final StreamManager streamManager;
    private final long maxChunksBeingTransferred;

    public ChunkFetchRequestHandler(TransportClient client, StreamManager streamManager, Long maxChunksBeingTransferred) {
        this.client = client;
        this.streamManager = streamManager;
        this.maxChunksBeingTransferred = maxChunksBeingTransferred;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()), cause);
        ctx.close();
    }

    protected void channelRead0(ChannelHandlerContext ctx, ChunkFetchRequest msg) throws Exception {
        ManagedBuffer buf;
        long chunksBeingTransferred;
        Channel channel = ctx.channel();
        if (logger.isTraceEnabled()) {
            logger.trace("Received req from {} to fetch block {}", (Object)NettyUtils.getRemoteAddress(channel), (Object)msg.streamChunkId);
        }
        if ((chunksBeingTransferred = this.streamManager.chunksBeingTransferred()) >= this.maxChunksBeingTransferred) {
            logger.warn("The number of chunks being transferred {} is above {}, close the connection.", (Object)chunksBeingTransferred, (Object)this.maxChunksBeingTransferred);
            channel.close();
            return;
        }
        try {
            this.streamManager.checkAuthorization(this.client, msg.streamChunkId.streamId);
            buf = this.streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
            if (buf == null) {
                throw new IllegalStateException("Chunk was not found");
            }
        }
        catch (Exception e) {
            logger.error(String.format("Error opening block %s for request from %s", msg.streamChunkId, NettyUtils.getRemoteAddress(channel)), (Throwable)e);
            this.respond(channel, new ChunkFetchFailure(msg.streamChunkId, Throwables.getStackTraceAsString(e)));
            return;
        }
        this.streamManager.chunkBeingSent(msg.streamChunkId.streamId);
        this.respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener((GenericFutureListener)((ChannelFutureListener)future -> this.streamManager.chunkSent(msg.streamChunkId.streamId)));
    }

    private ChannelFuture respond(Channel channel, Encodable result) throws InterruptedException {
        SocketAddress remoteAddress = channel.remoteAddress();
        return channel.writeAndFlush((Object)result).await().addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            if (future.isSuccess()) {
                logger.trace("Sent result {} to client {}", (Object)result, (Object)remoteAddress);
            } else {
                logger.error(String.format("Error sending result %s to %s; closing connection", result, remoteAddress), future.cause());
                channel.close();
            }
        }));
    }
}

