package com.uber.rss.handlers;

import com.uber.rss.clients.ShuffleWriteConfig;
import com.uber.rss.common.AppShuffleId;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.exceptions.RssMaxConnectionsException;
import com.uber.rss.exceptions.RssTooMuchDataException;
import com.uber.rss.execution.ShuffleExecutor;
import com.uber.rss.messages.ConnectUploadRequest;
import com.uber.rss.messages.ConnectUploadResponse;
import com.uber.rss.messages.FinishUploadMessage;
import com.uber.rss.messages.GetBusyStatusRequest;
import com.uber.rss.messages.GetBusyStatusResponse;
import com.uber.rss.messages.HeartbeatMessage;
import com.uber.rss.messages.ShuffleDataWrapper;
import com.uber.rss.messages.StartUploadMessage;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.util.ExceptionUtils;
import com.uber.rss.util.NettyUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rss_shaded.com.uber.m3.tally.Counter;
import rss_shaded.com.uber.m3.tally.Gauge;

/* loaded from: input_file:com/uber/rss/handlers/UploadChannelInboundHandler.class */
/**
 * Netty inbound handler for one upload channel.
 *
 * <p>Each decoded inbound message is dispatched by its runtime type to the
 * {@link UploadServerHandler} owned by this instance. The handler also keeps the
 * upload-channel metrics up to date and maintains a {@link ChannelIdleCheck} for the
 * channel.
 *
 * <p>Instances hold per-connection state ({@code appId}, {@code appAttempt}, the last
 * {@link StartUploadMessage}); NOTE(review): this implies one instance per channel —
 * confirm against the pipeline setup.
 */
public class UploadChannelInboundHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(UploadChannelInboundHandler.class);
    private static final Counter numChannelActive = M3Stats.getDefaultScope().counter("numUploadChannelActive");
    private static final Counter numChannelInactive = M3Stats.getDefaultScope().counter("numUploadChannelInactive");
    private static final AtomicInteger concurrentChannelsAtomicInteger = new AtomicInteger();
    private static final Gauge numConcurrentChannels = M3Stats.getDefaultScope().gauge("numConcurrentUploadChannels");
    private static final Gauge finishUploadRequestLag = M3Stats.getDefaultScope().gauge("finishUploadRequestLag");
    private static final Counter closedIdleUploadChannels = M3Stats.getDefaultScope().counter("closedIdleUploadChannels");

    // Single-byte status codes written back to the client. The values are the literals
    // this class has always written; the names describe how they are used here.
    // NOTE(review): presumably these mirror a shared message-constants class — confirm.
    private static final byte STATUS_OK = 20;
    private static final byte STATUS_SERVER_BUSY = 53;
    private static final byte STATUS_APP_TOO_MUCH_DATA = 54;

    private final String serverId;
    private final long idleTimeoutMillis;
    private final UploadServerHandler uploadServerHandler;
    private ChannelIdleCheck idleCheck;
    private String connectionInfo = "";
    private String appId = null;
    private String appAttempt = null;
    private StartUploadMessage startUploadMessage = null;
    // Key under which the concurrent channel count is reported in GetBusyStatusResponse metrics.
    final int CONCURRENT_CONNS = 1;

    /**
     * Creates a handler.
     *
     * @param str server id returned to clients in {@link ConnectUploadResponse}
     * @param j idle timeout in milliseconds handed to the {@link ChannelIdleCheck}
     * @param shuffleExecutor executor passed to the underlying {@link UploadServerHandler}
     * @param uploadChannelManager channel manager passed to the underlying {@link UploadServerHandler}
     */
    public UploadChannelInboundHandler(String str, long j, ShuffleExecutor shuffleExecutor, UploadChannelManager uploadChannelManager) {
        this.serverId = str;
        this.uploadServerHandler = new UploadServerHandler(shuffleExecutor, uploadChannelManager);
        this.idleTimeoutMillis = j;
    }

    /** Forwards the event up the pipeline, then runs {@link #processChannelActive}. */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        processChannelActive(channelHandlerContext);
    }

    /**
     * Records the new channel in the metrics, captures its connection info and schedules
     * the idle check.
     *
     * @param channelHandlerContext context of the channel that became active
     */
    public void processChannelActive(ChannelHandlerContext channelHandlerContext) {
        // Resolve the connection info before logging; otherwise the log line only ever
        // shows the empty initial value.
        this.connectionInfo = NettyUtils.getServerConnectionInfo(channelHandlerContext);
        logger.debug("Channel active: {}", this.connectionInfo);
        numChannelActive.inc(1L);
        numConcurrentChannels.update(concurrentChannelsAtomicInteger.incrementAndGet());
        this.idleCheck = new ChannelIdleCheck(channelHandlerContext, this.idleTimeoutMillis, closedIdleUploadChannels);
        this.idleCheck.schedule();
    }

    /**
     * Forwards the event up the pipeline, updates the metrics, notifies the upload handler
     * and cancels the idle check if one was created.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        logger.debug("Channel inactive: {}", this.connectionInfo);
        numChannelInactive.inc(1L);
        numConcurrentChannels.update(concurrentChannelsAtomicInteger.decrementAndGet());
        this.uploadServerHandler.onChannelInactive();
        if (this.idleCheck != null) {
            this.idleCheck.cancel();
        }
    }

    /**
     * Dispatches one inbound message by type. The message is released exactly once when
     * this method exits, whether it returns normally or throws.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj the decoded inbound message
     * @throws RssInvalidDataException if the message type is not supported
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        try {
            // Data messages are not logged, even at debug level.
            if (logger.isDebugEnabled() && !(obj instanceof ShuffleDataWrapper)) {
                logger.debug("Got incoming message: {}, {}", obj, this.connectionInfo);
            }
            if (this.idleCheck != null) {
                this.idleCheck.updateLastReadTime();
            }

            if (obj instanceof ConnectUploadRequest) {
                handleConnectUpload(channelHandlerContext, (ConnectUploadRequest) obj);
            } else if (obj instanceof StartUploadMessage) {
                handleStartUpload(channelHandlerContext, (StartUploadMessage) obj);
            } else if (obj instanceof FinishUploadMessage) {
                handleFinishUpload(channelHandlerContext, (FinishUploadMessage) obj);
            } else if (obj instanceof ShuffleDataWrapper) {
                this.uploadServerHandler.writeRecord((ShuffleDataWrapper) obj);
            } else if (obj instanceof HeartbeatMessage) {
                handleHeartbeat(channelHandlerContext, (HeartbeatMessage) obj);
            } else if (obj instanceof GetBusyStatusRequest) {
                handleGetBusyStatus(channelHandlerContext);
            } else {
                throw new RssInvalidDataException(String.format("Unsupported message: %s, %s", obj, this.connectionInfo));
            }
        } finally {
            // Single release point: avoids releasing twice when a release itself fails.
            ReferenceCountUtil.release(obj);
        }
    }

    /**
     * Records the exception in the metrics, logs it and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        M3Stats.addException(th, getClass().getSimpleName());
        logger.warn("Got exception " + this.connectionInfo, th);
        channelHandlerContext.close();
    }

    /**
     * Handles a connect request: rejects it with a status byte and closes the channel when
     * the server or the application is over its limit, otherwise replies with a
     * {@link ConnectUploadResponse}.
     */
    private void handleConnectUpload(ChannelHandlerContext ctx, ConnectUploadRequest request) {
        try {
            this.uploadServerHandler.checkMaxConnections();
        } catch (RssMaxConnectionsException e) {
            logger.info("Cannot handle new connection due to server capacity. Closing current connection: {}. {}", this.connectionInfo, ExceptionUtils.getSimpleMessage(e));
            M3Stats.addException(e, getClass().getSimpleName());
            rejectAndClose(ctx, STATUS_SERVER_BUSY);
            return;
        }

        this.appId = request.getAppId();
        this.appAttempt = request.getAppAttempt();

        try {
            this.uploadServerHandler.checkAppMaxWriteBytes(this.appId);
        } catch (RssTooMuchDataException e) {
            // SLF4J substitutes only "{}" placeholders; a printf-style "%s" would shift every argument.
            logger.info("Cannot handle new connection due to writing too much data from app ({}). Closing current connection: {}. {}", new Object[]{this.appId, this.connectionInfo, ExceptionUtils.getSimpleMessage(e)});
            M3Stats.addException(e, getClass().getSimpleName());
            rejectAndClose(ctx, STATUS_APP_TOO_MUCH_DATA);
            // Stop here: the connection was rejected, so no success response may follow.
            return;
        }

        this.uploadServerHandler.updateLiveness(this.appId);
        HandlerUtil.writeResponseMsg(ctx, STATUS_OK, new ConnectUploadResponse(this.serverId), true);
    }

    /** Remembers the start message and initializes the task attempt on the upload handler. */
    private void handleStartUpload(ChannelHandlerContext ctx, StartUploadMessage message) {
        this.startUploadMessage = message;
        AppShuffleId appShuffleId = new AppShuffleId(this.appId, this.appAttempt, message.getShuffleId());
        ShuffleWriteConfig writeConfig = new ShuffleWriteConfig(message.getNumSplits());
        this.uploadServerHandler.initializeAppTaskAttempt(appShuffleId, message.getAttemptId(), message.getNumPartitions(), writeConfig, ctx);
    }

    /**
     * Reports the request lag, finishes the upload for the task attempt and, when the
     * message's ack flag is non-zero, writes a one-byte OK acknowledgement.
     */
    private void handleFinishUpload(ChannelHandlerContext ctx, FinishUploadMessage message) {
        logger.debug("FinishUploadMessage, {}, {}", message, this.connectionInfo);
        finishUploadRequestLag.update(System.currentTimeMillis() - message.getTimestamp());
        byte ackFlag = message.getAckFlag();
        this.uploadServerHandler.finishUpload(message.getTaskAttemptId());
        if (ackFlag != 0) {
            ByteBuf ack = ctx.alloc().buffer(1);
            ack.writeByte(STATUS_OK);
            ctx.writeAndFlush(ack);
        }
    }

    /** Updates liveness for the heartbeat's app and closes the channel unless keep-live is set. */
    private void handleHeartbeat(ChannelHandlerContext ctx, HeartbeatMessage message) {
        String heartbeatAppId = message.getAppId();
        boolean keepLive = message.isKeepLive();
        this.uploadServerHandler.updateLiveness(heartbeatAppId);
        if (!keepLive) {
            ctx.close();
        }
    }

    /** Replies with the current concurrent channel count and closes the channel once the reply is written. */
    private void handleGetBusyStatus(ChannelHandlerContext ctx) {
        GetBusyStatusResponse response = new GetBusyStatusResponse(new HashMap<>(), new HashMap<>());
        response.getMetrics().put(Long.valueOf(CONCURRENT_CONNS), Long.valueOf((long) concurrentChannelsAtomicInteger.get()));
        HandlerUtil.writeResponseMsg(ctx, STATUS_OK, response, true).addListener(ChannelFutureListener.CLOSE);
    }

    /** Writes a single status byte to the client and closes the channel after the write completes. */
    private void rejectAndClose(ChannelHandlerContext ctx, byte status) {
        ByteBuf reply = ctx.alloc().buffer(1);
        reply.writeByte(status);
        ctx.writeAndFlush(reply).addListener(ChannelFutureListener.CLOSE);
    }
}
