package org.apache.ignite.internal.raft.server.impl;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/raft/server/impl/RaftServerImpl.class */
/**
 * A {@link RaftServer} implementation that routes incoming {@link ActionRequest}s to the
 * {@link RaftGroupListener} registered for the request's group.
 *
 * <p>Read and write commands are placed on two separate bounded queues, each drained by its own
 * daemon worker thread. When a queue is full the request is rejected with {@link RaftError#EBUSY}.
 */
public class RaftServerImpl implements RaftServer {
    /** Capacity of each of the two command queues. */
    private static final int QUEUE_SIZE = 1000;

    private static final IgniteLogger LOG = IgniteLogger.forClass(RaftServerImpl.class);

    /** Factory used to build every response message sent by this server. */
    private final RaftMessagesFactory clientMsgFactory;

    private final ClusterService service;

    /** Registered group listeners keyed by group id. */
    private final ConcurrentMap<String, RaftGroupListener> listeners = new ConcurrentHashMap<>();

    private final BlockingQueue<CommandClosureEx<ReadCommand>> readQueue;

    private final BlockingQueue<CommandClosureEx<WriteCommand>> writeQueue;

    /** Thread draining {@link #readQueue}; {@code null} until {@link #start()} is called. */
    private volatile Thread readWorker;

    /** Thread draining {@link #writeQueue}; {@code null} until {@link #start()} is called. */
    private volatile Thread writeWorker;

    /**
     * A command closure that also carries the listener which must process it.
     *
     * @param <T> Command type.
     */
    public interface CommandClosureEx<T extends Command> extends CommandClosure<T> {
        /**
         * @return The listener that should receive this closure.
         */
        RaftGroupListener listener();
    }

    /**
     * Creates a server that is not yet listening; call {@link #start()} to begin serving requests.
     *
     * @param clusterService Cluster service used for messaging and topology lookups, not {@code null}.
     * @param raftMessagesFactory Factory for response messages, not {@code null}.
     * @throws NullPointerException If either argument is {@code null}.
     */
    public RaftServerImpl(ClusterService clusterService, RaftMessagesFactory raftMessagesFactory) {
        Objects.requireNonNull(clusterService);
        Objects.requireNonNull(raftMessagesFactory);

        this.service = clusterService;
        this.clientMsgFactory = raftMessagesFactory;
        this.readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
        this.writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
    }

    /**
     * Registers the message handler for {@link RaftMessageGroup} messages and starts the two daemon
     * worker threads that drain the read and write queues.
     */
    public void start() {
        service.messagingService().addMessageHandler(RaftMessageGroup.class, (message, senderAddr, correlationId) -> {
            if (message instanceof CliRequests.GetLeaderRequest) {
                // This server always reports the local member as the leader.
                Peer local = new Peer(service.topologyService().localMember().address());
                String leaderId = PeerId.fromPeer(local).toString();

                service.messagingService().send(
                    senderAddr,
                    clientMsgFactory.getLeaderResponse().leaderId(leaderId).build(),
                    correlationId
                );
            } else if (message instanceof ActionRequest) {
                ActionRequest request = (ActionRequest) message;
                RaftGroupListener groupListener = listeners.get(request.groupId());

                if (groupListener == null) {
                    // No listener is registered for the requested group.
                    sendError(senderAddr, correlationId, RaftError.UNKNOWN);
                } else if (request.command() instanceof ReadCommand) {
                    handleActionRequest(senderAddr, request, correlationId, readQueue, groupListener);
                } else {
                    handleActionRequest(senderAddr, request, correlationId, writeQueue, groupListener);
                }
            }
            // Any other message type is ignored.
        });

        readWorker = new Thread(
            () -> processQueue(readQueue, RaftGroupListener::onRead),
            "read-cmd-worker#" + service.topologyService().localMember().toString()
        );
        readWorker.setDaemon(true);
        readWorker.start();

        writeWorker = new Thread(
            () -> processQueue(writeQueue, RaftGroupListener::onWrite),
            "write-cmd-worker#" + service.topologyService().localMember().toString()
        );
        writeWorker.setDaemon(true);
        writeWorker.start();

        LOG.info("Started replication server [node={}]", service);
    }

    /**
     * Interrupts both worker threads and waits for them to terminate.
     *
     * <p>Both workers are interrupted before either is joined, so an interruption while waiting for
     * the read worker still leaves the write worker signalled to stop.
     *
     * @throws NodeStoppingException If the calling thread is interrupted while waiting; the caller's
     *      interrupt status is restored before the exception is thrown.
     */
    public void stop() throws NodeStoppingException {
        assert listeners.isEmpty() : IgniteStringFormatter.format("Raft groups are still running {}", listeners.keySet());

        // Read the volatile fields once so the null check and the use see the same reference.
        Thread reader = readWorker;
        Thread writer = writeWorker;

        if (reader != null) {
            reader.interrupt();
        }

        if (writer != null) {
            writer.interrupt();
        }

        joinWorker(reader, "Unable to stop read worker.");
        joinWorker(writer, "Unable to stop write worker.");

        LOG.info("Stopped replication server [node={}]", service);
    }

    /**
     * @return The cluster service passed to the constructor.
     */
    public ClusterService clusterService() {
        return service;
    }

    /**
     * Registers a listener for the given group unless one is already registered.
     *
     * @param str Group id.
     * @param raftGroupListener Listener that will receive the group's commands.
     * @param list Initial peers; not used by this implementation.
     * @return {@code true} if the listener was registered, {@code false} if the group already had one.
     */
    public synchronized boolean startRaftGroup(String str, RaftGroupListener raftGroupListener, List<Peer> list) {
        return listeners.putIfAbsent(str, raftGroupListener) == null;
    }

    /**
     * Unregisters the listener of the given group.
     *
     * @param str Group id.
     * @return {@code true} if a listener was registered and has been removed.
     */
    public synchronized boolean stopRaftGroup(String str) {
        return listeners.remove(str) != null;
    }

    /**
     * Returns a peer built from the local member's address; the group id is not consulted.
     *
     * @param str Group id (ignored).
     * @return Peer describing the local member.
     */
    @Nullable
    public Peer localPeer(String str) {
        return new Peer(service.topologyService().localMember().address());
    }

    /**
     * @return A live view of the ids of all groups that currently have a registered listener.
     */
    public Set<String> startedGroups() {
        return listeners.keySet();
    }

    /**
     * Wraps the request into a closure and offers it to the given queue without blocking. If the
     * queue is full, an {@link RaftError#EBUSY} error response is sent to the requester instead.
     *
     * @param networkAddress Address the response is sent to.
     * @param actionRequest Request carrying the command.
     * @param str Value received with the request and passed back unchanged with the response.
     * @param blockingQueue Queue matching the command kind.
     * @param raftGroupListener Listener that will process the command.
     * @param <T> Command type held by the queue.
     */
    private <T extends Command> void handleActionRequest(
        final NetworkAddress networkAddress,
        final ActionRequest actionRequest,
        final String str,
        BlockingQueue<CommandClosureEx<T>> blockingQueue,
        final RaftGroupListener raftGroupListener
    ) {
        CommandClosureEx<T> closure = new CommandClosureEx<T>() {
            @Override
            public RaftGroupListener listener() {
                return raftGroupListener;
            }

            // The caller selects the queue from the runtime type of the command, so the cast
            // matches the queue's element type.
            @SuppressWarnings("unchecked")
            @Override
            public T command() {
                return (T) actionRequest.command();
            }

            @Override
            public void result(Serializable serializable) {
                // A Throwable result is reported as a state machine error; anything else as a normal response.
                Object response = serializable instanceof Throwable
                    ? null
                    : clientMsgFactory.actionResponse().result(serializable).build();

                if (serializable instanceof Throwable) {
                    service.messagingService().send(
                        networkAddress,
                        clientMsgFactory.sMErrorResponse().error(new SMCompactedThrowable((Throwable) serializable)).build(),
                        str
                    );
                } else {
                    service.messagingService().send(
                        networkAddress,
                        clientMsgFactory.actionResponse().result(serializable).build(),
                        str
                    );
                }
            }
        };

        if (!blockingQueue.offer(closure)) {
            sendError(networkAddress, str, RaftError.EBUSY);
        }
    }

    /**
     * Worker loop: takes closures from the queue one at a time and hands each to its listener as a
     * single-element iterator. Returns when the thread is interrupted. Exceptions thrown while
     * processing a command are logged and the loop continues.
     *
     * @param blockingQueue Queue to drain.
     * @param biConsumer Listener method to invoke ({@code onRead} or {@code onWrite}).
     * @param <T> Command type held by the queue.
     */
    private <T extends Command> void processQueue(
        BlockingQueue<CommandClosureEx<T>> blockingQueue,
        BiConsumer<RaftGroupListener, Iterator<CommandClosure<T>>> biConsumer
    ) {
        while (!Thread.interrupted()) {
            try {
                CommandClosureEx<T> closure = blockingQueue.take();

                biConsumer.accept(closure.listener(), List.<CommandClosure<T>>of(closure).iterator());
            } catch (InterruptedException e) {
                // Interruption is the stop signal; the thread exits right after, so the flag is not restored.
                return;
            } catch (Exception e) {
                LOG.error("Failed to process the command", e);
            }
        }
    }

    /**
     * Sends an error response carrying the numeric code of the given error.
     *
     * @param networkAddress Address the response is sent to.
     * @param str Value received with the request and passed back unchanged with the response.
     * @param raftError Error to report.
     */
    private void sendError(NetworkAddress networkAddress, String str, RaftError raftError) {
        service.messagingService().send(
            networkAddress,
            clientMsgFactory.errorResponse().errorCode(raftError.getNumber()).build(),
            str
        );
    }

    /**
     * Waits for the given worker to terminate, if it was ever started.
     *
     * @param worker Worker thread, or {@code null} if it was never started.
     * @param failureMessage Message of the exception thrown when the wait is interrupted.
     * @throws NodeStoppingException If the calling thread is interrupted while waiting.
     */
    private static void joinWorker(@Nullable Thread worker, String failureMessage) throws NodeStoppingException {
        if (worker == null) {
            return;
        }

        try {
            worker.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();

            throw new NodeStoppingException(failureMessage, e);
        }
    }
}
