package com.uber.rss.clients;

import com.uber.rss.common.ShuffleMapTaskAttemptId;
import com.uber.rss.exceptions.RssFinishUploadException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.exceptions.RssNetworkException;
import com.uber.rss.messages.ConnectUploadRequest;
import com.uber.rss.messages.ConnectUploadResponse;
import com.uber.rss.messages.FinishUploadMessage;
import com.uber.rss.messages.MessageConstants;
import com.uber.rss.messages.StartUploadMessage;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.WriteClientMetrics;
import com.uber.rss.metrics.WriteClientMetricsKey;
import com.uber.rss.util.ByteBufUtils;
import com.uber.rss.util.ExceptionUtils;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rss_shaded.com.uber.m3.tally.Stopwatch;

/* loaded from: input_file:com/uber/rss/clients/DataBlockSyncWriteClient.class */
public class DataBlockSyncWriteClient extends ClientBase {
    private static final Logger logger = LoggerFactory.getLogger(DataBlockSyncWriteClient.class);
    private final boolean finishUploadAck;
    private final String user;
    private final String appId;
    private final String appAttempt;
    private long totalWriteBytes;
    private long startUploadShuffleByteSnapshot;
    private WriteClientMetrics metrics;

    public DataBlockSyncWriteClient(String str, int i, int i2, String str2, String str3, String str4) {
        this(str, i, i2, true, str2, str3, str4);
    }

    public DataBlockSyncWriteClient(String str, int i, int i2, boolean z, String str2, String str3, String str4) {
        super(str, i, i2);
        this.totalWriteBytes = 0L;
        this.startUploadShuffleByteSnapshot = 0L;
        this.metrics = null;
        this.finishUploadAck = z;
        this.user = str2;
        this.appId = str3;
        this.appAttempt = str4;
        this.metrics = new WriteClientMetrics(new WriteClientMetricsKey(getClass().getSimpleName(), str2));
        this.metrics.getNumClients().inc(1L);
    }

    public ConnectUploadResponse connect() {
        Stopwatch start = this.metrics.getWriteConnectLatency().start();
        try {
            return connectImpl();
        } finally {
            start.stop();
        }
    }

    private ConnectUploadResponse connectImpl() {
        if (this.socket != null) {
            throw new RssInvalidStateException(String.format("Already connected to server, cannot connect again: %s", this.connectionInfo));
        }
        ConnectUploadRequest connectUploadRequest = new ConnectUploadRequest(this.user, this.appId, this.appAttempt);
        logger.debug(String.format("Connecting to server: %s", this.connectionInfo));
        connectSocket();
        write((byte) 117);
        write((byte) 3);
        writeControlMessageAndWaitResponseStatus(connectUploadRequest);
        ConnectUploadResponse connectUploadResponse = (ConnectUploadResponse) readResponseMessage(MessageConstants.MESSAGE_ConnectUploadResponse, ConnectUploadResponse::deserialize);
        logger.info(String.format("Connected to server: %s, response: %s", this.connectionInfo, connectUploadResponse));
        return connectUploadResponse;
    }

    public void startUpload(ShuffleMapTaskAttemptId shuffleMapTaskAttemptId, int i, int i2, ShuffleWriteConfig shuffleWriteConfig) {
        logger.debug(String.format("Starting upload %s, %s", shuffleMapTaskAttemptId, this.connectionInfo));
        this.startUploadShuffleByteSnapshot = this.totalWriteBytes;
        writeControlMessageNotWaitResponseStatus(new StartUploadMessage(shuffleMapTaskAttemptId.getShuffleId(), shuffleMapTaskAttemptId.getMapId(), shuffleMapTaskAttemptId.getTaskAttemptId(), i, i2, "", shuffleWriteConfig.getNumSplits()));
    }

    public void writeData(int i, long j, ByteBuf byteBuf) {
        int readableBytes = byteBuf.readableBytes();
        byte[] bArr = new byte[16];
        ByteBufUtils.writeInt(bArr, 0, i);
        ByteBufUtils.writeLong(bArr, 4, j);
        ByteBufUtils.writeInt(bArr, 12, readableBytes);
        try {
            this.outputStream.write(bArr);
            try {
                ByteBufUtils.readBytesToStream(byteBuf, this.outputStream);
                long j2 = 16 + readableBytes;
                this.totalWriteBytes += j2;
                this.metrics.getNumWriteBytes().inc(j2);
            } catch (IOException e) {
                throw new RssNetworkException(String.format("writeRowGroup: hit exception writing data %s, %s, %s", Integer.valueOf(i), this.connectionInfo, ExceptionUtils.getSimpleMessage(e)), e);
            }
        } catch (IOException e2) {
            throw new RssNetworkException(String.format("writeRowGroup: hit exception writing heading bytes %s, %s, %s", Integer.valueOf(i), this.connectionInfo, ExceptionUtils.getSimpleMessage(e2)), e2);
        }
    }

    public void finishUpload(long j) {
        Stopwatch start = this.metrics.getFinishUploadLatency().start();
        try {
            try {
                byte b = this.finishUploadAck ? (byte) 1 : (byte) 0;
                FinishUploadMessage finishUploadMessage = new FinishUploadMessage(j, System.currentTimeMillis(), b);
                if (b == 0) {
                    writeControlMessageNotWaitResponseStatus(finishUploadMessage);
                } else {
                    writeControlMessageAndWaitResponseStatus(finishUploadMessage);
                }
            } catch (Throwable th) {
                throw new RssFinishUploadException(String.format("Failed to finish upload to server %s, %s, %s. If the network is good, this error may indicate your shuffle data exceeds the server side limit. This shuffle client has written %s bytes.", Long.valueOf(j), this.connectionInfo, ExceptionUtils.getSimpleMessage(th), Long.valueOf(getShuffleWriteBytes())), th);
            }
        } finally {
            start.stop();
        }
    }

    @Override // com.uber.rss.clients.ClientBase, java.lang.AutoCloseable
    public void close() {
        super.close();
        closeMetrics();
    }

    public long getShuffleWriteBytes() {
        return this.totalWriteBytes - this.startUploadShuffleByteSnapshot;
    }

    private void closeMetrics() {
        try {
            if (this.metrics != null) {
                this.metrics.close();
                this.metrics = null;
            }
        } catch (Throwable th) {
            M3Stats.addException(th, getClass().getSimpleName());
            logger.warn(String.format("Failed to close metrics: %s", this.connectionInfo), th);
        }
    }
}
