package com.uber.rss.clients;

import com.uber.rss.exceptions.RssFileCorruptedException;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.exceptions.RssNetworkException;
import com.uber.rss.exceptions.RssServerBusyException;
import com.uber.rss.exceptions.RssShuffleStageNotStartedException;
import com.uber.rss.exceptions.RssStaleTaskAttemptException;
import com.uber.rss.exceptions.RssTooMuchDataException;
import com.uber.rss.messages.BaseMessage;
import com.uber.rss.messages.MessageConstants;
import com.uber.rss.metrics.ClientConnectMetrics;
import com.uber.rss.metrics.ClientConnectMetricsKey;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.MetricGroupContainer;
import com.uber.rss.util.ByteBufUtils;
import com.uber.rss.util.ExceptionUtils;
import com.uber.rss.util.NetworkUtils;
import com.uber.rss.util.SocketUtils;
import com.uber.rss.util.ThreadUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rss_shaded.com.uber.m3.tally.Stopwatch;

/* loaded from: input_file:com/uber/rss/clients/ClientBase.class */
public abstract class ClientBase implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ClientBase.class);
    private static final AtomicLong internalClientIdSeed = new AtomicLong(0);
    private static final MetricGroupContainer<ClientConnectMetricsKey, ClientConnectMetrics> metricGroupContainer = new MetricGroupContainer<>(clientConnectMetricsKey -> {
        return new ClientConnectMetrics(new ClientConnectMetricsKey(clientConnectMetricsKey.getSource(), clientConnectMetricsKey.getRemote()));
    });
    protected final String host;
    protected final int port;
    protected final int timeoutMillis;
    protected Socket socket;
    protected InputStream inputStream;
    protected OutputStream outputStream;
    protected String connectionInfo;
    private final long internalClientId = internalClientIdSeed.getAndIncrement();

    public ClientBase(String str, int i, int i2) {
        this.connectionInfo = "";
        this.host = str;
        this.port = i;
        this.timeoutMillis = i2;
        this.connectionInfo = String.format("%s %s [%s -> %s:%s]", getClass().getSimpleName(), Long.valueOf(this.internalClientId), NetworkUtils.getLocalHostName(), str, Integer.valueOf(i));
        logger.debug(String.format("Created instance (timeout: %s millis): %s", Integer.valueOf(i2), this));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.socket == null) {
            return;
        }
        try {
            if (this.outputStream != null) {
                this.outputStream.flush();
            }
        } catch (Throwable th) {
            logger.warn("Hit exception when flushing output stream: " + this.connectionInfo, th);
        }
        try {
            if (this.outputStream != null) {
                this.outputStream.close();
            }
        } catch (Throwable th2) {
            logger.warn("Hit exception when closing output stream: " + this.connectionInfo, th2);
        }
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (Throwable th3) {
            logger.warn("Hit exception when closing input stream: " + this.connectionInfo, th3);
        }
        try {
            this.socket.close();
        } catch (Throwable th4) {
            logger.warn("Hit exception when closing socket: " + this.connectionInfo, th4);
        }
        try {
            metricGroupContainer.removeMetricGroup(getClientConnectMetricsKey());
        } catch (Throwable th5) {
            logger.warn("Hit exception when removing metrics: " + this.connectionInfo, th5);
        }
        this.socket = null;
    }

    public String toString() {
        return this.connectionInfo;
    }

    protected boolean isClosed() {
        return this.socket == null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Finally extract failed */
    public void connectSocket() {
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        Throwable th = null;
        while (true) {
            try {
                if (System.currentTimeMillis() - currentTimeMillis > this.timeoutMillis) {
                    break;
                }
                ClientConnectMetrics metricGroup = metricGroupContainer.getMetricGroup(getClientConnectMetricsKey());
                if (i >= 1) {
                    logger.info(String.format("Retrying connect to %s:%s, total retrying times: %s, elapsed milliseconds: %s", this.host, Integer.valueOf(this.port), Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
                    metricGroup.getSocketConnectRetries().update(i);
                }
                i++;
                Stopwatch start = metricGroup.getSocketConnectLatency().start();
                try {
                    this.socket = new Socket();
                    this.socket.setSoTimeout(this.timeoutMillis);
                    this.socket.setTcpNoDelay(true);
                    this.socket.connect(new InetSocketAddress(this.host, this.port), this.timeoutMillis);
                    start.stop();
                    break;
                } catch (ConnectException | NoRouteToHostException | UnknownHostException e) {
                    try {
                        if ((e instanceof ConnectException) && !ExceptionUtils.isTimeoutException(e)) {
                            throw e;
                        }
                        M3Stats.addException(e, getClass().getSimpleName());
                        this.socket = null;
                        th = e;
                        logger.info(String.format("Failed to connect to %s:%s, %s", this.host, Integer.valueOf(this.port), ExceptionUtils.getSimpleMessage(e)));
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis2 < this.timeoutMillis) {
                            ThreadUtils.sleep(Math.min(this.timeoutMillis - currentTimeMillis2, 500L));
                        }
                        start.stop();
                    } catch (Throwable th2) {
                        start.stop();
                        throw th2;
                    }
                }
            } catch (Throwable th3) {
                M3Stats.addException(th3, getClass().getSimpleName());
                String format = String.format("connectSocket failed after trying %s times for %s milliseconds (timeout %s): %s, %s", Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(this.timeoutMillis), this.connectionInfo, ExceptionUtils.getSimpleMessage(th3));
                logger.warn(format, th3);
                throw new RssNetworkException(format, th3);
            }
        }
        if (this.socket == null) {
            if (th == null) {
                throw new IOException(String.format("Failed to connect to %s:%s", this.host, Integer.valueOf(this.port)));
            }
            throw th;
        }
        this.inputStream = this.socket.getInputStream();
        this.outputStream = this.socket.getOutputStream();
        this.connectionInfo = String.format("%s %s [%s -> %s (%s)]", getClass().getSimpleName(), Long.valueOf(this.internalClientId), this.socket.getLocalSocketAddress(), this.socket.getRemoteSocketAddress(), this.host);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void write(byte b) {
        try {
            this.outputStream.write(b);
        } catch (IOException e) {
            String format = String.format("write failed: %s, %s", this.connectionInfo, ExceptionUtils.getSimpleMessage(e));
            logger.warn(format, e);
            throw new RssNetworkException(format, e);
        }
    }

    protected void writeMessageLengthAndContent(BaseMessage baseMessage) {
        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(1000);
        try {
            baseMessage.serialize(buffer);
            byte[] readBytes = ByteBufUtils.readBytes(buffer);
            buffer.release();
            try {
                this.outputStream.write(ByteBufUtils.convertIntToBytes(readBytes.length));
                this.outputStream.write(readBytes);
                this.outputStream.flush();
            } catch (IOException e) {
                String format = String.format("writeMessageLengthAndContent failed: %s, %s", this.connectionInfo, ExceptionUtils.getSimpleMessage(e));
                logger.warn(format, e);
                throw new RssNetworkException(format, e);
            }
        } catch (Throwable th) {
            buffer.release();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeControlMessageNotWaitResponseStatus(BaseMessage baseMessage) {
        logger.debug(String.format("Writing control message: %s, connection: %s", baseMessage, this.connectionInfo));
        try {
            this.outputStream.write(ByteBufUtils.convertIntToBytes(baseMessage.getMessageType()));
            writeMessageLengthAndContent(baseMessage);
        } catch (IOException e) {
            String format = String.format("write message type failed: %s, %s", this.connectionInfo, ExceptionUtils.getSimpleMessage(e));
            logger.warn(format, e);
            throw new RssNetworkException(format, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeControlMessageAndWaitResponseStatus(BaseMessage baseMessage) {
        writeControlMessageNotWaitResponseStatus(baseMessage);
        readResponseStatus();
        logger.debug(String.format("Got OK response for control message: %s, connection: %s", baseMessage, this.connectionInfo));
    }

    private int readStatus() {
        try {
            return this.inputStream.read();
        } catch (IOException e) {
            String format = String.format("read status failed: %s, %s", this.connectionInfo, ExceptionUtils.getSimpleMessage(e));
            logger.warn(format, e);
            throw new RssNetworkException(format, e);
        }
    }

    protected void readHeaderResponseStatus() {
        checkHeaderResponseStatus(readStatus());
    }

    protected void readResponseStatus() {
        checkOKResponseStatus(readStatus());
    }

    private final void checkHeaderResponseStatus(int i) {
        if (i == 53) {
            throw new RssServerBusyException(String.format("Server busy: %s", this.connectionInfo));
        }
        checkOKResponseStatus(i);
    }

    private final void checkOKResponseStatus(int i) {
        switch (i) {
            case MessageConstants.RESPONSE_STATUS_OK /* 20 */:
                return;
            case MessageConstants.RESPONSE_STATUS_SHUFFLE_STAGE_NOT_STARTED /* 44 */:
                throw new RssShuffleStageNotStartedException(String.format("Shuffle not started: %s", this.connectionInfo));
            case MessageConstants.RESPONSE_STATUS_FILE_CORRUPTED /* 45 */:
                throw new RssFileCorruptedException(String.format("Shuffle file corrupted or application writing too much data: %s", this.connectionInfo));
            case MessageConstants.RESPONSE_STATUS_SERVER_BUSY /* 53 */:
                throw new RssServerBusyException(String.format("Server busy: %s", this.connectionInfo));
            case MessageConstants.RESPONSE_STATUS_APP_TOO_MUCH_DATA /* 54 */:
                throw new RssTooMuchDataException(String.format("App writing too much data: %s", this.connectionInfo));
            case MessageConstants.RESPONSE_STATUS_STALE_TASK_ATTEMPT /* 55 */:
                throw new RssStaleTaskAttemptException(String.format("Task attempt is stale (there is a new task retry, thus the old task is not valid any more)", Integer.valueOf(i), this.connectionInfo));
            default:
                throw new RssNetworkException(String.format("Response not ok: %s, %s", Integer.valueOf(i), this.connectionInfo));
        }
    }

    private ClientConnectMetricsKey getClientConnectMetricsKey() {
        return new ClientConnectMetricsKey(getClass().getSimpleName(), this.host);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <R extends BaseMessage> R readResponseMessage(int i, Function<ByteBuf, R> function) {
        int readInt = SocketUtils.readInt(this.inputStream);
        if (readInt != i) {
            throw new RssInvalidDataException(String.format("Expected message id: %s, actual message id: %s", Integer.valueOf(i), Integer.valueOf(readInt)));
        }
        return (R) readMessageLengthAndContent(function);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <R extends BaseMessage> R readMessageLengthAndContent(Function<ByteBuf, R> function) {
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(SocketUtils.readBytes(this.inputStream, SocketUtils.readInt(this.inputStream)));
        try {
            R apply = function.apply(wrappedBuffer);
            wrappedBuffer.release();
            return apply;
        } catch (Throwable th) {
            wrappedBuffer.release();
            throw th;
        }
    }
}
