package org.apache.tez.runtime.library.shuffle.common;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;

/* loaded from: input_file:org/apache/tez/runtime/library/shuffle/common/Fetcher.class */
public class Fetcher implements Callable<FetchResult> {
    private static final Log LOG;
    private static final int UNIT_CONNECT_TIMEOUT = 60000;
    private static final AtomicInteger fetcherIdGen;
    private CompressionCodec codec;
    private int connectionTimeout;
    private int readTimeout;
    private boolean ifileReadAhead;
    private int ifileReadAheadLength;
    private final SecretKey shuffleSecret;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private static boolean sslShuffle;
    private static SSLFactory sslFactory;
    private static boolean sslFactoryInited;
    private final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    private String host;
    private int port;
    private int partition;
    private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
    private LinkedHashSet<InputAttemptIdentifier> remaining;
    private URL url;
    private String encHash;
    private String msgToEncode;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/tez/runtime/library/shuffle/common/Fetcher$FetcherBuilder.class */
    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, FetchedInputAllocator fetchedInputAllocator, ApplicationId applicationId, SecretKey secretKey, Configuration configuration) {
            this.fetcher = new Fetcher(fetcherCallback, fetchedInputAllocator, applicationId, secretKey, configuration);
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec compressionCodec) {
            this.fetcher.codec = compressionCodec;
            return this;
        }

        public FetcherBuilder setConnectionParameters(int i, int i2) {
            this.fetcher.connectionTimeout = i;
            this.fetcher.readTimeout = i2;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean z, int i) {
            this.fetcher.ifileReadAhead = z;
            this.fetcher.ifileReadAheadLength = i;
            return this;
        }

        public FetcherBuilder assignWork(String str, int i, int i2, List<InputAttemptIdentifier> list) {
            this.fetcher.host = str;
            this.fetcher.port = i;
            this.fetcher.partition = i2;
            this.fetcher.srcAttempts = list;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState(this.workAssigned, "Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }

    private Fetcher(FetcherCallback fetcherCallback, FetchedInputAllocator fetchedInputAllocator, ApplicationId applicationId, SecretKey secretKey, Configuration configuration) {
        this.ifileReadAhead = true;
        this.ifileReadAheadLength = 4194304;
        this.fetcherCallback = fetcherCallback;
        this.inputManager = fetchedInputAllocator;
        this.shuffleSecret = secretKey;
        this.appId = applicationId;
        this.pathToAttemptMap = new HashMap();
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public FetchResult call() throws Exception {
        InputAttemptIdentifier[] inputAttemptIdentifierArr;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.srcAttempts);
        }
        for (InputAttemptIdentifier inputAttemptIdentifier : this.srcAttempts) {
            this.pathToAttemptMap.put(inputAttemptIdentifier.getPathComponent(), inputAttemptIdentifier);
        }
        this.remaining = new LinkedHashSet<>(this.srcAttempts);
        try {
            HttpURLConnection connectToShuffleHandler = connectToShuffleHandler(this.host, this.port, this.partition, this.srcAttempts);
            try {
                DataInputStream dataInputStream = new DataInputStream(connectToShuffleHandler.getInputStream());
                validateConnectionResponse(connectToShuffleHandler, this.url, this.msgToEncode, this.encHash);
                InputAttemptIdentifier[] inputAttemptIdentifierArr2 = null;
                while (true) {
                    inputAttemptIdentifierArr = inputAttemptIdentifierArr2;
                    if (this.remaining.isEmpty() || inputAttemptIdentifierArr != null) {
                        break;
                    }
                    inputAttemptIdentifierArr2 = fetchInputs(dataInputStream);
                }
                if (inputAttemptIdentifierArr != null && inputAttemptIdentifierArr.length > 0) {
                    LOG.warn("copyInputs failed for tasks " + Arrays.toString(inputAttemptIdentifierArr));
                    for (InputAttemptIdentifier inputAttemptIdentifier2 : inputAttemptIdentifierArr) {
                        this.fetcherCallback.fetchFailed(this.host, inputAttemptIdentifier2, false);
                    }
                }
                IOUtils.cleanup(LOG, new Closeable[]{dataInputStream});
                if (inputAttemptIdentifierArr != null || this.remaining.isEmpty()) {
                    return new FetchResult(this.host, this.port, this.partition, this.remaining);
                }
                throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
            } catch (IOException e) {
                InputAttemptIdentifier inputAttemptIdentifier3 = this.srcAttempts.get(0);
                LOG.warn("Fetch Failure from host while connecting: " + this.host + ", attempt: " + inputAttemptIdentifier3 + " Informing ShuffleManager: ", e);
                this.fetcherCallback.fetchFailed(this.host, inputAttemptIdentifier3, false);
                return new FetchResult(this.host, this.port, this.partition, this.remaining);
            }
        } catch (IOException e2) {
            Iterator<InputAttemptIdentifier> it = this.remaining.iterator();
            while (it.hasNext()) {
                this.fetcherCallback.fetchFailed(this.host, it.next(), true);
            }
            return new FetchResult(this.host, this.port, this.partition, this.remaining);
        }
    }

    private InputAttemptIdentifier[] fetchInputs(DataInputStream dataInputStream) {
        FetchedInput fetchedInput = null;
        InputAttemptIdentifier inputAttemptIdentifier = null;
        long j = -1;
        long j2 = -1;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                ShuffleHeader shuffleHeader = new ShuffleHeader();
                shuffleHeader.readFields(dataInputStream);
                String mapId = shuffleHeader.getMapId();
                inputAttemptIdentifier = this.pathToAttemptMap.get(mapId);
                j2 = shuffleHeader.getCompressedLength();
                j = shuffleHeader.getUncompressedLength();
                if (!verifySanity(j2, j, shuffleHeader.getPartition(), inputAttemptIdentifier, mapId)) {
                    if (inputAttemptIdentifier == null) {
                        LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
                        inputAttemptIdentifier = getNextRemainingAttempt();
                    }
                    if ($assertionsDisabled || inputAttemptIdentifier != null) {
                        return new InputAttemptIdentifier[]{inputAttemptIdentifier};
                    }
                    throw new AssertionError();
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("header: " + inputAttemptIdentifier + ", len: " + j2 + ", decomp len: " + j);
                }
                FetchedInput allocate = this.inputManager.allocate(j, j2, inputAttemptIdentifier);
                LOG.info("fetcher about to shuffle output of srcAttempt " + allocate.getInputAttemptIdentifier() + " decomp: " + j + " len: " + j2 + " to " + allocate.getType());
                if (allocate.getType() == FetchedInput.Type.MEMORY) {
                    ShuffleUtils.shuffleToMemory((MemoryFetchedInput) allocate, dataInputStream, (int) j, (int) j2, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG);
                } else {
                    ShuffleUtils.shuffleToDisk((DiskFetchedInput) allocate, dataInputStream, j2, LOG);
                }
                this.fetcherCallback.fetchSucceeded(this.host, inputAttemptIdentifier, allocate, j2, j, System.currentTimeMillis() - currentTimeMillis);
                this.remaining.remove(inputAttemptIdentifier);
                return null;
            } catch (IllegalArgumentException e) {
                LOG.warn("Invalid src id ", e);
                return (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
        } catch (IOException e2) {
            if (inputAttemptIdentifier == null || 0 == 0) {
                LOG.info("fetcher failed to read map header" + inputAttemptIdentifier + " decomp: " + j + ", " + j2, e2);
                return inputAttemptIdentifier == null ? (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]) : new InputAttemptIdentifier[]{inputAttemptIdentifier};
            }
            LOG.warn("Failed to shuffle output of " + inputAttemptIdentifier + " from " + this.host, e2);
            try {
                fetchedInput.abort();
            } catch (IOException e3) {
                LOG.info("Failure to cleanup fetchedInput: " + ((Object) null));
            }
            return new InputAttemptIdentifier[]{inputAttemptIdentifier};
        }
    }

    private boolean verifySanity(long j, long j2, int i, InputAttemptIdentifier inputAttemptIdentifier, String str) {
        if (j < 0 || j2 < 0) {
            LOG.warn(" invalid lengths in input header -> headerPathComponent: " + str + ", nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier + " len: " + j + ", decomp len: " + j2);
            return false;
        }
        if (i != this.partition) {
            LOG.warn(" data for the wrong reduce -> headerPathComponent: " + str + "nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier + " len: " + j + " decomp len: " + j2 + " for reduce " + i);
            return false;
        }
        if (this.remaining.contains(inputAttemptIdentifier)) {
            return true;
        }
        LOG.warn("Invalid input. Received output for headerPathComponent: " + str + "nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier);
        return false;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.remaining.size() > 0) {
            return this.remaining.iterator().next();
        }
        return null;
    }

    private HttpURLConnection connectToShuffleHandler(String str, int i, int i2, List<InputAttemptIdentifier> list) throws IOException {
        try {
            this.url = constructInputURL(str, i, i2, list);
            HttpURLConnection openConnection = openConnection(this.url);
            this.msgToEncode = SecureShuffleUtils.buildMsgFrom(this.url);
            this.encHash = SecureShuffleUtils.hashFromString(this.msgToEncode, this.shuffleSecret);
            openConnection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, this.encHash);
            openConnection.setReadTimeout(this.readTimeout);
            openConnection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
            openConnection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
            connect(openConnection, this.connectionTimeout);
            return openConnection;
        } catch (IOException e) {
            LOG.warn("Failed to connect to " + str + " with " + this.srcAttempts.size() + " inputs", e);
            throw e;
        }
    }

    private void validateConnectionResponse(HttpURLConnection httpURLConnection, URL url, String str, String str2) throws IOException {
        int responseCode = httpURLConnection.getResponseCode();
        if (responseCode != 200) {
            throw new IOException("Got invalid response code " + responseCode + " from " + url + ": " + httpURLConnection.getResponseMessage());
        }
        if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpURLConnection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(httpURLConnection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
            throw new IOException("Incompatible shuffle response version");
        }
        String headerField = httpURLConnection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
        if (headerField == null) {
            throw new IOException("security validation of TT Map output failed");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("url=" + str + ";encHash=" + str2 + ";replyHash=" + headerField);
        }
        SecureShuffleUtils.verifyReply(headerField, str2, this.shuffleSecret);
        LOG.info("for url=" + str + " sent hash and receievd reply");
    }

    protected HttpURLConnection openConnection(URL url) throws IOException {
        HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
        if (sslShuffle) {
            HttpsURLConnection httpsURLConnection = (HttpsURLConnection) httpURLConnection;
            try {
                httpsURLConnection.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
                httpsURLConnection.setHostnameVerifier(sslFactory.getHostnameVerifier());
            } catch (GeneralSecurityException e) {
                throw new IOException(e);
            }
        }
        return httpURLConnection;
    }

    private void connect(URLConnection uRLConnection, int i) throws IOException {
        int i2 = 0;
        if (i < 0) {
            throw new IOException("Invalid timeout [timeout = " + i + " ms]");
        }
        if (i > 0) {
            i2 = Math.min(UNIT_CONNECT_TIMEOUT, i);
        }
        uRLConnection.setConnectTimeout(i2);
        while (true) {
            try {
                uRLConnection.connect();
                return;
            } catch (IOException e) {
                i -= i2;
                if (i == 0) {
                    throw e;
                }
                if (i < i2) {
                    i2 = i;
                    uRLConnection.setConnectTimeout(i2);
                }
            }
        }
    }

    private URL constructInputURL(String str, int i, int i2, List<InputAttemptIdentifier> list) throws MalformedURLException {
        StringBuilder constructBaseURIForShuffleHandler = ShuffleUtils.constructBaseURIForShuffleHandler(str, i, i2, this.appId);
        boolean z = true;
        for (InputAttemptIdentifier inputAttemptIdentifier : list) {
            if (z) {
                z = false;
                constructBaseURIForShuffleHandler.append(inputAttemptIdentifier.getPathComponent());
            } else {
                constructBaseURIForShuffleHandler.append(",").append(inputAttemptIdentifier.getPathComponent());
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("InputFetch URL for: " + str + " : " + constructBaseURIForShuffleHandler.toString());
        }
        return new URL(constructBaseURIForShuffleHandler.toString());
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return obj != null && getClass() == obj.getClass() && this.fetcherIdentifier == ((Fetcher) obj).fetcherIdentifier;
    }

    static {
        $assertionsDisabled = !Fetcher.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(Fetcher.class);
        fetcherIdGen = new AtomicInteger(0);
        sslShuffle = false;
    }
}
