package org.apache.iotdb.cluster.log;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/cluster/log/LogDispatcher.class */
public class LogDispatcher {
    private RaftMember member;
    private boolean useBatchInLogCatchUp = ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
    private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList();
    private ExecutorService executorService;
    private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
    private static final ExecutorService serializationService = IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/cluster/log/LogDispatcher$DispatcherThread.class */
    public class DispatcherThread implements Runnable {
        private Node receiver;
        private BlockingQueue<SendLogRequest> logBlockingDeque;
        private List<SendLogRequest> currBatch = new ArrayList();
        private Peer peer;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/iotdb/cluster/log/LogDispatcher$DispatcherThread$AppendEntriesHandler.class */
        public class AppendEntriesHandler implements AsyncMethodCallback<Long> {
            private final List<AsyncMethodCallback<Long>> singleEntryHandlers;

            private AppendEntriesHandler(List<SendLogRequest> list) {
                this.singleEntryHandlers = new ArrayList(list.size());
                for (SendLogRequest sendLogRequest : list) {
                    this.singleEntryHandlers.add(getAppendNodeEntryHandler(sendLogRequest.getLog(), sendLogRequest.getVoteCounter(), DispatcherThread.this.receiver, sendLogRequest.getLeaderShipStale(), sendLogRequest.getNewLeaderTerm(), DispatcherThread.this.peer));
                }
            }

            public void onComplete(Long l) {
                Iterator<AsyncMethodCallback<Long>> it = this.singleEntryHandlers.iterator();
                while (it.hasNext()) {
                    it.next().onComplete(l);
                }
            }

            public void onError(Exception exc) {
                Iterator<AsyncMethodCallback<Long>> it = this.singleEntryHandlers.iterator();
                while (it.hasNext()) {
                    it.next().onError(exc);
                }
            }

            private AppendNodeEntryHandler getAppendNodeEntryHandler(Log log, AtomicInteger atomicInteger, Node node, AtomicBoolean atomicBoolean, AtomicLong atomicLong, Peer peer) {
                AppendNodeEntryHandler appendNodeEntryHandler = new AppendNodeEntryHandler();
                appendNodeEntryHandler.setReceiver(node);
                appendNodeEntryHandler.setVoteCounter(atomicInteger);
                appendNodeEntryHandler.setLeaderShipStale(atomicBoolean);
                appendNodeEntryHandler.setLog(log);
                appendNodeEntryHandler.setMember(LogDispatcher.this.member);
                appendNodeEntryHandler.setPeer(peer);
                appendNodeEntryHandler.setReceiverTerm(atomicLong);
                return appendNodeEntryHandler;
            }
        }

        /**
         * Binds this dispatcher to one receiver and the queue feeding it.
         *
         * <p>The peer record for the receiver is taken from the member's peer map; when none is
         * registered yet, a new one is created from the log manager's last log index.
         *
         * @param node          the node this thread sends logs to
         * @param blockingQueue the queue this thread consumes requests from
         */
        DispatcherThread(Node node, BlockingQueue<SendLogRequest> blockingQueue) {
            this.receiver = node;
            this.logBlockingDeque = blockingQueue;
            this.peer = LogDispatcher.this.member.getPeerMap().computeIfAbsent(
                    node,
                    absent -> new Peer(LogDispatcher.this.member.getLogManager().getLastLogIndex()));
        }

        /**
         * Dispatch loop: blocks for one request, drains whatever else is queued, waits for every
         * request's serialized form, then sends the batch. The loop ends once the thread is
         * interrupted.
         *
         * <p>The working batch is emptied in a {@code finally} block. If it were only cleared after
         * a successful send, any failure (for instance a serialization future that completed
         * exceptionally) would leave the same requests in the batch, so they would be re-sent with
         * the next batch and, for a permanently failing request, block the dispatcher forever
         * while the batch keeps growing.
         */
        @Override // java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("LogDispatcher-" + LogDispatcher.this.member.getName() + "-" + this.receiver);
            while (!Thread.interrupted()) {
                try {
                    this.currBatch.add(this.logBlockingDeque.take());
                    this.logBlockingDeque.drainTo(this.currBatch);
                    if (LogDispatcher.logger.isDebugEnabled()) {
                        LogDispatcher.logger.debug("Sending {} logs to {}", Integer.valueOf(this.currBatch.size()), this.receiver);
                    }
                    // Serialization was submitted when the request was offered; wait for it here.
                    for (SendLogRequest sendLogRequest : this.currBatch) {
                        sendLogRequest.getAppendEntryRequest().entry = (ByteBuffer) sendLogRequest.serializedLogFuture.get();
                    }
                    sendBatchLogs(this.currBatch);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition observes it and terminates.
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    LogDispatcher.logger.error("Unexpected error in log dispatcher", e2);
                } finally {
                    // Always start the next iteration from an empty batch, even after a failure.
                    this.currBatch.clear();
                }
            }
            LogDispatcher.logger.info("Dispatcher exits");
        }

        /**
         * Sends a batch through the member's asynchronous send-log client.
         *
         * <p>When no client is available the batch is not sent; this is now logged (with the same
         * message the synchronous path uses) instead of being dropped silently.
         *
         * @param list                 the serialized entries contained in the request
         * @param appendEntriesRequest the request to send
         * @param list2                the requests the entries were taken from; one callback
         *                             delegate is created for each
         * @throws TException if the client fails to issue the call
         */
        private void appendEntriesAsync(List<ByteBuffer> list, AppendEntriesRequest appendEntriesRequest, List<SendLogRequest> list2) throws TException {
            AppendEntriesHandler appendEntriesHandler = new AppendEntriesHandler(list2);
            RaftService.AsyncClient sendLogAsyncClient = LogDispatcher.this.member.getSendLogAsyncClient(this.receiver);
            if (LogDispatcher.logger.isDebugEnabled()) {
                LogDispatcher.logger.debug("{}: append entries {} with {} logs", new Object[]{LogDispatcher.this.member.getName(), this.receiver, Integer.valueOf(list.size())});
            }
            if (sendLogAsyncClient != null) {
                sendLogAsyncClient.appendEntries(appendEntriesRequest, appendEntriesHandler);
            } else {
                LogDispatcher.logger.error("No available client for {}", this.receiver);
            }
        }

        /**
         * Sends a batch through a synchronous client and reports the outcome to the per-log
         * handlers.
         *
         * <p>Nothing is sent (and no handler is notified) when waiting for the previous log fails
         * or when no client can be obtained. Once a client has been obtained it is handed back
         * exactly once, in a {@code finally} block, whatever the outcome of the call.
         *
         * @param list                 the serialized entries contained in the request
         * @param appendEntriesRequest the request to send
         * @param list2                the requests the entries were taken from; must not be empty
         */
        private void appendEntriesSync(List<ByteBuffer> list, AppendEntriesRequest appendEntriesRequest, List<SendLogRequest> list2) {
            long waitStartTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
            if (!LogDispatcher.this.member.waitForPrevLog(this.peer, list2.get(0).getLog())) {
                LogDispatcher.logger.warn("{}: node {} timed out when appending {}", new Object[]{LogDispatcher.this.member.getName(), this.receiver, list2.get(0).getLog()});
                return;
            }
            Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(waitStartTime);

            RaftService.Client syncClient = LogDispatcher.this.member.getSyncClient(this.receiver);
            if (syncClient == null) {
                LogDispatcher.logger.error("No available client for {}", this.receiver);
                return;
            }

            AppendEntriesHandler appendEntriesHandler = new AppendEntriesHandler(list2);
            long sendStartTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
            try {
                long response = syncClient.appendEntries(appendEntriesRequest);
                Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStartTime);
                if (response != -1 && LogDispatcher.logger.isInfoEnabled()) {
                    LogDispatcher.logger.info("{}: Append {} logs to {}, resp: {}", new Object[]{LogDispatcher.this.member.getName(), Integer.valueOf(list.size()), this.receiver, Long.valueOf(response)});
                }
                appendEntriesHandler.onComplete(Long.valueOf(response));
            } catch (TException e) {
                // Close the transport of the failed client before it is handed back.
                syncClient.getInputProtocol().getTransport().close();
                appendEntriesHandler.onError(e);
                LogDispatcher.logger.warn("Failed logs: {}, first index: {}", list, Long.valueOf(appendEntriesRequest.prevLogIndex + 1));
            } finally {
                ClientUtils.putBackSyncClient(syncClient);
            }
        }

        /**
         * Assembles the batched append request for a run of entries.
         *
         * <p>The request carries the member's header (when it has one), this node as leader, the
         * log manager's commit index and the member's current term. The previous-log index is one
         * less than the log index of the request at position {@code i}; the previous-log term is
         * copied from that request's single-entry request.
         *
         * @param list  the serialized entries to put into the request
         * @param list2 the batch the entries were taken from
         * @param i     position in {@code list2} of the first entry of this request
         * @return the populated request
         */
        private AppendEntriesRequest prepareRequest(List<ByteBuffer> list, List<SendLogRequest> list2, int i) {
            RaftMember owner = LogDispatcher.this.member;
            AppendEntriesRequest request = new AppendEntriesRequest();

            if (owner.getHeader() != null) {
                request.setHeader(owner.getHeader());
            }
            request.setLeader(owner.getThisNode());
            request.setLeaderCommit(owner.getLogManager().getCommitLogIndex());
            // The term is read while holding the monitor of the term object itself.
            synchronized (owner.getTerm()) {
                request.setTerm(owner.getTerm().get());
            }
            request.setEntries(list);

            SendLogRequest firstOfBatch = list2.get(i);
            request.setPrevLogIndex(firstOfBatch.getLog().getCurrLogIndex() - 1);
            try {
                request.setPrevLogTerm(firstOfBatch.getAppendEntryRequest().prevLogTerm);
            } catch (Exception e) {
                LogDispatcher.logger.error("getTerm failed for newly append entries", e);
            }
            return request;
        }

        /**
         * Sends a batch as one or more multi-entry requests, splitting it so that each request
         * stays below the configured thrift frame size minus a fixed reserve.
         *
         * <p>An entry that does not fit even into an otherwise empty request is sent on its own
         * through {@link #sendLog(SendLogRequest)}. Without this, the packing loop would stop
         * before consuming that entry, the position would never advance, and the method would
         * either loop forever issuing empty requests or fail on the empty sub-list.
         *
         * @param list the batch to send; must not be empty
         * @throws TException if the asynchronous client fails to issue a call
         */
        private void sendLogs(List<SendLogRequest> list) throws TException {
            // Bytes kept free in every frame for the non-entry parts of the request.
            final long reservedBytes = 4194304L;
            int i = 0;
            LogDispatcher.logger.debug("send logs from index {} to {}", Long.valueOf(list.get(0).getLog().getCurrLogIndex()), Long.valueOf(list.get(list.size() - 1).getLog().getCurrLogIndex()));
            while (i < list.size()) {
                long remainingBytes = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
                List<ByteBuffer> entries = new ArrayList<>();
                int batchStart = i;
                // Pack consecutive entries until the next one would eat into the reserve.
                while (i < list.size()) {
                    long length = list.get(i).getAppendEntryRequest().entry.array().length;
                    if (remainingBytes - length <= reservedBytes) {
                        break;
                    }
                    remainingBytes -= length;
                    Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(list.get(i).getLog().getCreateTime());
                    entries.add(list.get(i).getAppendEntryRequest().entry);
                    i++;
                }
                if (i == batchStart) {
                    // Not even the first entry fits: send it alone so the loop makes progress.
                    LogDispatcher.logger.debug("Log {} is too large to be batched, sending it individually", list.get(i).getLog());
                    sendLog(list.get(i));
                    i++;
                    continue;
                }
                AppendEntriesRequest request = prepareRequest(entries, list, batchStart);
                if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
                    appendEntriesAsync(entries, request, list.subList(batchStart, i));
                } else {
                    appendEntriesSync(entries, request, list.subList(batchStart, i));
                }
                for (int sent = batchStart; sent < i; sent++) {
                    Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(list.get(sent).getLog().getCreateTime());
                }
            }
        }

        /**
         * Chooses how a drained batch is sent: a batch of at most one request goes through the
         * single-entry path; larger batches are sent as multi-entry requests when batching is
         * enabled, and one request at a time otherwise.
         *
         * @param list the drained batch; must contain at least one request
         * @throws TException if a batched send fails to issue its call
         */
        private void sendBatchLogs(List<SendLogRequest> list) throws TException {
            if (list.size() <= 1) {
                sendLog(list.get(0));
                return;
            }
            if (!LogDispatcher.this.useBatchInLogCatchUp) {
                for (SendLogRequest request : list) {
                    sendLog(request);
                }
                return;
            }
            sendLogs(list);
        }

        /**
         * Sends one request to the receiver through the member, recording the time spent in the
         * queue before the send and the create-to-end time after it.
         *
         * @param sendLogRequest the request to send
         */
        private void sendLog(SendLogRequest sendLogRequest) {
            Log log = sendLogRequest.getLog();
            Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(log.getCreateTime());
            LogDispatcher.this.member.sendLogToFollower(
                    log,
                    sendLogRequest.getVoteCounter(),
                    this.receiver,
                    sendLogRequest.getLeaderShipStale(),
                    sendLogRequest.getNewLeaderTerm(),
                    sendLogRequest.getAppendEntryRequest());
            Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(log.getCreateTime());
        }
    }

    /* loaded from: input_file:org/apache/iotdb/cluster/log/LogDispatcher$SendLogRequest.class */
    /**
     * Everything the dispatcher needs to send one log to a node: the log itself, the shared
     * state objects passed on to the per-entry handler, and the single-entry request.
     *
     * <p>The constructor assigns its fields directly instead of going through the public
     * setters, so a subclass overriding a setter is never invoked on a partially constructed
     * instance.
     */
    public static class SendLogRequest {
        private Log log;
        private AtomicInteger voteCounter;
        private AtomicBoolean leaderShipStale;
        private AtomicLong newLeaderTerm;
        private AppendEntryRequest appendEntryRequest;
        /** Value of {@code System.nanoTime()} recorded when the request was accepted by a queue. */
        private long enqueueTime;
        /** Pending serialized form of the log; assigned by the dispatcher when the request is offered. */
        private Future<ByteBuffer> serializedLogFuture;

        /**
         * @param log                the log to send
         * @param atomicInteger      the vote counter passed on to the entry handler
         * @param atomicBoolean      the leadership-stale flag passed on to the entry handler
         * @param atomicLong         the term holder passed on to the entry handler
         * @param appendEntryRequest the single-entry request used when the log is sent alone
         */
        public SendLogRequest(Log log, AtomicInteger atomicInteger, AtomicBoolean atomicBoolean, AtomicLong atomicLong, AppendEntryRequest appendEntryRequest) {
            this.log = log;
            this.voteCounter = atomicInteger;
            this.leaderShipStale = atomicBoolean;
            this.newLeaderTerm = atomicLong;
            this.appendEntryRequest = appendEntryRequest;
        }

        /** @return the vote counter of this request */
        public AtomicInteger getVoteCounter() {
            return this.voteCounter;
        }

        /** @param atomicInteger the vote counter to use */
        public void setVoteCounter(AtomicInteger atomicInteger) {
            this.voteCounter = atomicInteger;
        }

        /** @return the log to send */
        public Log getLog() {
            return this.log;
        }

        /** @param log the log to send */
        public void setLog(Log log) {
            this.log = log;
        }

        /** @return the recorded enqueue time, or 0 if none was recorded */
        public long getEnqueueTime() {
            return this.enqueueTime;
        }

        /** @param j the enqueue time to record */
        public void setEnqueueTime(long j) {
            this.enqueueTime = j;
        }

        /** @return the leadership-stale flag of this request */
        public AtomicBoolean getLeaderShipStale() {
            return this.leaderShipStale;
        }

        /** @param atomicBoolean the leadership-stale flag to use */
        public void setLeaderShipStale(AtomicBoolean atomicBoolean) {
            this.leaderShipStale = atomicBoolean;
        }

        /** @return the term holder of this request */
        public AtomicLong getNewLeaderTerm() {
            return this.newLeaderTerm;
        }

        /** @param atomicLong the term holder to use */
        void setNewLeaderTerm(AtomicLong atomicLong) {
            this.newLeaderTerm = atomicLong;
        }

        /** @return the single-entry request */
        public AppendEntryRequest getAppendEntryRequest() {
            return this.appendEntryRequest;
        }

        /** @param appendEntryRequest the single-entry request to use */
        public void setAppendEntryRequest(AppendEntryRequest appendEntryRequest) {
            this.appendEntryRequest = appendEntryRequest;
        }

        @Override
        public String toString() {
            return "SendLogRequest{log=" + this.log + '}';
        }
    }

    /**
     * Creates a dispatcher for the given member, with one queue and one dispatching task for
     * every node of the member other than the member's own node.
     *
     * @param raftMember the member whose logs are dispatched
     */
    public LogDispatcher(RaftMember raftMember) {
        this.member = raftMember;
        this.executorService = IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + raftMember.getName());
        for (Node node : raftMember.getAllNodes()) {
            if (node.equals(raftMember.getThisNode())) {
                // Nothing is dispatched to the local node.
                continue;
            }
            BlockingQueue<SendLogRequest> queue = createQueueAndBindingThread(node);
            this.nodeLogQueues.add(queue);
        }
    }

    /**
     * Stops the dispatching tasks: interrupts them via {@code shutdownNow()} and then waits up
     * to ten seconds for them to finish. Whether they finished in time is not reported.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void close() throws InterruptedException {
        ExecutorService dispatchers = this.executorService;
        dispatchers.shutdownNow();
        dispatchers.awaitTermination(10L, TimeUnit.SECONDS);
    }

    /**
     * Hands a request to every node queue.
     *
     * <p>When at least one queue exists, serialization of the log is first submitted to the
     * shared encoder pool and the resulting future is stored on the request. Each queue is then
     * tried in turn: with "wait for slow node" enabled the insert waits up to the write
     * operation timeout, otherwise a full queue rejects the request immediately. A rejected
     * request is skipped for that node only.
     *
     * @param sendLogRequest the request to enqueue
     */
    public void offer(SendLogRequest sendLogRequest) {
        if (!this.nodeLogQueues.isEmpty()) {
            sendLogRequest.serializedLogFuture = serializationService.submit(() -> {
                ByteBuffer encoded = sendLogRequest.getLog().serialize();
                sendLogRequest.getLog().setByteSize(encoded.array().length);
                return encoded;
            });
        }
        for (int i = 0; i < this.nodeLogQueues.size(); i++) {
            BlockingQueue<SendLogRequest> queue = this.nodeLogQueues.get(i);
            try {
                boolean accepted;
                if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
                    accepted = queue.offer(sendLogRequest, ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(), TimeUnit.MILLISECONDS);
                } else {
                    // add() signals a full queue with IllegalStateException, handled below.
                    accepted = queue.add(sendLogRequest);
                }
                if (!accepted) {
                    logger.debug("Log queue[{}] of {} is full, ignore the log to this node", Integer.valueOf(i), this.member.getName());
                    continue;
                }
                sendLogRequest.setEnqueueTime(System.nanoTime());
            } catch (IllegalStateException e) {
                logger.debug("Log queue[{}] of {} is full, ignore the log to this node", Integer.valueOf(i), this.member.getName());
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the bounded queue for a node and submits the dispatching task that consumes it.
     *
     * @param node the node the queue and task are created for
     * @return the new queue, sized by the configured maximum number of logs in memory
     */
    private BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
        int capacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
        BlockingQueue<SendLogRequest> queue = new ArrayBlockingQueue<>(capacity);
        // Number of dispatching tasks bound to each queue.
        final int bindingThreadNum = 1;
        for (int bound = 0; bound < bindingThreadNum; bound++) {
            this.executorService.submit(new DispatcherThread(node, queue));
        }
        return queue;
    }
}
