package org.apache.spark.network.shuffle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.protocol.AbstractFetchShuffleBlocks;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlockChunks;
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.primitives.Ints;
import org.sparkproject.guava.primitives.Longs;

/* loaded from: input_file:org/apache/spark/network/shuffle/OneForOneBlockFetcher.class */
public class OneForOneBlockFetcher {
    private static final Logger logger;
    private static final String SHUFFLE_BLOCK_PREFIX = "shuffle_";
    private static final String SHUFFLE_CHUNK_PREFIX = "shuffleChunk_";
    private static final String SHUFFLE_BLOCK_SPLIT = "shuffle";
    private static final String SHUFFLE_CHUNK_SPLIT = "shuffleChunk";
    private final TransportClient client;
    private final BlockTransferMessage message;
    private final String[] blockIds;
    private final BlockFetchingListener listener;
    private final ChunkReceivedCallback chunkCallback;
    private final TransportConf transportConf;
    private final DownloadFileManager downloadFileManager;
    private StreamHandle streamHandle;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/OneForOneBlockFetcher$BlocksInfo.class */
    public static class BlocksInfo {
        final ArrayList<Integer> ids = new ArrayList<>();
        final ArrayList<String> blockIds = new ArrayList<>();

        BlocksInfo() {
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/OneForOneBlockFetcher$ChunkCallback.class */
    private class ChunkCallback implements ChunkReceivedCallback {
        private ChunkCallback() {
        }

        public void onSuccess(int i, ManagedBuffer managedBuffer) {
            OneForOneBlockFetcher.this.listener.onBlockFetchSuccess(OneForOneBlockFetcher.this.blockIds[i], managedBuffer);
        }

        public void onFailure(int i, Throwable th) {
            OneForOneBlockFetcher.this.failRemainingBlocks((String[]) Arrays.copyOfRange(OneForOneBlockFetcher.this.blockIds, i, OneForOneBlockFetcher.this.blockIds.length), th);
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/OneForOneBlockFetcher$DownloadCallback.class */
    private class DownloadCallback implements StreamCallback {
        private DownloadFileWritableChannel channel;
        private DownloadFile targetFile;
        private int chunkIndex;

        DownloadCallback(int i) throws IOException {
            this.channel = null;
            this.targetFile = null;
            this.targetFile = OneForOneBlockFetcher.this.downloadFileManager.createTempFile(OneForOneBlockFetcher.this.transportConf);
            this.channel = this.targetFile.openForWriting();
            this.chunkIndex = i;
        }

        public void onData(String str, ByteBuffer byteBuffer) throws IOException {
            while (byteBuffer.hasRemaining()) {
                this.channel.write(byteBuffer);
            }
        }

        public void onComplete(String str) throws IOException {
            OneForOneBlockFetcher.this.listener.onBlockFetchSuccess(OneForOneBlockFetcher.this.blockIds[this.chunkIndex], this.channel.closeAndRead());
            if (OneForOneBlockFetcher.this.downloadFileManager.registerTempFileToClean(this.targetFile)) {
                return;
            }
            this.targetFile.delete();
        }

        public void onFailure(String str, Throwable th) throws IOException {
            this.channel.close();
            OneForOneBlockFetcher.this.failRemainingBlocks((String[]) Arrays.copyOfRange(OneForOneBlockFetcher.this.blockIds, this.chunkIndex, OneForOneBlockFetcher.this.blockIds.length), th);
            this.targetFile.delete();
        }
    }

    public OneForOneBlockFetcher(TransportClient transportClient, String str, String str2, String[] strArr, BlockFetchingListener blockFetchingListener, TransportConf transportConf) {
        this(transportClient, str, str2, strArr, blockFetchingListener, transportConf, null);
    }

    public OneForOneBlockFetcher(TransportClient transportClient, String str, String str2, String[] strArr, BlockFetchingListener blockFetchingListener, TransportConf transportConf, DownloadFileManager downloadFileManager) {
        this.streamHandle = null;
        this.client = transportClient;
        this.listener = blockFetchingListener;
        this.chunkCallback = new ChunkCallback();
        this.transportConf = transportConf;
        this.downloadFileManager = downloadFileManager;
        if (strArr.length == 0) {
            throw new IllegalArgumentException("Zero-sized blockIds array");
        }
        if (transportConf.useOldFetchProtocol() || !areShuffleBlocksOrChunks(strArr)) {
            this.blockIds = strArr;
            this.message = new OpenBlocks(str, str2, strArr);
        } else {
            this.blockIds = new String[strArr.length];
            this.message = createFetchShuffleBlocksOrChunksMsg(str, str2, strArr);
        }
    }

    private boolean areShuffleBlocksOrChunks(String[] strArr) {
        if (isAnyBlockNotStartWithShuffleBlockPrefix(strArr)) {
            return isAllBlocksStartWithShuffleChunkPrefix(strArr);
        }
        return true;
    }

    private static boolean isAnyBlockNotStartWithShuffleBlockPrefix(String[] strArr) {
        for (String str : strArr) {
            if (!str.startsWith(SHUFFLE_BLOCK_PREFIX)) {
                return true;
            }
        }
        return false;
    }

    private static boolean isAllBlocksStartWithShuffleChunkPrefix(String[] strArr) {
        for (String str : strArr) {
            if (!str.startsWith(SHUFFLE_CHUNK_PREFIX)) {
                return false;
            }
        }
        return true;
    }

    private AbstractFetchShuffleBlocks createFetchShuffleBlocksOrChunksMsg(String str, String str2, String[] strArr) {
        return strArr[0].startsWith(SHUFFLE_CHUNK_PREFIX) ? createFetchShuffleChunksMsg(str, str2, strArr) : createFetchShuffleBlocksMsg(str, str2, strArr);
    }

    private AbstractFetchShuffleBlocks createFetchShuffleBlocksMsg(String str, String str2, String[] strArr) {
        String[] splitBlockId = splitBlockId(strArr[0]);
        int parseInt = Integer.parseInt(splitBlockId[1]);
        boolean z = splitBlockId.length == 5;
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str3 : strArr) {
            String[] splitBlockId2 = splitBlockId(str3);
            if (Integer.parseInt(splitBlockId2[1]) != parseInt) {
                throw new IllegalArgumentException("Expected shuffleId=" + parseInt + ", got:" + str3);
            }
            BlocksInfo computeIfAbsent = linkedHashMap.computeIfAbsent(Long.valueOf(Long.parseLong(splitBlockId2[2])), l -> {
                return new BlocksInfo();
            });
            computeIfAbsent.blockIds.add(str3);
            computeIfAbsent.ids.add(Integer.valueOf(Integer.parseInt(splitBlockId2[3])));
            if (z) {
                if (!$assertionsDisabled && splitBlockId2.length != 5) {
                    throw new AssertionError();
                }
                computeIfAbsent.ids.add(Integer.valueOf(Integer.parseInt(splitBlockId2[4])));
            }
        }
        return new FetchShuffleBlocks(str, str2, parseInt, Longs.toArray(linkedHashMap.keySet()), getSecondaryIds(linkedHashMap), z);
    }

    private AbstractFetchShuffleBlocks createFetchShuffleChunksMsg(String str, String str2, String[] strArr) {
        String[] splitBlockId = splitBlockId(strArr[0]);
        int parseInt = Integer.parseInt(splitBlockId[1]);
        int parseInt2 = Integer.parseInt(splitBlockId[2]);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str3 : strArr) {
            String[] splitBlockId2 = splitBlockId(str3);
            if (Integer.parseInt(splitBlockId2[1]) != parseInt || Integer.parseInt(splitBlockId2[2]) != parseInt2) {
                throw new IllegalArgumentException(String.format("Expected shuffleId = %s and shuffleMergeId = %s but got %s", Integer.valueOf(parseInt), Integer.valueOf(parseInt2), str3));
            }
            BlocksInfo computeIfAbsent = linkedHashMap.computeIfAbsent(Integer.valueOf(Integer.parseInt(splitBlockId2[3])), num -> {
                return new BlocksInfo();
            });
            computeIfAbsent.blockIds.add(str3);
            computeIfAbsent.ids.add(Integer.valueOf(Integer.parseInt(splitBlockId2[4])));
        }
        return new FetchShuffleBlockChunks(str, str2, parseInt, parseInt2, Ints.toArray(linkedHashMap.keySet()), getSecondaryIds(linkedHashMap));
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [int[], int[][]] */
    private int[][] getSecondaryIds(Map<? extends Number, BlocksInfo> map) {
        ?? r0 = new int[map.size()];
        int i = 0;
        int i2 = 0;
        for (BlocksInfo blocksInfo : map.values()) {
            int i3 = i2;
            i2++;
            r0[i3] = Ints.toArray(blocksInfo.ids);
            Iterator<String> it = blocksInfo.blockIds.iterator();
            while (it.hasNext()) {
                int i4 = i;
                i++;
                this.blockIds[i4] = it.next();
            }
        }
        if ($assertionsDisabled || i == this.blockIds.length) {
            return r0;
        }
        throw new AssertionError();
    }

    private String[] splitBlockId(String str) {
        String[] split = str.split("_");
        if (split.length < 4 || split.length > 5) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + str);
        }
        if (split.length == 4 && !split[0].equals(SHUFFLE_BLOCK_SPLIT)) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + str);
        }
        if (split.length != 5 || split[0].equals(SHUFFLE_BLOCK_SPLIT) || split[0].equals(SHUFFLE_CHUNK_SPLIT)) {
            return split;
        }
        throw new IllegalArgumentException("Unexpected shuffle block id format: " + str);
    }

    public void start() {
        this.client.sendRpc(this.message.toByteBuffer(), new RpcResponseCallback() { // from class: org.apache.spark.network.shuffle.OneForOneBlockFetcher.1
            public void onSuccess(ByteBuffer byteBuffer) {
                try {
                    OneForOneBlockFetcher.this.streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer);
                    OneForOneBlockFetcher.logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", OneForOneBlockFetcher.this.streamHandle);
                    for (int i = 0; i < OneForOneBlockFetcher.this.streamHandle.numChunks; i++) {
                        if (OneForOneBlockFetcher.this.downloadFileManager != null) {
                            OneForOneBlockFetcher.this.client.stream(OneForOneStreamManager.genStreamChunkId(OneForOneBlockFetcher.this.streamHandle.streamId, i), new DownloadCallback(i));
                        } else {
                            OneForOneBlockFetcher.this.client.fetchChunk(OneForOneBlockFetcher.this.streamHandle.streamId, i, OneForOneBlockFetcher.this.chunkCallback);
                        }
                    }
                } catch (Exception e) {
                    OneForOneBlockFetcher.logger.error("Failed while starting block fetches after success", e);
                    OneForOneBlockFetcher.this.failRemainingBlocks(OneForOneBlockFetcher.this.blockIds, e);
                }
            }

            public void onFailure(Throwable th) {
                OneForOneBlockFetcher.logger.error("Failed while starting block fetches", th);
                OneForOneBlockFetcher.this.failRemainingBlocks(OneForOneBlockFetcher.this.blockIds, th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void failRemainingBlocks(String[] strArr, Throwable th) {
        for (String str : strArr) {
            try {
                this.listener.onBlockFetchFailure(str, th);
            } catch (Exception e) {
                logger.error("Error in block fetch failure callback", e);
            }
        }
    }

    static {
        $assertionsDisabled = !OneForOneBlockFetcher.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(OneForOneBlockFetcher.class);
    }
}
