package org.apache.ignite.raft.server.impl;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.exception.RaftException;
import org.apache.ignite.raft.client.message.ActionRequest;
import org.apache.ignite.raft.client.message.GetLeaderRequest;
import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
import org.apache.ignite.raft.server.RaftServer;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:org/apache/ignite/raft/server/impl/RaftServerImpl.class */
public class RaftServerImpl implements RaftServer {
    private static final IgniteLogger LOG = IgniteLogger.forClass(RaftServerImpl.class);
    private final String id;
    private final RaftClientMessageFactory clientMsgFactory;
    private final ClusterService server;
    private final ConcurrentMap<String, RaftGroupCommandListener> listeners = new ConcurrentHashMap();
    private final BlockingQueue<CommandClosureEx<ReadCommand>> readQueue;
    private final BlockingQueue<CommandClosureEx<WriteCommand>> writeQueue;
    private final Thread readWorker;
    private final Thread writeWorker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/raft/server/impl/RaftServerImpl$CommandClosureEx.class */
    public interface CommandClosureEx<T extends Command> extends CommandClosure<T> {
        RaftGroupCommandListener listener();
    }

    public RaftServerImpl(ClusterService clusterService, @NotNull RaftClientMessageFactory raftClientMessageFactory, int i, Map<String, RaftGroupCommandListener> map) {
        Objects.requireNonNull(raftClientMessageFactory);
        this.id = clusterService.topologyService().localMember().name();
        this.clientMsgFactory = raftClientMessageFactory;
        if (map != null) {
            this.listeners.putAll(map);
        }
        this.readQueue = new ArrayBlockingQueue(i);
        this.writeQueue = new ArrayBlockingQueue(i);
        this.server = clusterService;
        this.server.messagingService().addMessageHandler((networkMessage, clusterNode, str) -> {
            if (networkMessage instanceof GetLeaderRequest) {
                this.server.messagingService().send(clusterNode, raftClientMessageFactory.getLeaderResponse().leader(new Peer(this.server.topologyService().localMember())).build(), str);
                return;
            }
            if (networkMessage instanceof ActionRequest) {
                ActionRequest<?> actionRequest = (ActionRequest) networkMessage;
                RaftGroupCommandListener raftGroupCommandListener = this.listeners.get(actionRequest.groupId());
                if (raftGroupCommandListener == null) {
                    sendError(clusterNode, str, RaftErrorCode.ILLEGAL_STATE);
                } else if (actionRequest.command() instanceof ReadCommand) {
                    handleActionRequest(clusterNode, actionRequest, str, this.readQueue, raftGroupCommandListener);
                } else {
                    handleActionRequest(clusterNode, actionRequest, str, this.writeQueue, raftGroupCommandListener);
                }
            }
        });
        this.readWorker = new Thread(() -> {
            processQueue(this.readQueue, (v0, v1) -> {
                v0.onRead(v1);
            });
        }, "read-cmd-worker#" + this.id);
        this.readWorker.setDaemon(true);
        this.readWorker.start();
        this.writeWorker = new Thread(() -> {
            processQueue(this.writeQueue, (v0, v1) -> {
                v0.onWrite(v1);
            });
        }, "write-cmd-worker#" + this.id);
        this.writeWorker.setDaemon(true);
        this.writeWorker.start();
        LOG.info("Started replication server [node=" + this.server.topologyService().localMember() + "]", new Object[0]);
    }

    @Override // org.apache.ignite.raft.server.RaftServer
    public ClusterNode localMember() {
        return this.server.topologyService().localMember();
    }

    @Override // org.apache.ignite.raft.server.RaftServer
    public void setListener(String str, RaftGroupCommandListener raftGroupCommandListener) {
        this.listeners.put(str, raftGroupCommandListener);
    }

    @Override // org.apache.ignite.raft.server.RaftServer
    public void clearListener(String str) {
        this.listeners.remove(str);
    }

    @Override // org.apache.ignite.raft.server.RaftServer
    public synchronized void shutdown() throws Exception {
        this.server.shutdown();
        this.readWorker.interrupt();
        this.readWorker.join();
        this.writeWorker.interrupt();
        this.writeWorker.join();
        LOG.info("Stopped replication server [node=" + this.server.topologyService().localMember() + "]", new Object[0]);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T extends Command> void handleActionRequest(final ClusterNode clusterNode, final ActionRequest<?> actionRequest, final String str, BlockingQueue<CommandClosureEx<T>> blockingQueue, final RaftGroupCommandListener raftGroupCommandListener) {
        if (blockingQueue.offer(new CommandClosureEx<T>() { // from class: org.apache.ignite.raft.server.impl.RaftServerImpl.1
            @Override // org.apache.ignite.raft.server.impl.RaftServerImpl.CommandClosureEx
            public RaftGroupCommandListener listener() {
                return raftGroupCommandListener;
            }

            /* JADX WARN: Incorrect return type in method signature: ()TT; */
            public Command command() {
                return actionRequest.command();
            }

            public void success(Object obj) {
                RaftServerImpl.this.server.messagingService().send(clusterNode, RaftServerImpl.this.clientMsgFactory.actionResponse().result(obj).build(), str);
            }

            public void failure(Throwable th) {
                RaftServerImpl.this.sendError(clusterNode, str, RaftErrorCode.ILLEGAL_STATE);
            }
        })) {
            return;
        }
        sendError(clusterNode, str, RaftErrorCode.BUSY);
    }

    private <T extends Command> void processQueue(BlockingQueue<CommandClosureEx<T>> blockingQueue, BiConsumer<RaftGroupCommandListener, Iterator<CommandClosure<T>>> biConsumer) {
        while (!Thread.interrupted()) {
            try {
                CommandClosureEx<T> take = blockingQueue.take();
                RaftGroupCommandListener listener = take.listener();
                if (listener == null) {
                    take.failure(new RaftException(RaftErrorCode.ILLEGAL_STATE));
                } else {
                    biConsumer.accept(listener, List.of(take).iterator());
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    private void sendError(ClusterNode clusterNode, String str, RaftErrorCode raftErrorCode) {
        this.server.messagingService().send(clusterNode, this.clientMsgFactory.raftErrorResponse().errorCode(raftErrorCode).build(), str);
    }
}
