package com.uber.rss.clients;

import com.uber.rss.common.AppShufflePartitionId;
import com.uber.rss.common.Compression;
import com.uber.rss.common.DataBlock;
import com.uber.rss.common.DataBlockHeader;
import com.uber.rss.common.FixedLengthInputStream;
import com.uber.rss.common.MapTaskCommitStatus;
import com.uber.rss.exceptions.ExceptionWrapper;
import com.uber.rss.exceptions.RssEndOfStreamException;
import com.uber.rss.exceptions.RssException;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.exceptions.RssMissingShuffleWriteConfigException;
import com.uber.rss.exceptions.RssShuffleCorruptedException;
import com.uber.rss.exceptions.RssShuffleDataNotAvailableException;
import com.uber.rss.exceptions.RssShuffleStageNotStartedException;
import com.uber.rss.exceptions.RssStreamReadException;
import com.uber.rss.messages.ConnectDownloadRequest;
import com.uber.rss.messages.ConnectDownloadResponse;
import com.uber.rss.messages.GetDataAvailabilityRequest;
import com.uber.rss.messages.GetDataAvailabilityResponse;
import com.uber.rss.messages.MessageConstants;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.ReadClientMetrics;
import com.uber.rss.metrics.ReadClientMetricsKey;
import com.uber.rss.util.ByteBufUtils;
import com.uber.rss.util.ObjectWrapper;
import com.uber.rss.util.RetryUtils;
import com.uber.rss.util.StreamUtils;
import com.uber.rss.util.StringUtils;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rss_shaded.com.uber.m3.tally.Stopwatch;

/* loaded from: input_file:com/uber/rss/clients/DataBlockSocketReadClient.class */
public class DataBlockSocketReadClient extends ClientBase {
    private static final Logger logger = LoggerFactory.getLogger(DataBlockSocketReadClient.class);
    private static final int POLL_INTERVAL_MAX_MULTIPLIER = 100;
    private final String user;
    private final AppShufflePartitionId appShufflePartitionId;
    private final Set<Long> fetchTaskAttemptIds;
    private final long dataAvailablePollInterval;
    private final long dataAvailableWaitTime;
    private ReadClientMetrics metrics;
    private String fileCompressionCodec;
    private MapTaskCommitStatus commitMapTaskCommitStatus;
    private Set<Long> commitTaskAttemptIds;
    private boolean downloadStarted;
    private long dataLength;
    private int totalReadDataBlocks;
    private FixedLengthInputStream fixedLengthInputStream;

    public DataBlockSocketReadClient(String str, int i, int i2, String str2, AppShufflePartitionId appShufflePartitionId, Collection<Long> collection, long j, long j2) {
        super(str, i, i2);
        this.downloadStarted = false;
        this.dataLength = -1L;
        this.totalReadDataBlocks = 0;
        this.user = str2;
        this.appShufflePartitionId = appShufflePartitionId;
        this.fetchTaskAttemptIds = new HashSet(collection);
        this.dataAvailablePollInterval = j;
        this.dataAvailableWaitTime = j2;
        this.metrics = new ReadClientMetrics(new ReadClientMetricsKey(getClass().getSimpleName(), str2));
    }

    public ConnectDownloadResponse connect() {
        Stopwatch start = this.metrics.getReadConnectLatency().start();
        try {
            return connectImpl();
        } finally {
            start.stop();
        }
    }

    private ConnectDownloadResponse connectImpl() {
        if (this.socket != null) {
            throw new RssInvalidStateException(String.format("Already connected to server, cannot connect again: %s", this.connectionInfo));
        }
        logger.debug("Connecting to server: {}", this.connectionInfo);
        connectSocket();
        write((byte) 100);
        write((byte) 3);
        ConnectDownloadRequest connectDownloadRequest = new ConnectDownloadRequest(this.user, this.appShufflePartitionId, this.fetchTaskAttemptIds);
        ExceptionWrapper exceptionWrapper = new ExceptionWrapper();
        Boolean bool = (Boolean) RetryUtils.retryUntilNotNull(this.dataAvailablePollInterval, this.dataAvailablePollInterval * 100, this.dataAvailableWaitTime, () -> {
            try {
                writeControlMessageAndWaitResponseStatus(connectDownloadRequest);
                return Boolean.TRUE;
            } catch (RssMissingShuffleWriteConfigException | RssShuffleStageNotStartedException e) {
                exceptionWrapper.setException(e);
                logger.warn(String.format("Did not find data in server side, server may not run fast enough to get data from client or server hits some issue, %s", this.appShufflePartitionId), e);
                return null;
            } catch (RssShuffleCorruptedException e2) {
                throw new RssShuffleCorruptedException("Shuffle data corrupted for: " + this.appShufflePartitionId, e2);
            }
        });
        if (bool == null || !bool.booleanValue()) {
            if (exceptionWrapper.getException() != null) {
                throw ((RssException) exceptionWrapper.getException());
            }
            throw new RssInvalidStateException(String.format("Failed to connect to server %s, %s", this.connectionInfo, this.appShufflePartitionId));
        }
        ConnectDownloadResponse connectDownloadResponse = (ConnectDownloadResponse) readResponseMessage(MessageConstants.MESSAGE_ConnectDownloadResponse, ConnectDownloadResponse::deserialize);
        logger.info("Connected to server: {}, response: {}", this.connectionInfo, connectDownloadResponse);
        this.fileCompressionCodec = connectDownloadResponse.getCompressionCodec();
        if (connectDownloadResponse.isDataAvailable()) {
            this.commitMapTaskCommitStatus = connectDownloadResponse.getMapTaskCommitStatus();
            if (this.commitMapTaskCommitStatus == null) {
                throw new RssInvalidDataException("MapTaskCommitStatus should not be null");
            }
            this.commitTaskAttemptIds = this.commitMapTaskCommitStatus.getTaskAttemptIds();
            if (!this.commitTaskAttemptIds.containsAll(this.fetchTaskAttemptIds)) {
                throw new RssInvalidDataException(String.format("Task attempt ids not matched, committed: %s, fetching: %s", this.commitTaskAttemptIds, this.fetchTaskAttemptIds));
            }
        }
        return connectDownloadResponse;
    }

    public GetDataAvailabilityResponse waitDataAvailable() {
        if (this.commitMapTaskCommitStatus != null) {
            throw new RssInvalidStateException("Data already available, should not wait again");
        }
        long currentTimeMillis = System.currentTimeMillis();
        logger.info("Waiting for all mappers finished: {}, {}", this.appShufflePartitionId, this.connectionInfo);
        Stopwatch start = this.metrics.getReducerWaitTime().start();
        ObjectWrapper objectWrapper = new ObjectWrapper();
        try {
            RetryUtils.retryUntilNotNull(this.dataAvailablePollInterval, this.dataAvailablePollInterval * 100, this.dataAvailableWaitTime, () -> {
                GetDataAvailabilityResponse dataAvailability = getDataAvailability();
                objectWrapper.setObject(dataAvailability);
                if (dataAvailability.isDataAvailable()) {
                    return dataAvailability;
                }
                return null;
            });
            start.stop();
            logger.info("Finished waiting for all mappers to finish, partition: {}, duration: {} seconds", this.appShufflePartitionId, Long.valueOf((System.currentTimeMillis() - currentTimeMillis) / 1000));
            GetDataAvailabilityResponse getDataAvailabilityResponse = (GetDataAvailabilityResponse) objectWrapper.getObject();
            if (getDataAvailabilityResponse != null && getDataAvailabilityResponse.isDataAvailable()) {
                this.commitMapTaskCommitStatus = getDataAvailabilityResponse.getMapTaskCommitStatus();
                if (this.commitMapTaskCommitStatus == null) {
                    throw new RssInvalidDataException("MapTaskCommitStatus should not be null");
                }
                this.commitTaskAttemptIds = this.commitMapTaskCommitStatus.getTaskAttemptIds();
                if (this.commitTaskAttemptIds.containsAll(this.fetchTaskAttemptIds)) {
                    return getDataAvailabilityResponse;
                }
                throw new RssInvalidDataException(String.format("Task attempt ids not matched, committed: %s, fetching: %s", this.commitTaskAttemptIds, this.fetchTaskAttemptIds));
            }
            String str = "";
            if (getDataAvailabilityResponse != null && getDataAvailabilityResponse.getMapTaskCommitStatus() != null) {
                MapTaskCommitStatus mapTaskCommitStatus = getDataAvailabilityResponse.getMapTaskCommitStatus();
                if (mapTaskCommitStatus.getTaskAttemptIds().isEmpty()) {
                    str = String.format("no task attempt committed", new Object[0]);
                } else {
                    List list = (List) mapTaskCommitStatus.getTaskAttemptIds().stream().collect(Collectors.toList());
                    Collections.sort(list);
                    str = String.format("committed task ids: %s, fetching tasks: %s", StringUtils.toString4SortedNumberList(list), StringUtils.toString4SortedNumberList((List) this.fetchTaskAttemptIds.stream().sorted().collect(Collectors.toList())));
                }
            }
            throw new RssShuffleDataNotAvailableException(String.format("Not all mappers finished after trying %s:%s for %s millis, partition: %s, %s", this.host, Integer.valueOf(this.port), Long.valueOf(this.dataAvailableWaitTime), this.appShufflePartitionId, str));
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public DataBlock readDataBlock() {
        try {
            DataBlock readDataBlockNoCheckTaskAttemptId = readDataBlockNoCheckTaskAttemptId();
            while (readDataBlockNoCheckTaskAttemptId != null) {
                this.totalReadDataBlocks++;
                if (this.fetchTaskAttemptIds.contains(Long.valueOf(readDataBlockNoCheckTaskAttemptId.getHeader().getTaskAttemptId()))) {
                    break;
                }
                readDataBlockNoCheckTaskAttemptId = readDataBlockNoCheckTaskAttemptId();
                this.metrics.getNumIgnoredBlocks().inc(1L);
            }
            return readDataBlockNoCheckTaskAttemptId;
        } catch (Throwable th) {
            if (this.fixedLengthInputStream != null) {
                throw new RssStreamReadException(String.format("Bad data stream, total expected bytes: %s, remaining unread bytes: %s, %s", Long.valueOf(this.fixedLengthInputStream.getLength()), Long.valueOf(this.fixedLengthInputStream.getRemaining()), this.connectionInfo), th);
            }
            throw th;
        }
    }

    @Override // com.uber.rss.clients.ClientBase, java.lang.AutoCloseable
    public void close() {
        try {
            super.close();
            closeMetrics();
        } catch (Throwable th) {
            logger.warn(String.format("Failed to close read client %s", this), th);
        }
    }

    public AppShufflePartitionId getAppShufflePartitionId() {
        return this.appShufflePartitionId;
    }

    @Override // com.uber.rss.clients.ClientBase
    public String toString() {
        return "DataBlockSocketReadClient{user='" + this.user + "', appShufflePartitionId=" + this.appShufflePartitionId + ", downloadStarted=" + this.downloadStarted + ", totalReadDataBlocks=" + this.totalReadDataBlocks + ", connectionInfo=" + this.connectionInfo + '}';
    }

    private void closeMetrics() {
        try {
            if (this.metrics != null) {
                this.metrics.close();
                this.metrics = null;
            }
        } catch (Throwable th) {
            M3Stats.addException(th, getClass().getSimpleName());
            logger.warn(String.format("Failed to close metrics: %s", this.connectionInfo), th);
        }
    }

    private GetDataAvailabilityResponse getDataAvailability() {
        writeControlMessageAndWaitResponseStatus(new GetDataAvailabilityRequest());
        return (GetDataAvailabilityResponse) readResponseMessage(MessageConstants.MESSAGE_GetDataAvailabilityResponse, GetDataAvailabilityResponse::deserialize);
    }

    private DataBlock readDataBlockNoCheckTaskAttemptId() {
        if (!this.downloadStarted) {
            if (this.commitTaskAttemptIds == null) {
                waitDataAvailable();
            }
            startDownload();
            this.downloadStarted = true;
        }
        if (this.commitTaskAttemptIds == null) {
            throw new RssInvalidStateException(String.format("commitTaskAttemptIds is null, %s", this.connectionInfo));
        }
        if (this.commitTaskAttemptIds.isEmpty()) {
            throw new RssInvalidStateException(String.format("commitTaskAttemptIds is empty, %s", this.connectionInfo));
        }
        DataBlockHeader readDataBlockHeader = readDataBlockHeader(this.inputStream);
        if (readDataBlockHeader == null) {
            return null;
        }
        byte[] readBytes = StreamUtils.readBytes(this.inputStream, readDataBlockHeader.getLength());
        if (readBytes == null) {
            throw new RssEndOfStreamException("Failed to read data block: " + toString());
        }
        return new DataBlock(readDataBlockHeader, readBytes);
    }

    private void startDownload() {
        byte[] readBytes = StreamUtils.readBytes(this.inputStream, 8);
        if (readBytes == null) {
            throw new RssEndOfStreamException(String.format("Hit unexpected end of stream: %s", this.connectionInfo));
        }
        this.dataLength = ByteBufUtils.readLong(readBytes, 0);
        if (this.dataLength < 0) {
            throw new RssInvalidDataException(String.format("Invalid data length: %s, %s", Long.valueOf(this.dataLength), this.connectionInfo));
        }
        logger.info("Data length to read: {}", Long.valueOf(this.dataLength));
        this.fixedLengthInputStream = new FixedLengthInputStream(this.inputStream, this.dataLength);
        this.inputStream = this.fixedLengthInputStream;
        InputStream decompressStream = Compression.decompressStream(this.inputStream, this.fileCompressionCodec);
        if (decompressStream != this.inputStream) {
            this.inputStream = decompressStream;
            logger.info("Switched to compressing stream {}, {}", this.appShufflePartitionId, this.connectionInfo);
        }
    }

    private DataBlockHeader readDataBlockHeader(InputStream inputStream) {
        byte[] readBytes = StreamUtils.readBytes(inputStream, DataBlockHeader.NUM_BYTES);
        if (readBytes != null) {
            this.metrics.getNumReadBytes().inc(readBytes.length);
            return DataBlockHeader.deserializeFromBytes(readBytes);
        }
        if (this.fixedLengthInputStream == null || this.fixedLengthInputStream.getRemaining() == 0) {
            return null;
        }
        throw new RssInvalidDataException(String.format("Bad data stream, total expected bytes: %s, remaining unread bytes: %s", Long.valueOf(this.fixedLengthInputStream.getLength()), Long.valueOf(this.fixedLengthInputStream.getRemaining())));
    }
}
