package com.uber.rss.clients;

import com.uber.rss.common.AppShufflePartitionId;
import com.uber.rss.common.ServerReplicationGroup;
import com.uber.rss.exceptions.ExceptionWrapper;
import com.uber.rss.exceptions.RssException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.util.RetryUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uber/rss/clients/MultiServerSocketReadClient.class */
public class MultiServerSocketReadClient implements MultiServerReadClient {
    private static final Logger logger = LoggerFactory.getLogger(MultiServerSocketReadClient.class);
    private final int timeoutMillis;
    private final ClientRetryOptions clientRetryOptions;
    private final String user;
    private final AppShufflePartitionId appShufflePartitionId;
    private final ReadClientDataOptions readClientDataOptions;
    private final boolean checkShuffleReplicaConsistency;
    private final List<ServerReplicationGroup> servers;
    private int nextClientIndex;
    private long shuffleReadBytesOfFinishedClients;
    private ReplicatedReadClient currentClient;

    public MultiServerSocketReadClient(Collection<ServerReplicationGroup> collection, int i, String str, AppShufflePartitionId appShufflePartitionId, ReadClientDataOptions readClientDataOptions, boolean z) {
        this(collection, i, new ClientRetryOptions(readClientDataOptions.getDataAvailablePollInterval(), i), str, appShufflePartitionId, readClientDataOptions, z);
    }

    public MultiServerSocketReadClient(Collection<ServerReplicationGroup> collection, int i, ClientRetryOptions clientRetryOptions, String str, AppShufflePartitionId appShufflePartitionId, ReadClientDataOptions readClientDataOptions, boolean z) {
        this.nextClientIndex = 0;
        this.shuffleReadBytesOfFinishedClients = 0L;
        this.servers = new ArrayList(collection);
        this.timeoutMillis = i;
        this.clientRetryOptions = clientRetryOptions;
        this.user = str;
        this.appShufflePartitionId = appShufflePartitionId;
        this.readClientDataOptions = readClientDataOptions;
        this.checkShuffleReplicaConsistency = z;
        if (collection.isEmpty()) {
            throw new RssException("No server provided");
        }
    }

    @Override // com.uber.rss.clients.MultiServerReadClient
    public synchronized void connect() {
        connectAndInitializeClient();
    }

    @Override // com.uber.rss.clients.ShuffleDataReader, java.lang.AutoCloseable
    public synchronized void close() {
        closeClient(this.currentClient);
    }

    @Override // com.uber.rss.clients.ShuffleDataReader
    public synchronized TaskDataBlock readDataBlock() {
        TaskDataBlock readDataBlock = this.currentClient.readDataBlock();
        while (true) {
            TaskDataBlock taskDataBlock = readDataBlock;
            if (taskDataBlock != null) {
                return taskDataBlock;
            }
            this.shuffleReadBytesOfFinishedClients += this.currentClient.getShuffleReadBytes();
            this.currentClient.close();
            this.currentClient = null;
            if (this.nextClientIndex == this.servers.size()) {
                return null;
            }
            if (this.nextClientIndex >= this.servers.size()) {
                throw new RssInvalidStateException(String.format("Invalid nextClientIndex value: %s, max value: %s, %s", Integer.valueOf(this.nextClientIndex), Integer.valueOf(this.servers.size() - 1), this));
            }
            connectAndInitializeClient();
            readDataBlock = this.currentClient.readDataBlock();
        }
    }

    @Override // com.uber.rss.clients.ShuffleDataReader
    public synchronized long getShuffleReadBytes() {
        return this.currentClient == null ? this.shuffleReadBytesOfFinishedClients : this.shuffleReadBytesOfFinishedClients + this.currentClient.getShuffleReadBytes();
    }

    public String toString() {
        return "MultiServerSocketReadClient{nextClientIndex=" + this.nextClientIndex + ", servers=" + this.servers + ", currentClient=" + this.currentClient + '}';
    }

    private void connectAndInitializeClient() {
        if (this.nextClientIndex > this.servers.size()) {
            throw new RssException(String.format("Invalid operation, next client index %s, total servers %s", Integer.valueOf(this.nextClientIndex), Integer.valueOf(this.servers.size())));
        }
        ServerReplicationGroup serverReplicationGroup = this.servers.get(this.nextClientIndex);
        logger.info(String.format("Fetching data from server: %s (%s out of %s), partition: %s", serverReplicationGroup, Integer.valueOf(this.nextClientIndex + 1), Integer.valueOf(this.servers.size()), this.appShufflePartitionId));
        ExceptionWrapper exceptionWrapper = new ExceptionWrapper();
        String format = String.format("Failed to connect to server: %s, partition: %s", serverReplicationGroup, this.appShufflePartitionId);
        ReplicatedReadClient replicatedReadClient = (ReplicatedReadClient) RetryUtils.retryUntilNotNull(this.clientRetryOptions.getRetryIntervalMillis(), this.clientRetryOptions.getRetryIntervalMillis() * 10, this.clientRetryOptions.getRetryMaxMillis(), () -> {
            ReplicatedReadClient replicatedReadClient2 = null;
            try {
                replicatedReadClient2 = new ReplicatedReadClient(serverReplicationGroup, this.timeoutMillis, this.clientRetryOptions, this.user, this.appShufflePartitionId, this.readClientDataOptions, this.checkShuffleReplicaConsistency);
                replicatedReadClient2.connect();
                return replicatedReadClient2;
            } catch (Throwable th) {
                M3Stats.addException(th, getClass().getSimpleName());
                logger.warn(format, th);
                exceptionWrapper.setException(th);
                closeClient(replicatedReadClient2);
                return null;
            }
        });
        if (replicatedReadClient != null) {
            this.currentClient = replicatedReadClient;
            this.nextClientIndex++;
        } else {
            if (exceptionWrapper.getException() == null) {
                throw new RssException(format);
            }
            if (!(exceptionWrapper.getException() instanceof RuntimeException)) {
                throw new RssException(format, exceptionWrapper.getException());
            }
            throw ((RuntimeException) exceptionWrapper.getException());
        }
    }

    private void closeClient(ReplicatedReadClient replicatedReadClient) {
        if (replicatedReadClient != null) {
            try {
                replicatedReadClient.close();
            } catch (Throwable th) {
                logger.warn(String.format("Failed to close client %s", replicatedReadClient));
            }
        }
    }
}
