/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.TempShuffleFileManager;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OneForOneBlockFetcher {
    private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockFetcher.class);
    private final TransportClient client;
    private final OpenBlocks openMessage;
    private final String[] blockIds;
    private final BlockFetchingListener listener;
    private final ChunkReceivedCallback chunkCallback;
    private final TransportConf transportConf;
    private final TempShuffleFileManager tempShuffleFileManager;
    private StreamHandle streamHandle = null;

    public OneForOneBlockFetcher(TransportClient client, String appId, String execId, String[] blockIds, BlockFetchingListener listener, TransportConf transportConf) {
        this(client, appId, execId, blockIds, listener, transportConf, null);
    }

    public OneForOneBlockFetcher(TransportClient client, String appId, String execId, String[] blockIds, BlockFetchingListener listener, TransportConf transportConf, TempShuffleFileManager tempShuffleFileManager) {
        this.client = client;
        this.openMessage = new OpenBlocks(appId, execId, blockIds);
        this.blockIds = blockIds;
        this.listener = listener;
        this.chunkCallback = new ChunkCallback();
        this.transportConf = transportConf;
        this.tempShuffleFileManager = tempShuffleFileManager;
    }

    public void start() {
        if (this.blockIds.length == 0) {
            throw new IllegalArgumentException("Zero-sized blockIds array");
        }
        this.client.sendRpc(this.openMessage.toByteBuffer(), new RpcResponseCallback(){

            public void onSuccess(ByteBuffer response) {
                try {
                    OneForOneBlockFetcher.this.streamHandle = (StreamHandle)BlockTransferMessage.Decoder.fromByteBuffer(response);
                    logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", (Object)OneForOneBlockFetcher.this.streamHandle);
                    for (int i = 0; i < ((OneForOneBlockFetcher)OneForOneBlockFetcher.this).streamHandle.numChunks; ++i) {
                        if (OneForOneBlockFetcher.this.tempShuffleFileManager != null) {
                            OneForOneBlockFetcher.this.client.stream(OneForOneStreamManager.genStreamChunkId((long)((OneForOneBlockFetcher)OneForOneBlockFetcher.this).streamHandle.streamId, (int)i), (StreamCallback)new DownloadCallback(i));
                            continue;
                        }
                        OneForOneBlockFetcher.this.client.fetchChunk(((OneForOneBlockFetcher)OneForOneBlockFetcher.this).streamHandle.streamId, i, OneForOneBlockFetcher.this.chunkCallback);
                    }
                }
                catch (Exception e) {
                    logger.error("Failed while starting block fetches after success", (Throwable)e);
                    OneForOneBlockFetcher.this.failRemainingBlocks(OneForOneBlockFetcher.this.blockIds, e);
                }
            }

            public void onFailure(Throwable e) {
                logger.error("Failed while starting block fetches", e);
                OneForOneBlockFetcher.this.failRemainingBlocks(OneForOneBlockFetcher.this.blockIds, e);
            }
        });
    }

    private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
        for (String blockId : failedBlockIds) {
            try {
                this.listener.onBlockFetchFailure(blockId, e);
            }
            catch (Exception e2) {
                logger.error("Error in block fetch failure callback", (Throwable)e2);
            }
        }
    }

    private class DownloadCallback
    implements StreamCallback {
        private WritableByteChannel channel = null;
        private File targetFile = null;
        private int chunkIndex;

        DownloadCallback(int chunkIndex) throws IOException {
            this.targetFile = OneForOneBlockFetcher.this.tempShuffleFileManager.createTempShuffleFile();
            this.channel = Channels.newChannel(new FileOutputStream(this.targetFile));
            this.chunkIndex = chunkIndex;
        }

        public void onData(String streamId, ByteBuffer buf) throws IOException {
            this.channel.write(buf);
        }

        public void onComplete(String streamId) throws IOException {
            this.channel.close();
            FileSegmentManagedBuffer buffer = new FileSegmentManagedBuffer(OneForOneBlockFetcher.this.transportConf, this.targetFile, 0L, this.targetFile.length());
            OneForOneBlockFetcher.this.listener.onBlockFetchSuccess(OneForOneBlockFetcher.this.blockIds[this.chunkIndex], (ManagedBuffer)buffer);
            if (!OneForOneBlockFetcher.this.tempShuffleFileManager.registerTempShuffleFileToClean(this.targetFile)) {
                this.targetFile.delete();
            }
        }

        public void onFailure(String streamId, Throwable cause) throws IOException {
            this.channel.close();
            String[] remainingBlockIds = Arrays.copyOfRange(OneForOneBlockFetcher.this.blockIds, this.chunkIndex, OneForOneBlockFetcher.this.blockIds.length);
            OneForOneBlockFetcher.this.failRemainingBlocks(remainingBlockIds, cause);
            this.targetFile.delete();
        }
    }

    private class ChunkCallback
    implements ChunkReceivedCallback {
        private ChunkCallback() {
        }

        public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
            OneForOneBlockFetcher.this.listener.onBlockFetchSuccess(OneForOneBlockFetcher.this.blockIds[chunkIndex], buffer);
        }

        public void onFailure(int chunkIndex, Throwable e) {
            String[] remainingBlockIds = Arrays.copyOfRange(OneForOneBlockFetcher.this.blockIds, chunkIndex, OneForOneBlockFetcher.this.blockIds.length);
            OneForOneBlockFetcher.this.failRemainingBlocks(remainingBlockIds, e);
        }
    }
}

