package org.apache.iotdb.cluster.log.catchup;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.LogCatchUpHandler;
import org.apache.iotdb.cluster.server.handlers.caller.LogCatchUpInBatchHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.class */
/**
 * A task that sends a list of logs to one follower node so that it can catch up with this node.
 *
 * <p>Logs are sent either one at a time ({@link #doLogCatchUp()}) or grouped into batches,
 * depending on the {@code useBatch} flag. The task stops at the first failed send and reports the
 * outcome through {@link #call()}.
 */
public class LogCatchUpTask implements Callable<Boolean> {
    /** Wait time for a batch response, captured once from the write operation timeout. */
    private static final long SEND_LOGS_WAIT_MS = RaftServer.getWriteOperationTimeoutMS();
    private static final Logger logger = LoggerFactory.getLogger(LogCatchUpTask.class);

    /**
     * Number of bytes subtracted from the thrift max frame size to obtain the batch size limit
     * (4 MiB). NOTE(review): presumably head-room for the other request fields - confirm.
     */
    private static final int RESERVED_REQUEST_BYTES = 4194304;

    /** A batch is flushed as soon as it holds this many logs. */
    private static final int MAX_LOGS_PER_BATCH = 100;

    /** The node that receives the logs. */
    Node node;
    /** The member on whose behalf the logs are sent. */
    RaftMember raftMember;
    private final List<Log> logs;
    private boolean useBatch;
    /** Set to true once a send fails or leadership is lost; stops further sending. */
    boolean abort;

    /**
     * Creates a task whose batching mode is taken from the cluster configuration.
     *
     * @param list the logs to send, in sending order
     * @param node the node that receives the logs
     * @param raftMember the member that sends the logs
     */
    public LogCatchUpTask(List<Log> list, Node node, RaftMember raftMember) {
        this(list, node, raftMember,
            ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp());
    }

    /**
     * Creates a task with an explicit batching mode.
     *
     * @param list the logs to send, in sending order
     * @param node the node that receives the logs
     * @param raftMember the member that sends the logs
     * @param z true to send the logs in batches, false to send them one by one
     */
    LogCatchUpTask(List<Log> list, Node node, RaftMember raftMember, boolean z) {
        this.logs = list;
        this.node = node;
        this.raftMember = raftMember;
        this.useBatch = z;
        this.abort = false;
    }

    /**
     * Overrides the batching mode.
     *
     * @param z true to send the logs in batches, false to send them one by one
     */
    void setUseBatch(boolean z) {
        this.useBatch = z;
    }

    /**
     * Sends the logs one by one, stopping at the first log that is not appended successfully.
     *
     * @throws TException if the async client fails to issue the request
     * @throws InterruptedException if interrupted while waiting for an async response
     * @throws LeaderUnknownException if this member is not the leader when a log is about to be
     *     sent
     */
    public void doLogCatchUp() throws TException, InterruptedException, LeaderUnknownException {
        AppendEntryRequest request = new AppendEntryRequest();
        if (raftMember.getHeader() != null) {
            request.setHeader(raftMember.getHeader());
        }
        request.setLeader(raftMember.getThisNode());
        request.setLeaderCommit(raftMember.getLogManager().getCommitLogIndex());

        for (int i = 0; i < logs.size() && !abort; i++) {
            Log log = logs.get(i);
            // Check the role and read the term under the term's monitor so both are consistent.
            synchronized (raftMember.getTerm()) {
                if (raftMember.getCharacter() != NodeCharacter.LEADER) {
                    throw new LeaderUnknownException(raftMember.getAllNodes());
                }
                request.setTerm(raftMember.getTerm().get());
            }
            request.setPrevLogIndex(log.getCurrLogIndex() - 1);
            if (i == 0) {
                // The predecessor of the first log is not in the list; ask the log manager.
                try {
                    request.setPrevLogTerm(
                        raftMember.getLogManager().getTerm(log.getCurrLogIndex() - 1));
                } catch (Exception e) {
                    logger.error("getTerm failed for newly append entries", e);
                }
            } else {
                request.setPrevLogTerm(logs.get(i - 1).getCurrLogTerm());
            }

            boolean appended = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()
                ? appendEntryAsync(log, request)
                : appendEntrySync(log, request);
            abort = !appended;
        }
    }

    /**
     * Sends one log through the async client and waits (bounded by the write operation timeout)
     * on the handler's result flag.
     *
     * @return the value of the handler's result flag after the wait, or false if no async client
     *     is available
     */
    private boolean appendEntryAsync(Log log, AppendEntryRequest appendEntryRequest)
            throws TException, InterruptedException {
        LogCatchUpHandler handler = getCatchUpHandler(log, appendEntryRequest);
        synchronized (handler.getAppendSucceed()) {
            RaftService.AsyncClient client = raftMember.getAsyncClient(node);
            if (client == null) {
                return false;
            }
            client.appendEntry(appendEntryRequest, handler);
            raftMember.getLastCatchUpResponseTime().put(node, System.currentTimeMillis());
            handler.getAppendSucceed().wait(RaftServer.getWriteOperationTimeoutMS());
            return handler.getAppendSucceed().get();
        }
    }

    /**
     * Builds the response handler for one log and, as a side effect, stores the serialized log
     * into the request.
     */
    private LogCatchUpHandler getCatchUpHandler(Log log, AppendEntryRequest appendEntryRequest) {
        LogCatchUpHandler handler = new LogCatchUpHandler();
        handler.setAppendSucceed(new AtomicBoolean(false));
        handler.setRaftMember(raftMember);
        handler.setFollower(node);
        handler.setLog(log);
        appendEntryRequest.setEntry(log.serialize());
        return handler;
    }

    /**
     * Sends one log through the sync client and feeds the outcome to the handler.
     *
     * @return the handler's result flag on a completed call; false if no client is available or
     *     the call throws a {@link TException}
     */
    private boolean appendEntrySync(Log log, AppendEntryRequest appendEntryRequest) {
        LogCatchUpHandler handler = getCatchUpHandler(log, appendEntryRequest);
        RaftService.Client client = raftMember.getSyncClient(node);
        if (client == null) {
            logger.error("No available client for {} when append entry", node);
            return false;
        }
        try {
            handler.onComplete(client.appendEntry(appendEntryRequest));
            return handler.getAppendSucceed().get();
        } catch (TException e) {
            // Close the transport before the client is handed back.
            client.getInputProtocol().getTransport().close();
            handler.onError(e);
            return false;
        } finally {
            // Hand the client back exactly once, whatever the outcome.
            ClientUtils.putBackSyncClient(client);
        }
    }

    /**
     * Builds the request for a batch of serialized logs.
     *
     * @param list the serialized logs of the batch
     * @param i position in {@code logs} of the first log of the batch
     * @return the request, or null (with {@code abort} set) if this member is no longer the leader
     */
    private AppendEntriesRequest prepareRequest(List<ByteBuffer> list, int i) {
        AppendEntriesRequest request = new AppendEntriesRequest();
        if (raftMember.getHeader() != null) {
            request.setHeader(raftMember.getHeader());
        }
        request.setLeader(raftMember.getThisNode());
        request.setLeaderCommit(raftMember.getLogManager().getCommitLogIndex());

        synchronized (raftMember.getTerm()) {
            if (raftMember.getCharacter() != NodeCharacter.LEADER) {
                logger.debug("Leadership is lost when doing a catch-up to {}, aborting", node);
                abort = true;
                return null;
            }
            request.setTerm(raftMember.getTerm().get());
            request.setEntries(list);
            request.setPrevLogIndex(logs.get(i).getCurrLogIndex() - 1);
            if (i != 0) {
                request.setPrevLogTerm(logs.get(i - 1).getCurrLogTerm());
            } else {
                // The predecessor of the first log is not in the list; ask the log manager.
                try {
                    request.setPrevLogTerm(
                        raftMember.getLogManager().getTerm(logs.get(0).getCurrLogIndex() - 1));
                } catch (Exception e) {
                    logger.error("getTerm failed for newly append entries", e);
                }
            }
            logger.debug("{}, node={} catchup request={}", raftMember.getName(), node, request);
            return request;
        }
    }

    /**
     * Sends the logs in batches. A batch is flushed when adding the next log would push its
     * serialized size over the limit, or when it reaches {@link #MAX_LOGS_PER_BATCH} logs.
     * Sending stops as soon as {@code abort} is set; no further batch is sent after a failure.
     */
    private void doLogCatchUpInBatch() throws TException, InterruptedException {
        List<ByteBuffer> batch = new ArrayList<>();
        int maxFrameSize = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
        long maxBatchBytes = (long) maxFrameSize - RESERVED_REQUEST_BYTES;
        long batchBytes = 0;
        int batchStart = 0;

        for (int pos = 0; pos < logs.size() && !abort; pos++) {
            ByteBuffer serialized = logs.get(pos).serialize();
            int logBytes = serialized.array().length;
            if (logBytes > maxBatchBytes) {
                // A single log that exceeds the limit can never be sent.
                logger.warn("the frame size {} of thrift is too small", maxFrameSize);
                abort = true;
                return;
            }

            batchBytes += logBytes;
            if (batchBytes > maxBatchBytes) {
                // The current log does not fit: flush what was collected before it.
                sendBatchLogs(batch, batchStart);
                if (abort) {
                    // The flush failed; do not queue or send anything after it.
                    return;
                }
                batchStart = pos;
                batchBytes = logBytes;
            }
            batch.add(serialized);

            if (batch.size() >= MAX_LOGS_PER_BATCH) {
                sendBatchLogs(batch, batchStart);
                batchStart = pos + 1;
                batchBytes = 0;
            }
        }

        // Send the remainder only if nothing failed; otherwise a late success would hide the
        // earlier failure by resetting the abort flag.
        if (!abort && !batch.isEmpty()) {
            sendBatchLogs(batch, batchStart);
        }
    }

    /**
     * Sends one batch and records the outcome in {@code abort}. The list is cleared after the send
     * attempt; it is left untouched when the request could not be prepared.
     *
     * @param list the serialized logs of the batch
     * @param i position in {@code logs} of the first log of the batch
     */
    private void sendBatchLogs(List<ByteBuffer> list, int i)
            throws TException, InterruptedException {
        if (logger.isInfoEnabled()) {
            logger.info("{} send logs from {} num {} for {}", raftMember.getThisNode(),
                logs.get(i).getCurrLogIndex(), list.size(), node);
        }
        AppendEntriesRequest request = prepareRequest(list, i);
        if (request == null) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{}: sending {} logs to {}", raftMember.getName(), list.size(), node);
        }

        boolean appended = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()
            ? appendEntriesAsync(list, request)
            : appendEntriesSync(list, request);
        abort = !appended;

        if (!abort && logger.isInfoEnabled()) {
            logger.info("{}: sent {} logs to {}", raftMember.getName(), list.size(), node);
        }
        list.clear();
    }

    /** Creates the handler that receives the response for a batch of serialized logs. */
    private LogCatchUpInBatchHandler newBatchHandler(List<ByteBuffer> list,
            AtomicBoolean appendSucceed) {
        LogCatchUpInBatchHandler handler = new LogCatchUpInBatchHandler();
        handler.setAppendSucceed(appendSucceed);
        handler.setRaftMember(raftMember);
        handler.setFollower(node);
        handler.setLogs(list);
        return handler;
    }

    /**
     * Sends a batch through the async client and waits up to {@link #SEND_LOGS_WAIT_MS} on the
     * result flag.
     *
     * @return the value of the result flag after the wait, or false if no async client is
     *     available
     */
    private boolean appendEntriesAsync(List<ByteBuffer> list,
            AppendEntriesRequest appendEntriesRequest) throws TException, InterruptedException {
        AtomicBoolean appendSucceed = new AtomicBoolean(false);
        LogCatchUpInBatchHandler handler = newBatchHandler(list, appendSucceed);
        synchronized (appendSucceed) {
            appendSucceed.set(false);
            RaftService.AsyncClient client = raftMember.getAsyncClient(node);
            if (client == null) {
                return false;
            }
            client.appendEntries(appendEntriesRequest, handler);
            raftMember.getLastCatchUpResponseTime().put(node, System.currentTimeMillis());
            appendSucceed.wait(SEND_LOGS_WAIT_MS);
            return appendSucceed.get();
        }
    }

    /**
     * Sends a batch through the sync client and feeds the outcome to the handler.
     *
     * @return the result flag on a completed call; false if no client is available or the call
     *     throws a {@link TException}
     */
    private boolean appendEntriesSync(List<ByteBuffer> list,
            AppendEntriesRequest appendEntriesRequest) {
        AtomicBoolean appendSucceed = new AtomicBoolean(false);
        LogCatchUpInBatchHandler handler = newBatchHandler(list, appendSucceed);
        RaftService.Client client = raftMember.getSyncClient(node);
        if (client == null) {
            logger.error("No available client for {} when append entries", node);
            return false;
        }
        try {
            handler.onComplete(client.appendEntries(appendEntriesRequest));
            return appendSucceed.get();
        } catch (TException e) {
            // Close the transport before the client is handed back.
            client.getInputProtocol().getTransport().close();
            handler.onError(e);
            logger.warn("Failed logs: {}, first index: {}", list,
                appendEntriesRequest.prevLogIndex + 1);
            return false;
        } finally {
            // Hand the client back exactly once, whatever the outcome.
            ClientUtils.putBackSyncClient(client);
        }
    }

    /**
     * Runs the catch-up in the configured mode.
     *
     * @return true if there was nothing to send or no send was aborted, false otherwise
     * @throws TException if the async client fails to issue a request
     * @throws InterruptedException if interrupted while waiting for an async response
     * @throws LeaderUnknownException if leadership is lost during a one-by-one catch-up
     */
    @Override
    public Boolean call() throws TException, InterruptedException, LeaderUnknownException {
        if (logs.isEmpty()) {
            return true;
        }
        if (useBatch) {
            doLogCatchUpInBatch();
        } else {
            doLogCatchUp();
        }
        boolean succeeded = !abort;
        logger.info("{}: Catch up {} finished with result {}", raftMember.getName(), node,
            succeeded);
        raftMember.getLastCatchUpResponseTime().remove(node);
        return succeeded;
    }
}
