package com.uber.rss.handlers;

import com.uber.rss.clients.ShuffleWriteConfig;
import com.uber.rss.common.AppShuffleId;
import com.uber.rss.common.FilePathAndLength;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.exceptions.RssShuffleCorruptedException;
import com.uber.rss.execution.ShuffleExecutor;
import com.uber.rss.messages.ConnectDownloadRequest;
import com.uber.rss.messages.ShuffleStageStatus;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.storage.ShuffleFileStorage;
import com.uber.rss.storage.ShuffleStorage;
import com.uber.rss.util.ExceptionUtils;
import com.uber.rss.util.LogUtils;
import com.uber.rss.util.NettyUtils;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.DefaultFileRegion;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rss_shaded.com.uber.m3.tally.Counter;
import rss_shaded.com.uber.m3.tally.Gauge;

/* loaded from: input_file:com/uber/rss/handlers/DownloadServerHandler.class */
/**
 * Serves shuffle partition files to a download client over a Netty channel.
 *
 * <p>An instance is bound to one application shuffle and partition via
 * {@link #initialize(ConnectDownloadRequest)}; {@link #getNonEmptyPartitionFiles(String)} and
 * {@link #sendFiles(ChannelHandlerContext, List, ChannelIdleCheck)} read that state, so
 * {@code initialize} must be called first.
 */
public class DownloadServerHandler {
    private static final Logger logger = LoggerFactory.getLogger(DownloadServerHandler.class);

    /** Number of file transfers started by {@code sendFiles} that have not completed yet (all instances). */
    private static final AtomicInteger numConcurrentReadFilesAtomicInteger = new AtomicInteger();
    private static final Gauge numConcurrentReadFiles = M3Stats.getDefaultScope().gauge("numConcurrentReadFiles");
    private static final Counter numReadFileBytes = M3Stats.getDefaultScope().counter("numReadFileBytes");

    private final ShuffleExecutor executor;
    private final ShuffleStorage storage = new ShuffleFileStorage();
    private AppShuffleId appShuffleId;
    private int partitionId;

    /**
     * Creates a handler that delegates shuffle state queries to the given executor.
     *
     * @param shuffleExecutor executor used for shuffle state, persisted-bytes lookup and liveness updates
     */
    public DownloadServerHandler(ShuffleExecutor shuffleExecutor) {
        this.executor = shuffleExecutor;
    }

    /**
     * Binds this handler to the shuffle and partition named in the connect request.
     *
     * @param connectDownloadRequest request carrying app id, app attempt, shuffle id and partition id
     */
    public void initialize(ConnectDownloadRequest connectDownloadRequest) {
        this.appShuffleId = new AppShuffleId(
                connectDownloadRequest.getAppId(),
                connectDownloadRequest.getAppAttempt(),
                connectDownloadRequest.getShuffleId());
        this.partitionId = connectDownloadRequest.getPartitionId();
    }

    /**
     * Returns the write config the executor holds for the given shuffle.
     *
     * @param appShuffleId shuffle to look up (independent of the shuffle this handler is bound to)
     * @return whatever {@code ShuffleExecutor.getShuffleWriteConfig} returns for that shuffle
     */
    public ShuffleWriteConfig getShuffleWriteConfig(AppShuffleId appShuffleId) {
        return this.executor.getShuffleWriteConfig(appShuffleId);
    }

    /**
     * Returns the stage status the executor holds for the given shuffle.
     *
     * @param appShuffleId shuffle to look up (independent of the shuffle this handler is bound to)
     * @return whatever {@code ShuffleExecutor.getShuffleStageStatus} returns for that shuffle
     */
    public ShuffleStageStatus getShuffleStageStatus(AppShuffleId appShuffleId) {
        return this.executor.getShuffleStageStatus(appShuffleId);
    }

    /**
     * Lists the files of the bound partition that have a positive persisted length, after
     * verifying that each one exists in storage and is at least as large as its persisted length.
     *
     * @param str free-form context text appended to log and exception messages
     *            (presumably connection info; confirm against callers)
     * @return the validated files, or an empty list when no file has persisted bytes
     * @throws RssInvalidStateException if the storage is not local, or the summed length is negative
     * @throws RssShuffleCorruptedException if a file is missing, empty, or shorter than its persisted length
     */
    public List<FilePathAndLength> getNonEmptyPartitionFiles(String str) {
        if (!this.storage.isLocalStorage()) {
            throw new RssInvalidStateException("Only local file storage is supported to download shuffle data, closing the connection");
        }

        List<FilePathAndLength> nonEmptyFiles = this.executor.getPersistedBytes(this.appShuffleId, this.partitionId)
                .stream()
                .filter(file -> file.getLength() > 0)
                .collect(Collectors.toList());
        if (nonEmptyFiles.isEmpty()) {
            return Collections.emptyList();
        }

        for (FilePathAndLength file : nonEmptyFiles) {
            String path = file.getPath();
            long expectedLength = file.getLength();
            if (!this.storage.exists(path)) {
                throw new RssShuffleCorruptedException(String.format("Shuffle file %s not found for partition %s, %s, %s, but there are persisted bytes: %s", path, this.partitionId, this.appShuffleId, str, expectedLength));
            }
            long actualSize = this.storage.size(path);
            if (actualSize <= 0) {
                throw new RssShuffleCorruptedException(String.format("Shuffle file %s is empty for partition %s, %s, %s, but there are persisted bytes: %s", path, this.partitionId, this.appShuffleId, str, expectedLength));
            }
            // A file larger than the persisted length is accepted; only the persisted prefix is sent.
            if (actualSize < expectedLength) {
                throw new RssShuffleCorruptedException(String.format("Shuffle file %s has less size %s than expected %s for partition %s, %s, %s", path, actualSize, expectedLength, this.partitionId, this.appShuffleId, str));
            }
        }

        // Every element has length > 0, so a non-positive sum can only come from long overflow.
        long totalLength = nonEmptyFiles.stream().mapToLong(file -> file.getLength()).sum();
        if (totalLength == 0) {
            logger.info("Total file length is zero: {}, {}", StringUtils.join(nonEmptyFiles, ','), str);
            return Collections.emptyList();
        }
        if (totalLength < 0) {
            throw new RssInvalidStateException(String.format("Invalid total file length: %s, %s", totalLength, str));
        }
        return nonEmptyFiles;
    }

    /**
     * Asks the executor to finish the stage of the given shuffle.
     *
     * @param appShuffleId shuffle whose stage should be finished
     */
    public void finishShuffleStage(AppShuffleId appShuffleId) {
        this.executor.finishShuffleStage(appShuffleId);
    }

    /**
     * Writes each file to the channel as a {@link DefaultFileRegion} covering bytes
     * {@code [0, length)} and flushes it, without waiting for the previous write to complete.
     * A listener on every write refreshes app liveness and the idle check, and maintains the
     * concurrent-read gauge and byte counter.
     *
     * @param channelHandlerContext channel context to write to
     * @param list files to send, in order; each file's {@code getLength()} bytes are sent
     * @param channelIdleCheck idle tracker refreshed before sending and on every progress/completion event
     * @return the future of the last file written, or {@code null} when {@code list} is empty
     */
    public ChannelFuture sendFiles(ChannelHandlerContext channelHandlerContext, final List<FilePathAndLength> list, final ChannelIdleCheck channelIdleCheck) {
        final String serverConnectionInfo = NettyUtils.getServerConnectionInfo(channelHandlerContext);
        channelIdleCheck.updateLastReadTime();

        ChannelFuture lastWriteFuture = null;
        for (int i = 0; i < list.size(); i++) {
            final int fileIndex = i;
            final String path = list.get(fileIndex).getPath();
            final long length = list.get(fileIndex).getLength();
            logger.info("Downloader server sending file: {} ({} of {}, {} bytes), {}", path, fileIndex + 1, list.size(), length, serverConnectionInfo);

            DefaultFileRegion fileRegion = new DefaultFileRegion(new File(path), 0L, length);
            ChannelFuture writeFuture = channelHandlerContext.writeAndFlush(fileRegion, channelHandlerContext.newProgressivePromise());
            // Incremented before the listener is attached, so the listener's decrement always follows it.
            numConcurrentReadFiles.update(numConcurrentReadFilesAtomicInteger.incrementAndGet());
            final long startTimeMillis = System.currentTimeMillis();

            writeFuture.addListener(new ChannelProgressiveFutureListener() {
                public void operationComplete(ChannelProgressiveFuture channelProgressiveFuture) throws Exception {
                    executor.updateLiveness(appShuffleId.getAppId());
                    channelIdleCheck.updateLastReadTime();
                    numConcurrentReadFiles.update(numConcurrentReadFilesAtomicInteger.decrementAndGet());
                    // NOTE: the full length is counted whether or not the transfer succeeded.
                    numReadFileBytes.inc(length);

                    String exceptionInfo = "";
                    Throwable cause = channelProgressiveFuture.cause();
                    if (cause != null) {
                        // getClass() here would be the anonymous listener, whose simple name is "";
                        // report the enclosing handler's name so the metric is attributable.
                        M3Stats.addException(cause, DownloadServerHandler.class.getSimpleName());
                        exceptionInfo = String.format(", exception: %s, %s", ExceptionUtils.getSimpleMessage(cause), org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace(cause));
                    }

                    double megaBytesPerSecond = LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - startTimeMillis, length);
                    logger.info("Finished sending file: {} ({} of {}), success: {} ({} mbs, total {} bytes), connection: {} {} {}", path, fileIndex + 1, list.size(), channelProgressiveFuture.isSuccess(), megaBytesPerSecond, length, serverConnectionInfo, System.nanoTime(), exceptionInfo);
                }

                public void operationProgressed(ChannelProgressiveFuture channelProgressiveFuture, long j, long j2) throws Exception {
                    // j is the progress so far and j2 the total, as reported by Netty.
                    double megaBytesPerSecond = LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - startTimeMillis, j);
                    logger.debug("Sending file: {}, progress: {} out of {} bytes, {} mbs, {}", path, j, j2, megaBytesPerSecond, serverConnectionInfo);
                    executor.updateLiveness(appShuffleId.getAppId());
                    channelIdleCheck.updateLastReadTime();
                }
            });
            lastWriteFuture = writeFuture;
        }
        return lastWriteFuture;
    }
}
