package com.uber.rss.decoders;

import com.uber.rss.common.DataBlockHeader;
import com.uber.rss.exceptions.RssException;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.messages.ConnectDownloadRequest;
import com.uber.rss.messages.ConnectDownloadResponse;
import com.uber.rss.messages.ConnectNotifyRequest;
import com.uber.rss.messages.ConnectNotifyResponse;
import com.uber.rss.messages.ConnectRegistryRequest;
import com.uber.rss.messages.ConnectRegistryResponse;
import com.uber.rss.messages.ConnectUploadRequest;
import com.uber.rss.messages.ConnectUploadResponse;
import com.uber.rss.messages.FinishApplicationAttemptRequestMessage;
import com.uber.rss.messages.FinishApplicationJobRequestMessage;
import com.uber.rss.messages.FinishUploadMessage;
import com.uber.rss.messages.GetBusyStatusRequest;
import com.uber.rss.messages.GetBusyStatusResponse;
import com.uber.rss.messages.GetDataAvailabilityRequest;
import com.uber.rss.messages.GetDataAvailabilityResponse;
import com.uber.rss.messages.GetServersRequestMessage;
import com.uber.rss.messages.HeartbeatMessage;
import com.uber.rss.messages.MessageConstants;
import com.uber.rss.messages.RegisterServerRequestMessage;
import com.uber.rss.messages.ShuffleDataWrapper;
import com.uber.rss.messages.ShuffleStageStatus;
import com.uber.rss.messages.StartUploadMessage;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.NettyServerSideMetricGroupContainer;
import com.uber.rss.metrics.ServerHandlerMetrics;
import com.uber.rss.util.ByteBufUtils;
import com.uber.rss.util.LogUtils;
import com.uber.rss.util.NettyUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uber/rss/decoders/StreamServerMessageDecoder.class */
/**
 * Netty inbound decoder that turns the raw byte stream sent by a client into message objects.
 *
 * <p>The stream, as consumed by this class, has the following shape:
 * <ol>
 *   <li>one magic byte identifying the kind of connection (notify, download, registry or upload)
 *       followed by one version byte;</li>
 *   <li>any number of messages, each starting with a 4-byte int. A negative value is a control
 *       message type and is followed by a 4-byte payload length and the payload. A non-negative
 *       value is a partition id and is followed by an 8-byte task attempt id, a 4-byte data length
 *       and the shuffle data itself.</li>
 * </ol>
 *
 * <p>Decoding is incremental: every call inspects the current {@link State}, consumes at most one
 * field (or one chunk of shuffle data) and returns, leaving it to {@link ByteToMessageDecoder} to
 * call again while bytes remain.
 */
public class StreamServerMessageDecoder extends ByteToMessageDecoder {
    private static final int INVALID_CONTROL_MESSAGE_TYPE = 0;
    private static final int INVALID_PARTITION_ID = -1;
    private static final int INVALID_SESSION_ID = -1;
    private static final int INVALID_TASK_ATTEMPT_ID = -1;

    // The only uplink version accepted for every connection kind.
    // NOTE(review): presumably mirrors the *_UPLINK_VERSION_3 constants in MessageConstants - confirm.
    private static final byte SUPPORTED_UPLINK_VERSION = 3;

    private static final Logger logger = LoggerFactory.getLogger(StreamServerMessageDecoder.class);
    private static final NettyServerSideMetricGroupContainer<ServerHandlerMetrics> metricGroupContainer =
            new NettyServerSideMetricGroupContainer<>(ServerHandlerMetrics::new);

    // Scratch buffer in which the payload of one data message is accumulated across decode calls.
    // Released when the channel becomes inactive.
    private final ByteBuf shuffleDataBuffer;

    private State state = State.READ_MAGIC_BYTE_AND_VERSION;
    // Bytes still missing for the payload currently being read.
    private int requiredBytes = 0;
    private int controlMessageType = INVALID_CONTROL_MESSAGE_TYPE;
    private int partitionId = INVALID_PARTITION_ID;
    private long taskAttemptId = INVALID_TASK_ATTEMPT_ID;
    // Raw bytes of the task attempt id; reused when building the data block header.
    private final byte[] taskAttemptIdBytes = new byte[8];
    private final long startTime = System.currentTimeMillis();
    private long numIncomingBytes = 0;
    private String user = "uninitialized";
    private ServerHandlerMetrics metrics = metricGroupContainer.getMetricGroup(this.user);

    /** Position of the decoder inside the stream, i.e. which field is expected next. */
    public enum State {
        READ_MAGIC_BYTE_AND_VERSION,
        READ_MESSAGE_TYPE,
        READ_CONTROL_MESSAGE_LEN,
        READ_CONTROL_MESSAGE_BYTES,
        READ_TASK_ATTEMPT_ID,
        READ_DATA_MESSAGE_LEN,
        READ_DATA_MESSAGE_BYTES
    }

    /**
     * Creates a decoder.
     *
     * @param byteBuf buffer used to accumulate shuffle data payloads; this decoder releases it in
     *                {@link #channelInactive(ChannelHandlerContext)}. A {@code null} buffer is
     *                tolerated on shutdown, but data messages with a non-empty payload need it.
     */
    public StreamServerMessageDecoder(ByteBuf byteBuf) {
        this.shuffleDataBuffer = byteBuf;
    }

    /**
     * Runs the parent's shutdown handling and then frees this decoder's resources.
     *
     * <p>The cleanup is performed in a {@code finally} block: the parent implementation may run a
     * last decode pass over leftover bytes and that pass can throw, which must not leak the
     * accumulation buffer or the per-user metric group.
     *
     * @throws Exception whatever the parent implementation throws
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        try {
            super.channelInactive(channelHandlerContext);
        } finally {
            releaseResources();
        }
        logger.debug(
                "Decoder finished, total bytes: {}, speed: {} mbs, {}",
                this.numIncomingBytes,
                LogUtils.calculateMegaBytesPerSecond(System.currentTimeMillis() - this.startTime, this.numIncomingBytes),
                NettyUtils.getServerConnectionInfo(channelHandlerContext));
    }

    /**
     * Performs one decoding step and accounts for the bytes it consumed, whether or not the step
     * succeeded.
     *
     * @throws Exception any exception raised while decoding; it is propagated unchanged
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int readableBefore = byteBuf.readableBytes();
        try {
            decodeImpl(channelHandlerContext, byteBuf, list);
        } finally {
            int consumed = readableBefore - byteBuf.readableBytes();
            this.numIncomingBytes += consumed;
            // this.metrics is read after decoding on purpose: a connect request may have
            // switched it to the metric group of the connecting user.
            this.metrics.getNumIncomingBytes().inc(consumed);
        }
    }

    /**
     * Records the exception, logs it and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        M3Stats.addException(th, getClass().getSimpleName());
        logger.warn("Got exception " + NettyUtils.getServerConnectionInfo(channelHandlerContext), th);
        channelHandlerContext.close();
    }

    /** Releases the accumulation buffer and drops this connection's metric group. */
    private void releaseResources() {
        try {
            if (this.shuffleDataBuffer != null) {
                this.shuffleDataBuffer.release();
            }
        } finally {
            metricGroupContainer.removeMetricGroup(this.user);
        }
    }

    /**
     * Dispatches to the handler of the current state. Each handler returns without consuming
     * anything when the field it expects has not fully arrived yet.
     *
     * @throws RssInvalidDataException if a length or task attempt id read from the stream is negative
     * @throws RssException if the control message type is unsupported or the state is unknown
     */
    private void decodeImpl(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        this.metrics.getNumIncomingRequests().inc(1L);
        if (in.readableBytes() == 0) {
            return;
        }
        switch (this.state) {
            case READ_MAGIC_BYTE_AND_VERSION:
                readMagicByteAndVersion(ctx, in);
                return;
            case READ_MESSAGE_TYPE:
                readMessageType(in);
                return;
            case READ_CONTROL_MESSAGE_LEN:
                readControlMessageLength(ctx, in, out);
                return;
            case READ_CONTROL_MESSAGE_BYTES:
                if (in.readableBytes() < this.requiredBytes) {
                    return;
                }
                emitControlMessage(ctx, in, out);
                return;
            case READ_TASK_ATTEMPT_ID:
                readTaskAttemptId(ctx, in);
                return;
            case READ_DATA_MESSAGE_LEN:
                readDataMessageLength(ctx, in, out);
                return;
            case READ_DATA_MESSAGE_BYTES:
                readDataMessageBytes(in, out);
                return;
            default:
                throw new RssException(String.format(
                        "Should not get incoming data in state %s, client %s",
                        this.state, NettyUtils.getServerConnectionInfo(ctx)));
        }
    }

    /**
     * Validates the two-byte stream prefix. On an unknown magic byte or an unsupported version the
     * connection is closed and the state is left unchanged.
     */
    private void readMagicByteAndVersion(ChannelHandlerContext ctx, ByteBuf in) {
        if (in.readableBytes() < 2 * Byte.BYTES) {
            return;
        }
        byte magicByte = in.readByte();
        String channelKind;
        switch (magicByte) {
            case MessageConstants.NOTIFY_UPLINK_MAGIC_BYTE:
                channelKind = "notify";
                break;
            case MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE:
                channelKind = "download";
                break;
            case MessageConstants.REGISTRY_UPLINK_MAGIC_BYTE:
                channelKind = "registry";
                break;
            case MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE:
                channelKind = "upload";
                break;
            default: {
                // The version byte is deliberately left unread for an unknown magic byte.
                String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
                logger.warn("Invalid magic byte {} from client {}", magicByte, clientInfo);
                closeConnection(ctx, clientInfo);
                return;
            }
        }

        byte version = in.readByte();
        if (version == SUPPORTED_UPLINK_VERSION) {
            this.state = State.READ_MESSAGE_TYPE;
            return;
        }
        String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
        logger.warn("Invalid {} version {} from client {}", channelKind, version, clientInfo);
        closeConnection(ctx, clientInfo);
    }

    /** Closes the channel and logs that it did so. */
    private void closeConnection(ChannelHandlerContext ctx, String clientInfo) {
        ctx.close();
        logger.debug("Closed connection to client {}", clientInfo);
    }

    /**
     * Reads the leading int of a message: negative means control message type, anything else is the
     * partition id of a data message.
     */
    private void readMessageType(ByteBuf in) {
        if (in.readableBytes() < Integer.BYTES) {
            return;
        }
        int typeOrPartition = in.readInt();
        if (typeOrPartition < 0) {
            this.controlMessageType = typeOrPartition;
            this.state = State.READ_CONTROL_MESSAGE_LEN;
        } else {
            this.partitionId = typeOrPartition;
            this.state = State.READ_TASK_ATTEMPT_ID;
        }
    }

    /** Reads the control payload length; a zero-length message is emitted immediately. */
    private void readControlMessageLength(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < Integer.BYTES) {
            return;
        }
        this.requiredBytes = in.readInt();
        if (this.requiredBytes < 0) {
            throw new RssInvalidDataException(String.format(
                    "Invalid control message length: %s, %s",
                    this.requiredBytes, NettyUtils.getServerConnectionInfo(ctx)));
        }
        if (this.requiredBytes != 0) {
            this.state = State.READ_CONTROL_MESSAGE_BYTES;
            return;
        }
        emitControlMessage(ctx, in, out);
    }

    /** Deserializes the pending control message, hands it downstream and expects a new message next. */
    private void emitControlMessage(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(getControlMessage(ctx, this.controlMessageType, in));
        resetData();
        this.state = State.READ_MESSAGE_TYPE;
    }

    /** Reads the 8-byte task attempt id of a data message, keeping both its raw and decoded form. */
    private void readTaskAttemptId(ChannelHandlerContext ctx, ByteBuf in) {
        if (in.readableBytes() < this.taskAttemptIdBytes.length) {
            return;
        }
        in.readBytes(this.taskAttemptIdBytes);
        this.taskAttemptId = ByteBufUtils.readLong(this.taskAttemptIdBytes, 0);
        if (this.taskAttemptId < 0) {
            throw new RssInvalidDataException(String.format(
                    "Invalid task attempt id: %s, %s",
                    this.taskAttemptId, NettyUtils.getServerConnectionInfo(ctx)));
        }
        this.state = State.READ_DATA_MESSAGE_LEN;
    }

    /** Reads the data payload length; an empty payload is emitted immediately. */
    private void readDataMessageLength(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < Integer.BYTES) {
            return;
        }
        int dataLength = in.readInt();
        if (dataLength < 0) {
            throw new RssInvalidDataException(String.format(
                    "Invalid data length: %s, %s", dataLength, NettyUtils.getServerConnectionInfo(ctx)));
        }
        // NOTE(review): no upper bound is enforced on dataLength here, so the accumulation buffer
        // may be asked to grow to whatever the client announces - confirm this is bounded elsewhere.
        if (dataLength != 0) {
            this.requiredBytes = dataLength;
            this.state = State.READ_DATA_MESSAGE_BYTES;
            this.shuffleDataBuffer.clear();
            return;
        }
        out.add(createShuffleDataWrapper(in, 0));
        resetData();
        this.requiredBytes = 0;
        this.state = State.READ_MESSAGE_TYPE;
    }

    /**
     * Moves whatever part of the data payload has arrived into the accumulation buffer and emits
     * the message once the payload is complete.
     */
    private void readDataMessageBytes(ByteBuf in, List<Object> out) {
        int available = in.readableBytes();
        if (available < this.requiredBytes) {
            // Partial payload: stash what we have and wait for the rest.
            this.shuffleDataBuffer.ensureWritable(available);
            in.readBytes(this.shuffleDataBuffer, available);
            this.requiredBytes -= available;
            return;
        }
        this.shuffleDataBuffer.ensureWritable(this.requiredBytes);
        in.readBytes(this.shuffleDataBuffer, this.requiredBytes);
        out.add(createShuffleDataWrapper(this.shuffleDataBuffer, this.shuffleDataBuffer.readableBytes()));
        this.requiredBytes = 0;
        resetData();
        this.state = State.READ_MESSAGE_TYPE;
    }

    /**
     * Deserializes the control message of the given type from the buffer.
     *
     * <p>Connect-upload and connect-download requests additionally re-bind this decoder's metrics
     * to the user named in the request.
     *
     * @param ctx         channel context, used only to describe the client in error messages
     * @param messageType control message type read from the stream
     * @param in          buffer positioned at the start of the message payload
     * @return the deserialized message
     * @throws RssException if the message type is not supported
     */
    private Object getControlMessage(ChannelHandlerContext ctx, int messageType, ByteBuf in) {
        switch (messageType) {
            case MessageConstants.MESSAGE_ConnectUploadRequest: {
                ConnectUploadRequest request = ConnectUploadRequest.deserialize(in);
                switchMetricsToUser(request.getUser());
                return request;
            }
            case MessageConstants.MESSAGE_ConnectDownloadRequest: {
                ConnectDownloadRequest request = ConnectDownloadRequest.deserialize(in);
                switchMetricsToUser(request.getUser());
                return request;
            }
            case MessageConstants.MESSAGE_ConnectUploadResponse:
                return ConnectUploadResponse.deserialize(in);
            case MessageConstants.MESSAGE_ConnectDownloadResponse:
                return ConnectDownloadResponse.deserialize(in);
            case MessageConstants.MESSAGE_StartUploadMessage:
                return StartUploadMessage.deserialize(in);
            case MessageConstants.MESSAGE_FinishUploadMessage:
                return FinishUploadMessage.deserialize(in);
            case MessageConstants.MESSAGE_HeartbeatMessage:
                return HeartbeatMessage.deserialize(in);
            case MessageConstants.MESSAGE_GetBusyStatusRequest:
                return GetBusyStatusRequest.deserialize(in);
            case MessageConstants.MESSAGE_GetBusyStatusResponse:
                return GetBusyStatusResponse.deserialize(in);
            case MessageConstants.MESSAGE_GetDataAvailabilityRequest:
                return GetDataAvailabilityRequest.deserialize(in);
            case MessageConstants.MESSAGE_GetDataAvailabilityResponse:
                return GetDataAvailabilityResponse.deserialize(in);
            case MessageConstants.MESSAGE_ConnectNotifyRequest:
                return ConnectNotifyRequest.deserialize(in);
            case MessageConstants.MESSAGE_ConnectNotifyResponse:
                return ConnectNotifyResponse.deserialize(in);
            case MessageConstants.MESSAGE_ConnectRegistryRequest:
                return ConnectRegistryRequest.deserialize(in);
            case MessageConstants.MESSAGE_ConnectRegistryResponse:
                return ConnectRegistryResponse.deserialize(in);
            case MessageConstants.MESSAGE_FinishApplicationJobRequest:
                return FinishApplicationJobRequestMessage.deserialize(in);
            case MessageConstants.MESSAGE_FinishApplicationAttemptRequest:
                return FinishApplicationAttemptRequestMessage.deserialize(in);
            case MessageConstants.MESSAGE_GetServersRequest:
                return GetServersRequestMessage.deserialize(in);
            case MessageConstants.MESSAGE_RegisterServerRequest:
                return RegisterServerRequestMessage.deserialize(in);
            default:
                throw new RssException(String.format(
                        "Unsupported control message type %s from client %s",
                        messageType, NettyUtils.getServerConnectionInfo(ctx)));
        }
    }

    /** Drops the metric group of the current user and starts reporting under {@code newUser}. */
    private void switchMetricsToUser(String newUser) {
        metricGroupContainer.removeMetricGroup(this.user);
        this.user = newUser;
        this.metrics = metricGroupContainer.getMetricGroup(this.user);
    }

    /**
     * Builds the message for one data block: the header produced by
     * {@code DataBlockHeader.serializeToBytes} from the raw task attempt id and the payload length,
     * immediately followed by {@code length} payload bytes read from {@code source}.
     *
     * @param source buffer to read the payload from
     * @param length number of payload bytes to read
     * @return wrapper carrying the current partition id, task attempt id and the assembled bytes
     */
    private ShuffleDataWrapper createShuffleDataWrapper(ByteBuf source, int length) {
        this.metrics.getNumIncomingBlocks().inc(1L);
        byte[] header = DataBlockHeader.serializeToBytes(this.taskAttemptIdBytes, length);
        byte[] block = new byte[header.length + length];
        System.arraycopy(header, 0, block, 0, header.length);
        source.readBytes(block, header.length, length);
        return new ShuffleDataWrapper(this.partitionId, this.taskAttemptId, block);
    }

    /** Forgets the per-message fields so that nothing carries over into the next message. */
    private void resetData() {
        this.controlMessageType = INVALID_CONTROL_MESSAGE_TYPE;
        this.partitionId = INVALID_PARTITION_ID;
        this.taskAttemptId = INVALID_TASK_ATTEMPT_ID;
    }
}
