package com.uber.rss.clients;

import com.google.common.collect.Maps;
import com.uber.rss.common.AppShufflePartitionId;
import com.uber.rss.common.ServerDetail;
import com.uber.rss.common.ServerReplicationGroup;
import com.uber.rss.exceptions.RssAggregateException;
import com.uber.rss.exceptions.RssException;
import com.uber.rss.exceptions.RssInconsistentReplicaException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.exceptions.RssNoActiveReadClientException;
import com.uber.rss.exceptions.RssNonRecoverableException;
import com.uber.rss.metrics.M3Stats;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uber/rss/clients/ReplicatedReadClient.class */
public class ReplicatedReadClient implements MultiServerReadClient {
    private static final Logger logger = LoggerFactory.getLogger(ReplicatedReadClient.class);
    private final ServerReplicationGroup serverReplicationGroup;
    private final int timeoutMillis;
    private final ClientRetryOptions clientRetryOptions;
    private final String user;
    private final AppShufflePartitionId appShufflePartitionId;
    private final Collection<Long> fetchTaskAttemptIds;
    private final long dataAvailablePollInterval;
    private final long dataAvailableWaitTime;
    private final RetriableSocketReadClient[] clients;
    private final boolean[] clientsInitialized;
    private final Map<Long, Long> numConsumedRecordsMap;
    private final Map<Long, TaskDataBlock> lastConsumedRecordsMap;
    private final Map<Long, Long> numReadRecordsMap;
    private final Map<Long, TaskDataBlock> lastReadRecordsMap;
    private final boolean checkDataConsistency;
    private int currentClientIndex;
    private boolean endOfRead;
    private long shuffleReadBytes;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/uber/rss/clients/ReplicatedReadClient$ExceptionLogInfo.class */
    public static class ExceptionLogInfo {
        private String logMsg;
        private Throwable exception;

        public ExceptionLogInfo(String str, Throwable th) {
            this.logMsg = str;
            this.exception = th;
        }
    }

    public ReplicatedReadClient(ServerReplicationGroup serverReplicationGroup, int i, String str, AppShufflePartitionId appShufflePartitionId, ReadClientDataOptions readClientDataOptions) {
        this(serverReplicationGroup, i, new ClientRetryOptions(readClientDataOptions.getDataAvailablePollInterval(), i), str, appShufflePartitionId, readClientDataOptions, true);
    }

    public ReplicatedReadClient(ServerReplicationGroup serverReplicationGroup, int i, String str, AppShufflePartitionId appShufflePartitionId, ReadClientDataOptions readClientDataOptions, boolean z) {
        this(serverReplicationGroup, i, new ClientRetryOptions(readClientDataOptions.getDataAvailablePollInterval(), i), str, appShufflePartitionId, readClientDataOptions, z);
    }

    public ReplicatedReadClient(ServerReplicationGroup serverReplicationGroup, int i, ClientRetryOptions clientRetryOptions, String str, AppShufflePartitionId appShufflePartitionId, ReadClientDataOptions readClientDataOptions, boolean z) {
        this.numConsumedRecordsMap = new HashMap();
        this.lastConsumedRecordsMap = new HashMap();
        this.numReadRecordsMap = new HashMap();
        this.lastReadRecordsMap = new HashMap();
        this.currentClientIndex = 0;
        this.endOfRead = false;
        this.shuffleReadBytes = -1L;
        this.serverReplicationGroup = serverReplicationGroup;
        this.timeoutMillis = i;
        this.clientRetryOptions = clientRetryOptions;
        this.user = str;
        this.appShufflePartitionId = appShufflePartitionId;
        this.fetchTaskAttemptIds = readClientDataOptions.getFetchTaskAttemptIds();
        this.dataAvailablePollInterval = readClientDataOptions.getDataAvailablePollInterval();
        this.dataAvailableWaitTime = readClientDataOptions.getDataAvailableWaitTime();
        this.checkDataConsistency = z;
        List<ServerDetail> servers = serverReplicationGroup.getServers();
        if (servers.isEmpty()) {
            throw new RssException("No server in replication group");
        }
        this.clients = new RetriableSocketReadClient[servers.size()];
        this.clientsInitialized = new boolean[this.clients.length];
        resetClientInstances();
    }

    @Override // com.uber.rss.clients.MultiServerReadClient
    public synchronized void connect() {
        long j;
        long currentTimeMillis = System.currentTimeMillis();
        long size = this.timeoutMillis * this.serverReplicationGroup.getServers().size();
        long j2 = this.dataAvailablePollInterval;
        while (currentTimeMillis - currentTimeMillis <= size) {
            try {
                connectAndInitializeClient();
                return;
            } finally {
                if ((j > size ? 1 : (j == size ? 0 : -1)) > 0) {
                }
            }
        }
    }

    @Override // com.uber.rss.clients.ShuffleDataReader, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.currentClientIndex < this.clients.length) {
            if (this.clientsInitialized[this.currentClientIndex]) {
                this.shuffleReadBytes = this.clients[this.currentClientIndex].getShuffleReadBytes();
            }
            this.clients[this.currentClientIndex].close();
            this.clientsInitialized[this.currentClientIndex] = false;
        }
        this.numConsumedRecordsMap.clear();
        this.numReadRecordsMap.clear();
        this.lastConsumedRecordsMap.clear();
        this.lastReadRecordsMap.clear();
    }

    @Override // com.uber.rss.clients.ShuffleDataReader
    public synchronized TaskDataBlock readDataBlock() {
        if (this.endOfRead) {
            return null;
        }
        getActiveClient();
        while (this.currentClientIndex < this.clients.length) {
            if (this.endOfRead) {
                return null;
            }
            try {
                try {
                    if (!this.clientsInitialized[this.currentClientIndex]) {
                        connectAndInitializeClient();
                    }
                    TaskDataBlock readDataBlock = this.clients[this.currentClientIndex].readDataBlock();
                    if (this.clients.length == 1) {
                        return readDataBlock;
                    }
                    while (readDataBlock != null) {
                        if (!shouldSkipReadRecord(readDataBlock)) {
                            rememberLastConsumedRecord(readDataBlock);
                            rememberLastReadRecord(readDataBlock);
                            return readDataBlock;
                        }
                        rememberLastReadRecord(readDataBlock);
                        readDataBlock = this.clients[this.currentClientIndex].readDataBlock();
                    }
                    checkRecordDataConsistency();
                    this.endOfRead = true;
                    return null;
                } catch (RssInconsistentReplicaException | RssNonRecoverableException e) {
                    M3Stats.addException(e, getClass().getSimpleName());
                    closeClient(this.currentClientIndex);
                    throw e;
                }
            } catch (Throwable th) {
                M3Stats.addException(th, getClass().getSimpleName());
                closeClient(this.currentClientIndex);
                boolean z = this.currentClientIndex < this.clients.length - 1;
                if (0 == 0 || !z) {
                    if (z) {
                        throw new RssNonRecoverableException("Failed to read records from server replication group: " + this.serverReplicationGroup, th);
                    }
                    throw th;
                }
                logger.warn(String.format("Failed to read after reading %s records in client (current index: %s): %s. Will try next client in the replication group", Long.valueOf(this.numReadRecordsMap.values().stream().mapToLong(l -> {
                    return l.longValue();
                }).sum()), Integer.valueOf(this.currentClientIndex), this.clients[this.currentClientIndex]), th);
                this.currentClientIndex++;
            }
        }
        throw new RssInvalidStateException("Should not execute here!");
    }

    @Override // com.uber.rss.clients.ShuffleDataReader
    public synchronized long getShuffleReadBytes() {
        return this.shuffleReadBytes >= 0 ? this.shuffleReadBytes : getActiveClient().getShuffleReadBytes();
    }

    public String toString() {
        return "ReplicatedReadClient{clients=" + Arrays.toString(this.clients) + '}';
    }

    private void resetClientInstances() {
        List<ServerDetail> servers = this.serverReplicationGroup.getServers();
        for (int i = 0; i < servers.size(); i++) {
            this.clients[i] = new RetriableSocketReadClient(servers.get(i), this.timeoutMillis, this.clientRetryOptions, this.user, this.appShufflePartitionId, new ReadClientDataOptions(this.fetchTaskAttemptIds, this.dataAvailablePollInterval, this.dataAvailableWaitTime));
            this.clientsInitialized[i] = false;
        }
    }

    private void connectAndInitializeClient() {
        int i;
        int i2;
        RssAggregateException rssAggregateException;
        ArrayList arrayList = null;
        boolean z = false;
        while (this.currentClientIndex < this.clients.length) {
            try {
                logger.info(String.format("Trying to connect to server: %s", this.clients[this.currentClientIndex]));
                this.clients[this.currentClientIndex].connect();
                this.clientsInitialized[this.currentClientIndex] = true;
                resetReadRecords();
                z = true;
                break;
            } finally {
                if (i >= i2) {
                }
            }
        }
        if (z) {
            if (arrayList != null) {
                arrayList.forEach(exceptionLogInfo -> {
                    logger.warn(exceptionLogInfo.logMsg, exceptionLogInfo.exception);
                });
            }
        } else {
            if (arrayList != null && !arrayList.isEmpty()) {
                throw new RssAggregateException((Collection) arrayList.stream().map(exceptionLogInfo2 -> {
                    return exceptionLogInfo2.exception;
                }).collect(Collectors.toList()));
            }
            throw new RssInvalidStateException("Invalid read client state: failed to initialized, but no exceptions");
        }
    }

    private void rememberLastConsumedRecord(TaskDataBlock taskDataBlock) {
        increaseRecordCount(this.numConsumedRecordsMap, taskDataBlock.getTaskAttemptId());
        if (this.checkDataConsistency) {
            this.lastConsumedRecordsMap.put(Long.valueOf(taskDataBlock.getTaskAttemptId()), taskDataBlock);
        }
    }

    private void rememberLastReadRecord(TaskDataBlock taskDataBlock) {
        increaseRecordCount(this.numReadRecordsMap, taskDataBlock.getTaskAttemptId());
        if (this.checkDataConsistency) {
            this.lastReadRecordsMap.put(Long.valueOf(taskDataBlock.getTaskAttemptId()), taskDataBlock);
        }
    }

    private void resetReadRecords() {
        this.numReadRecordsMap.clear();
        this.lastReadRecordsMap.clear();
    }

    private void increaseRecordCount(Map<Long, Long> map, long j) {
        try {
            map.put(Long.valueOf(j), Long.valueOf(map.getOrDefault(Long.valueOf(j), 0L).longValue() + 1));
        } catch (Throwable th) {
            throw new RssNonRecoverableException(String.format("Failed to increase number of read records for task attempt %s, %s", Long.valueOf(j), this), th);
        }
    }

    private boolean shouldSkipReadRecord(TaskDataBlock taskDataBlock) {
        long taskAttemptId = taskDataBlock.getTaskAttemptId();
        Long l = this.numConsumedRecordsMap.get(Long.valueOf(taskAttemptId));
        if (l == null) {
            return false;
        }
        long longValue = this.numReadRecordsMap.getOrDefault(Long.valueOf(taskAttemptId), 0L).longValue();
        if (longValue < l.longValue()) {
            return true;
        }
        if (longValue != l.longValue()) {
            throw new RssInconsistentReplicaException(String.format("Inconsistent replica for task attempt %s, consumed %s records, read %s records, current client/server: %s", Long.valueOf(taskAttemptId), l, Long.valueOf(longValue), this.clients[this.currentClientIndex]));
        }
        if (!this.checkDataConsistency) {
            return false;
        }
        TaskDataBlock taskDataBlock2 = this.lastConsumedRecordsMap.get(Long.valueOf(taskAttemptId));
        TaskDataBlock taskDataBlock3 = this.lastReadRecordsMap.get(Long.valueOf(taskAttemptId));
        if (recordEquals(taskDataBlock2, taskDataBlock3)) {
            return false;
        }
        throw new RssInconsistentReplicaException(String.format("Got different records from two servers in the replication group for task attempt %s (after %s records), record from previous server: %s, record from new server: %s (%s)", Long.valueOf(taskAttemptId), Long.valueOf(longValue), taskDataBlock2, taskDataBlock3, this.clients[this.currentClientIndex]));
    }

    private void checkRecordDataConsistency() {
        if (!Maps.difference(this.numReadRecordsMap, this.numConsumedRecordsMap).areEqual()) {
            throw new RssInconsistentReplicaException(String.format("Data corrupted! Number of consumed records (returned to caller): %s. Number of records read from current server: %s ((%s))", this.numConsumedRecordsMap, this.numReadRecordsMap, this.clients[this.currentClientIndex]));
        }
        if (this.checkDataConsistency && !Maps.difference(this.lastReadRecordsMap, this.lastConsumedRecordsMap).areEqual()) {
            throw new RssInconsistentReplicaException(String.format("Data corrupted! Last consumed records (returned to caller): %s. Last records read from current server: %s ((%s))", this.lastConsumedRecordsMap, this.lastReadRecordsMap, this.clients[this.currentClientIndex]));
        }
    }

    private RetriableSocketReadClient getActiveClient() {
        if (this.currentClientIndex > this.clients.length - 1) {
            throw new RssNoActiveReadClientException("No active read client for server replication group: " + this.serverReplicationGroup);
        }
        return this.clients[this.currentClientIndex];
    }

    private void closeClient(int i) {
        closeClient(this.clients[i]);
        this.clientsInitialized[i] = false;
        resetReadRecords();
    }

    private void closeClient(ShuffleDataReader shuffleDataReader) {
        try {
            shuffleDataReader.close();
        } catch (Throwable th) {
            logger.warn("Failed to close read client: " + shuffleDataReader, th);
        }
    }

    private boolean recordEquals(TaskDataBlock taskDataBlock, TaskDataBlock taskDataBlock2) {
        return Objects.equals(taskDataBlock, taskDataBlock2);
    }
}
