package com.uber.rss.clients;

import com.uber.rss.common.AppTaskAttemptId;
import com.uber.rss.common.ServerReplicationGroup;
import com.uber.rss.exceptions.RssAggregateException;
import com.uber.rss.exceptions.RssException;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.exceptions.RssQueueNotReadyException;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.WriteClientMetrics;
import com.uber.rss.metrics.WriteClientMetricsKey;
import com.uber.rss.util.ExceptionUtils;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rss_shaded.com.uber.m3.tally.Stopwatch;

/* loaded from: input_file:com/uber/rss/clients/MultiServerAsyncWriteClient.class */
/**
 * Write client that spreads shuffle data blocks over several server replication groups and sends
 * them asynchronously.
 *
 * <p>{@link #writeDataBlock(int, ByteBuffer)} only places a record on a bounded queue. A fixed set
 * of "Record Thread" workers, created in {@link #connect()} and started lazily on the first write,
 * drains those queues and forwards each record to the {@link ReplicatedWriteClient} chosen for it.
 * Failures raised on a worker are collected and surfaced as an {@link RssAggregateException} by a
 * later call to {@code writeDataBlock}, {@link #finishUpload()} or {@link #close()}.
 *
 * <p>NOTE(review): {@code threadStarted}, {@code lastLogTime} and {@code currentAppTaskAttemptId}
 * are plain fields, so the producer-side methods look intended for a single calling thread —
 * confirm against callers.
 */
public class MultiServerAsyncWriteClient implements MultiServerWriteClient {
    private static final Logger logger = LoggerFactory.getLogger(MultiServerAsyncWriteClient.class);

    /** Minimum time between two rounds of queue-size logging in {@code writeDataBlock}. */
    private static final long LOG_INTERVAL_MILLIS = 30000L;

    private final List<ServerConnectionInfo> servers = new ArrayList<>();
    private final int networkTimeoutMillis;
    private final long maxTryingMillis;
    private final boolean finishUploadAck;
    private final boolean usePooledConnection;
    private final String user;
    private final String appId;
    private final String appAttempt;
    private final ShuffleWriteConfig shuffleWriteConfig;

    /** One client per entry of {@link #servers}; slots are filled by {@link #connect()}. */
    private final ReplicatedWriteClient[] clients;

    /** One bounded queue per worker thread; {@code recordQueues[k]} is drained by {@code threads[k]}. */
    private final BlockingQueue<Record>[] recordQueues;
    private final Thread[] threads;

    // Accumulated durations in nanoseconds, reported by finishUpload().
    private final AtomicLong queueInsertTime = new AtomicLong();
    private final AtomicLong queuePollTime = new AtomicLong();
    private final AtomicLong socketTime = new AtomicLong();

    /** Failures observed on worker threads or while waiting for them to exit. */
    private final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();

    private final WriteClientMetrics metrics;
    private final int partitionFanout;

    private long lastLogTime = System.currentTimeMillis();
    private boolean threadStarted = false;
    private AppTaskAttemptId currentAppTaskAttemptId;

    /**
     * Queue entry handed from the producer to a worker thread: either a data block bound for one
     * client, or a stop marker telling the worker to leave its loop.
     */
    public static class Record {
        private final boolean isStopMarker;
        private final int partition;
        private final ByteBuffer value;
        private final int clientIndex;

        /** Creates a data record for {@code partition} to be written through {@code clients[clientIndex]}. */
        public Record(int partition, ByteBuffer value, int clientIndex) {
            this.isStopMarker = false;
            this.partition = partition;
            this.value = value;
            this.clientIndex = clientIndex;
        }

        /** Creates a record that carries no data; pass {@code true} to make it a stop marker. */
        public Record(boolean isStopMarker) {
            this.isStopMarker = isStopMarker;
            this.partition = 0;
            this.value = null;
            this.clientIndex = 0;
        }

        @Override
        public String toString() {
            return "Record{partition=" + this.partition + '}';
        }
    }

    /** Pairs a server replication group with its slot index in the {@code clients} array. */
    public static class ServerConnectionInfo {
        private final int index;
        private final ServerReplicationGroup server;

        public ServerConnectionInfo(int index, ServerReplicationGroup server) {
            this.index = index;
            this.server = server;
        }

        @Override
        public String toString() {
            return "ServerConnectionInfo{index=" + this.index + ", server=" + this.server + '}';
        }
    }

    /**
     * Creates a client with a partition fanout of 1; see the full constructor for the parameters.
     */
    public MultiServerAsyncWriteClient(
            Collection<ServerReplicationGroup> serverGroups,
            int networkTimeoutMillis,
            long maxTryingMillis,
            boolean finishUploadAck,
            boolean usePooledConnection,
            int queueSize,
            int numThreads,
            String user,
            String appId,
            String appAttempt,
            ShuffleWriteConfig shuffleWriteConfig) {
        this(serverGroups, 1, networkTimeoutMillis, maxTryingMillis, finishUploadAck, usePooledConnection,
                queueSize, numThreads, user, appId, appAttempt, shuffleWriteConfig);
    }

    /**
     * Creates the client, its record queues and its (not yet populated) thread slots. No network
     * connection is made here; call {@link #connect()} afterwards.
     *
     * @param serverGroups server replication groups, one {@link ReplicatedWriteClient} each
     * @param partitionFanout number of servers a partition may be spread over; must not exceed the
     *     number of server groups
     * @param networkTimeoutMillis passed to each client; also the wait limit when offering to a
     *     record queue, and a quarter of the worker poll timeout
     * @param maxTryingMillis how long to wait for each worker thread to exit
     * @param finishUploadAck passed through to each {@link ReplicatedWriteClient}
     * @param usePooledConnection passed through to each {@link ReplicatedWriteClient}
     * @param queueSize capacity of every record queue
     * @param numThreads number of record queues and worker threads
     * @param user passed to each client and used in the metrics key
     * @param appId passed through to each client
     * @param appAttempt passed through to each client
     * @param shuffleWriteConfig passed through to each client
     * @throws RssInvalidDataException if {@code partitionFanout} exceeds the number of server groups
     */
    public MultiServerAsyncWriteClient(
            Collection<ServerReplicationGroup> serverGroups,
            int partitionFanout,
            int networkTimeoutMillis,
            long maxTryingMillis,
            boolean finishUploadAck,
            boolean usePooledConnection,
            int queueSize,
            int numThreads,
            String user,
            String appId,
            String appAttempt,
            ShuffleWriteConfig shuffleWriteConfig) {
        for (ServerReplicationGroup group : serverGroups) {
            this.servers.add(new ServerConnectionInfo(this.servers.size(), group));
        }
        // Validate before any metrics are registered: a failed constructor leaves nobody able to
        // call close(), so metrics created before this check would never be released.
        if (partitionFanout > this.servers.size()) {
            throw new RssInvalidDataException(String.format(
                    "Too many servers (%s) per partition, larger than max number of servers (%s)",
                    partitionFanout, this.servers.size()));
        }

        this.partitionFanout = partitionFanout;
        this.networkTimeoutMillis = networkTimeoutMillis;
        this.maxTryingMillis = maxTryingMillis;
        this.finishUploadAck = finishUploadAck;
        this.usePooledConnection = usePooledConnection;
        this.user = user;
        this.appId = appId;
        this.appAttempt = appAttempt;
        this.shuffleWriteConfig = shuffleWriteConfig;
        this.clients = new ReplicatedWriteClient[this.servers.size()];

        // Generic arrays cannot be created directly; every slot is filled with a
        // BlockingQueue<Record> right below, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        BlockingQueue<Record>[] queues = new ArrayBlockingQueue[numThreads];
        for (int k = 0; k < numThreads; k++) {
            queues[k] = new ArrayBlockingQueue<>(queueSize);
        }
        this.recordQueues = queues;
        this.threads = new Thread[numThreads];

        this.metrics = new WriteClientMetrics(new WriteClientMetricsKey(getClass().getSimpleName(), user));
        this.metrics.getNumClients().inc(1L);

        logger.info("Created {}, threads: {}, queue size: {}", getClass().getSimpleName(), numThreads, queueSize);
    }

    /**
     * Connects one {@link ReplicatedWriteClient} per server group (in parallel) and creates, but
     * does not start, the worker threads.
     *
     * @throws RssInvalidStateException if any client slot is still empty after connecting
     */
    @Override // com.uber.rss.clients.MultiServerWriteClient
    public void connect() {
        this.servers.parallelStream().forEach(this::connectSingleClient);

        synchronized (this.clients) {
            for (int k = 0; k < this.clients.length; k++) {
                if (this.clients[k] == null) {
                    throw new RssInvalidStateException(String.format("Client %s is null", k));
                }
            }
        }

        for (int k = 0; k < this.threads.length; k++) {
            final int threadIndex = k;
            Thread worker = new Thread(() -> runRecordThread(threadIndex));
            worker.setName("Record Thread " + k);
            this.threads[k] = worker;
        }
    }

    /**
     * Remembers the task attempt and forwards the call, with the same arguments, to every client.
     * The meaning of {@code i} and {@code i2} is defined by {@code ShuffleDataWriter}; they are
     * passed through unchanged.
     */
    @Override // com.uber.rss.clients.ShuffleDataWriter
    public void startUpload(AppTaskAttemptId appTaskAttemptId, int i, int i2) {
        this.currentAppTaskAttemptId = appTaskAttemptId;
        Arrays.stream(this.clients).forEach(client -> client.startUpload(appTaskAttemptId, i, i2));
    }

    /**
     * Queues one data block for asynchronous delivery, starting the worker threads on first use.
     *
     * <p>The target client is {@code partition % clients.length}, shifted by
     * {@code taskAttemptId % partitionFanout} when the fanout is greater than 1. The record goes to
     * the queue at {@code clientIndex % threads.length}.
     *
     * @param partition partition the block belongs to
     * @param value block payload
     * @throws RssAggregateException if a worker thread has already reported a failure
     * @throws RssQueueNotReadyException if the queue stays full for {@code networkTimeoutMillis}
     * @throws RssException if the calling thread is interrupted while waiting for queue space
     */
    @Override // com.uber.rss.clients.ShuffleDataWriter
    public void writeDataBlock(int partition, ByteBuffer value) {
        if (!this.threadStarted) {
            for (Thread worker : this.threads) {
                worker.start();
            }
            this.threadStarted = true;
        }
        if (!this.exceptions.isEmpty()) {
            throw new RssAggregateException(this.exceptions);
        }

        int clientIndex = partition % this.clients.length;
        if (this.partitionFanout > 1) {
            int shift = (int) (this.currentAppTaskAttemptId.getTaskAttemptId() % this.partitionFanout);
            clientIndex = (clientIndex + shift) % this.clients.length;
        }
        BlockingQueue<Record> queue = this.recordQueues[clientIndex % this.threads.length];

        boolean inserted;
        try {
            long insertStart = System.nanoTime();
            inserted = queue.offer(
                    createUploadRecord(partition, value, clientIndex), this.networkTimeoutMillis, TimeUnit.MILLISECONDS);
            this.queueInsertTime.addAndGet(System.nanoTime() - insertStart);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RssException("Interrupted when inserting to record queue", e);
        }
        if (!inserted) {
            throw new RssQueueNotReadyException(String.format(
                    "sendRecord: Record queue has no space available after waiting %s millis",
                    this.networkTimeoutMillis));
        }

        long now = System.currentTimeMillis();
        if (now - this.lastLogTime > LOG_INTERVAL_MILLIS) {
            for (int k = 0; k < this.recordQueues.length; k++) {
                logger.info("Record queue {} size: {}", k, this.recordQueues[k].size());
            }
            this.lastLogTime = now;
        }
    }

    /**
     * Stops the worker threads, waits for them to exit, then calls {@code finishUpload()} on every
     * client in parallel and logs the accumulated timings. The finish-upload latency stopwatch is
     * stopped whether or not the call succeeds.
     *
     * @throws RssAggregateException if any failure was collected while writing or stopping
     */
    @Override // com.uber.rss.clients.ShuffleDataWriter
    public void finishUpload() {
        Stopwatch latency = this.metrics.getFinishUploadLatency().start();
        try {
            long stopStart = System.nanoTime();
            stopThreads();
            waitThreadsExit();
            long stopThreadNanos = System.nanoTime() - stopStart;

            if (!this.exceptions.isEmpty()) {
                throw new RssAggregateException(this.exceptions);
            }

            long finishStart = System.nanoTime();
            Arrays.stream(this.clients).parallel().forEach(ReplicatedWriteClient::finishUpload);
            long finishUploadNanos = System.nanoTime() - finishStart;

            logger.info(String.format(
                    "WriteClientTime (%s), queue insert seconds: %s, queue poll seconds: %s, socket seconds: %s, stop thread seconds: %s, finish upload seconds: %s",
                    this.currentAppTaskAttemptId,
                    TimeUnit.NANOSECONDS.toSeconds(this.queueInsertTime.get()),
                    TimeUnit.NANOSECONDS.toSeconds(this.queuePollTime.get()),
                    TimeUnit.NANOSECONDS.toSeconds(this.socketTime.get()),
                    TimeUnit.NANOSECONDS.toSeconds(stopThreadNanos),
                    TimeUnit.NANOSECONDS.toSeconds(finishUploadNanos)));
        } finally {
            latency.stop();
        }
    }

    /** Returns the sum of {@code getShuffleWriteBytes()} over all clients that have been created. */
    @Override // com.uber.rss.clients.ShuffleDataWriter
    public long getShuffleWriteBytes() {
        long total = 0;
        for (ReplicatedWriteClient client : this.clients) {
            if (client != null) {
                total += client.getShuffleWriteBytes();
            }
        }
        return total;
    }

    /**
     * Stops the worker threads, closes the metrics, waits for the workers to exit and finally
     * closes every client. Metrics and clients are closed even when stopping the workers fails.
     *
     * @throws RssAggregateException if any failure was collected while writing or stopping
     */
    @Override // com.uber.rss.clients.ShuffleDataWriter, java.lang.AutoCloseable
    public void close() {
        try {
            try {
                stopThreads();
            } finally {
                // Must run even if a stop marker could not be queued, otherwise metrics leak.
                closeMetrics();
            }
            waitThreadsExit();
            if (!this.exceptions.isEmpty()) {
                throw new RssAggregateException(this.exceptions);
            }
        } finally {
            Arrays.stream(this.clients).parallel().forEach(this::closeClient);
        }
    }

    @Override
    public String toString() {
        return "MultiServerAsyncWriteClient{clients=" + Arrays.toString(this.clients) + '}';
    }

    /**
     * Body of one worker thread: polls its queue and writes each record through the record's
     * client until a stop marker arrives, any failure has been recorded, or the write throws.
     * Records still queued at exit are reported as a failure and discarded.
     */
    private void runRecordThread(int threadIndex) {
        logger.info("Record Thread {} started", threadIndex);
        BlockingQueue<Record> queue = this.recordQueues[threadIndex];
        try {
            // Multiply as long so a large timeout cannot overflow int arithmetic.
            long pollMaxWaitMillis = 4L * this.networkTimeoutMillis;
            while (this.exceptions.isEmpty()) {
                long pollStart = System.nanoTime();
                Record record = queue.poll(pollMaxWaitMillis, TimeUnit.MILLISECONDS);
                this.queuePollTime.addAndGet(System.nanoTime() - pollStart);
                if (record == null) {
                    logger.info("Record queue {} has no record after waiting {} millis", threadIndex, pollMaxWaitMillis);
                    continue;
                }
                if (record.isStopMarker) {
                    break;
                }
                ReplicatedWriteClient client = this.clients[record.clientIndex];
                long writeStart = System.nanoTime();
                client.writeDataBlock(record.partition, record.value);
                this.socketTime.addAndGet(System.nanoTime() - writeStart);
            }
        } catch (Throwable th) {
            logger.warn(String.format("Record Thread %s got exception, %s", threadIndex, ExceptionUtils.getSimpleMessage(th)), th);
            M3Stats.addException(th, getClass().getSimpleName());
            this.exceptions.add(th);
        }

        int remaining = queue.size();
        if (remaining > 0) {
            this.exceptions.add(new RssQueueNotReadyException(String.format(
                    "Record queue %s has %s remaining records not sent out", threadIndex, remaining)));
        }
        queue.clear();
        logger.info("Record Thread {} finished, remaining records: {}", threadIndex, remaining);
    }

    /** Creates and connects the client for one server group, then stores it in that group's slot. */
    private void connectSingleClient(ServerConnectionInfo serverConnectionInfo) {
        ReplicatedWriteClient client = new ReplicatedWriteClient(
                serverConnectionInfo.server,
                this.networkTimeoutMillis,
                this.finishUploadAck,
                this.usePooledConnection,
                this.user,
                this.appId,
                this.appAttempt,
                this.shuffleWriteConfig);
        client.connect();
        // connect() invokes this from a parallel stream, so slot writes are guarded by the array's monitor.
        synchronized (this.clients) {
            this.clients[serverConnectionInfo.index] = client;
        }
    }

    /** Closes a client if present; failures are logged and otherwise ignored (best effort). */
    private void closeClient(ReplicatedWriteClient replicatedWriteClient) {
        if (replicatedWriteClient == null) {
            return;
        }
        try {
            logger.debug("Closing client: {}", replicatedWriteClient);
            replicatedWriteClient.close();
        } catch (Throwable th) {
            logger.warn("Failed to close client", th);
        }
    }

    /**
     * Offers a stop marker to every record queue if the worker threads were started.
     *
     * @throws RssQueueNotReadyException if a queue stays full for {@code networkTimeoutMillis}
     * @throws RssException if the calling thread is interrupted while waiting
     */
    private void stopThreads() {
        if (!this.threadStarted) {
            return;
        }
        for (int k = 0; k < this.threads.length; k++) {
            boolean inserted;
            try {
                inserted = this.recordQueues[k].offer(
                        createStopMarkerRecord(), this.networkTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RssException("Interrupted when inserting stop marker to record queue", e);
            }
            if (!inserted) {
                throw new RssQueueNotReadyException(String.format(
                        "stopThreads: Record queue has no space available after waiting %s millis",
                        this.networkTimeoutMillis));
            }
            logger.debug("Inserted stop marker to record queue {}", k);
        }
    }

    /**
     * Joins every worker thread for up to {@code maxTryingMillis} each, if the workers were
     * started. A thread that is still alive afterwards is recorded as a failure rather than
     * thrown, so the remaining threads are still joined.
     *
     * @throws RssException if the calling thread is interrupted while joining
     */
    private void waitThreadsExit() {
        if (!this.threadStarted) {
            return;
        }
        for (int k = 0; k < this.threads.length; k++) {
            Thread worker = this.threads[k];
            if (worker == null) {
                continue;
            }
            try {
                worker.join(this.maxTryingMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RssException(String.format("Failed to wait record thread %s exit", k), e);
            }
            if (worker.isAlive()) {
                this.exceptions.add(new RssException(String.format(
                        "Thread %s still alive after waiting %s milliseconds", k, this.maxTryingMillis)));
            }
        }
        this.threadStarted = false;
    }

    private Record createUploadRecord(int partition, ByteBuffer value, int clientIndex) {
        return new Record(partition, value, clientIndex);
    }

    private Record createStopMarkerRecord() {
        return new Record(true);
    }

    /** Closes the metrics object; failures are reported to M3 and logged, never rethrown. */
    private void closeMetrics() {
        try {
            this.metrics.close();
        } catch (Throwable th) {
            M3Stats.addException(th, getClass().getSimpleName());
            logger.warn(String.format("Failed to close metrics: %s", this), th);
        }
    }
}
