/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.message;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
import org.apache.ignite.internal.sql.engine.message.MessageListener;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyService;

/**
 * {@link MessageService} implementation that exchanges SQL engine messages between cluster nodes.
 *
 * <p>Messages addressed to the local node are dispatched directly, messages addressed to other nodes are sent
 * through the {@link MessagingService}. Every received message is scheduled on the {@link QueryTaskExecutor} and
 * then handed to the {@link MessageListener} registered for its message type.
 */
public class MessageServiceImpl implements MessageService {
    /** Query id used to schedule messages that are not {@link ExecutionContextAwareMessage}s. */
    private static final UUID QUERY_ID_STUB = UUID.randomUUID();

    /** Logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(MessageServiceImpl.class);

    /** Topology service, used to resolve nodes by id and by network address. */
    private final TopologyService topSrvc;

    /** Messaging service, used to send messages to remote nodes and to subscribe for incoming ones. */
    private final MessagingService messagingSrvc;

    /** Id of the local node, captured once at construction time. */
    private final String locNodeId;

    /** Executor on which incoming messages are processed. */
    private final QueryTaskExecutor taskExecutor;

    /** Listeners keyed by message type; created lazily on the first {@link #register} call. */
    private volatile Map<Short, MessageListener> lsnrs;

    /**
     * Creates the service.
     *
     * @param topSrvc Topology service.
     * @param messagingSrvc Messaging service.
     * @param taskExecutor Executor used to process incoming messages.
     */
    public MessageServiceImpl(TopologyService topSrvc, MessagingService messagingSrvc, QueryTaskExecutor taskExecutor) {
        this.topSrvc = topSrvc;
        this.messagingSrvc = messagingSrvc;
        this.taskExecutor = taskExecutor;
        this.locNodeId = topSrvc.localMember().id();
    }

    /** Subscribes this service to the messages of the {@link SqlQueryMessageGroup} group. */
    @Override
    public void start() {
        this.messagingSrvc.addMessageHandler(SqlQueryMessageGroup.class, this::onMessage);
    }

    /**
     * Sends a message to the given node and waits until the send operation completes.
     *
     * <p>A message addressed to the local node bypasses the network and is dispatched directly.
     *
     * @param nodeId Id of the destination node.
     * @param msg Message to send.
     * @throws IgniteInternalCheckedException If sending failed or the waiting thread was interrupted.
     * @throws IgniteInternalException If the destination node is not present in the topology.
     */
    @Override
    public void send(String nodeId, NetworkMessage msg) throws IgniteInternalCheckedException {
        if (this.locNodeId.equals(nodeId)) {
            this.onMessage(nodeId, msg);
            return;
        }

        ClusterNode node = this.topSrvc.allMembers().stream()
                .filter(cn -> nodeId.equals(cn.id()))
                .findFirst()
                .orElseThrow(() -> new IgniteInternalException(
                        "Failed to send message to node (has node left grid?): " + nodeId));

        try {
            this.messagingSrvc.send(node, msg).get();
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Restore the interrupt flag: wrapping the exception alone would hide the interruption
                // from code further up the stack that checks the thread state.
                Thread.currentThread().interrupt();
            }

            if (ex instanceof IgniteInternalCheckedException) {
                throw (IgniteInternalCheckedException) ex;
            }

            throw new IgniteInternalCheckedException((Throwable) ex);
        }
    }

    /**
     * Registers a listener for the given message type.
     *
     * <p>NOTE(review): the lazy map creation and the {@code put} are not synchronized; presumably all listeners
     * are registered from a single thread before messages start to arrive - confirm with callers.
     *
     * @param lsnr Listener to register.
     * @param type Message type the listener handles.
     */
    @Override
    public void register(MessageListener lsnr, short type) {
        Map<Short, MessageListener> registered = this.lsnrs;

        if (registered == null) {
            registered = new HashMap<>();
            this.lsnrs = registered;
        }

        MessageListener old = registered.put(type, lsnr);

        // Only one listener per message type is expected.
        assert old == null : old;
    }

    /**
     * Checks whether a node with the given id is present in the current topology.
     *
     * @param nodeId Node id to look for.
     * @return {@code true} if one of the topology members has this id.
     */
    @Override
    public boolean alive(String nodeId) {
        return this.topSrvc.allMembers().stream().map(ClusterNode::id).anyMatch(id -> id.equals(nodeId));
    }

    /**
     * Schedules processing of a message on the task executor.
     *
     * <p>An {@link ExecutionContextAwareMessage} is scheduled under its own query id and fragment id. Any other
     * message is scheduled under {@link #QUERY_ID_STUB} with a random number in {@code [0, 1024)} in place of
     * the fragment id.
     *
     * @param nodeId Id of the node the message came from.
     * @param msg Message to process.
     */
    protected void onMessage(String nodeId, NetworkMessage msg) {
        if (msg instanceof ExecutionContextAwareMessage) {
            ExecutionContextAwareMessage msg0 = (ExecutionContextAwareMessage) msg;

            this.taskExecutor.execute(msg0.queryId(), msg0.fragmentId(), () -> this.onMessageInternal(nodeId, msg));
        } else {
            this.taskExecutor.execute(
                    QUERY_ID_STUB,
                    ThreadLocalRandom.current().nextLong(1024L),
                    () -> this.onMessageInternal(nodeId, msg));
        }
    }

    /**
     * Network-level handler registered in {@link #start()}: resolves the sender by its address and forwards
     * the message for processing. Messages from addresses unknown to the topology are logged and dropped.
     *
     * @param msg Received message.
     * @param addr Network address of the sender.
     * @param correlationId Correlation id supplied by the messaging service; not used here.
     */
    private void onMessage(NetworkMessage msg, NetworkAddress addr, String correlationId) {
        // NOTE(review): 4 is presumably the SqlQueryMessageGroup group type inlined by the compiler - confirm.
        assert msg.groupType() == 4 : "unexpected message group grpType=" + msg.groupType();

        ClusterNode node = this.topSrvc.getByAddress(addr);

        if (node == null) {
            LOG.warn("Received a message from a node that has not yet joined the cluster: addr={}, msg={}", new Object[]{addr, msg});

            return;
        }

        this.onMessage(node.id(), msg);
    }

    /**
     * Delivers a message to the listener registered for its type.
     *
     * @param nodeId Id of the node the message came from.
     * @param msg Message to deliver.
     * @throws NullPointerException If no listener is registered for the message type.
     */
    private void onMessageInternal(String nodeId, NetworkMessage msg) {
        // The map is created lazily, so it may still be absent when the first message arrives; treat that the
        // same way as a missing listener instead of failing with a message-less NPE.
        Map<Short, MessageListener> registered = this.lsnrs;
        MessageListener lsnr = registered == null ? null : registered.get(msg.messageType());

        Objects.requireNonNull(lsnr, "there is no listener for msgType=" + msg.messageType());

        lsnr.onMessage(nodeId, msg);
    }

    /** Removes all registered listeners, if any were registered. */
    @Override
    public void stop() {
        Map<Short, MessageListener> registered = this.lsnrs;

        if (registered != null) {
            registered.clear();
        }
    }
}

