/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.message.InboxCloseMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.OutboxCloseMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchAcknowledgeMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;

public class ExchangeServiceImpl
implements ExchangeService {
    private static final IgniteLogger LOG = IgniteLogger.forClass(ExchangeServiceImpl.class);
    private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
    private final String localNodeId;
    private final QueryTaskExecutor taskExecutor;
    private final MailboxRegistry mailboxRegistry;
    private final MessageService msgSrvc;

    public ExchangeServiceImpl(String localNodeId, QueryTaskExecutor taskExecutor, MailboxRegistry mailboxRegistry, MessageService msgSrvc) {
        this.localNodeId = localNodeId;
        this.taskExecutor = taskExecutor;
        this.mailboxRegistry = mailboxRegistry;
        this.msgSrvc = msgSrvc;
    }

    @Override
    public void start() {
        this.msgSrvc.register((n, m) -> this.onMessage(n, (InboxCloseMessage)m), (short)5);
        this.msgSrvc.register((n, m) -> this.onMessage(n, (OutboxCloseMessage)m), (short)6);
        this.msgSrvc.register((n, m) -> this.onMessage(n, (QueryBatchAcknowledgeMessage)m), (short)4);
        this.msgSrvc.register((n, m) -> this.onMessage(n, (QueryBatchMessage)m), (short)3);
    }

    @Override
    public <RowT> void sendBatch(String nodeId, UUID qryId, long fragmentId, long exchangeId, int batchId, boolean last, List<RowT> rows) throws IgniteInternalCheckedException {
        this.msgSrvc.send(nodeId, FACTORY.queryBatchMessage().queryId(qryId).fragmentId(fragmentId).exchangeId(exchangeId).batchId(batchId).last(last).rows(Commons.cast(rows)).build());
    }

    @Override
    public void acknowledge(String nodeId, UUID qryId, long fragmentId, long exchangeId, int batchId) throws IgniteInternalCheckedException {
        this.msgSrvc.send(nodeId, FACTORY.queryBatchAcknowledgeMessage().queryId(qryId).fragmentId(fragmentId).exchangeId(exchangeId).batchId(batchId).build());
    }

    @Override
    public void closeOutbox(String nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteInternalCheckedException {
        this.msgSrvc.send(nodeId, FACTORY.outboxCloseMessage().queryId(qryId).fragmentId(fragmentId).exchangeId(exchangeId).build());
    }

    @Override
    public void closeInbox(String nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteInternalCheckedException {
        this.msgSrvc.send(nodeId, FACTORY.inboxCloseMessage().queryId(qryId).fragmentId(fragmentId).exchangeId(exchangeId).build());
    }

    @Override
    public void sendError(String nodeId, UUID qryId, long fragmentId, Throwable err) throws IgniteInternalCheckedException {
        this.msgSrvc.send(nodeId, FACTORY.errorMessage().queryId(qryId).fragmentId(fragmentId).error(err).build());
    }

    @Override
    public boolean alive(String nodeId) {
        return this.msgSrvc.alive(nodeId);
    }

    protected void onMessage(String nodeId, InboxCloseMessage msg) {
        Collection<Inbox<?>> inboxes = this.mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
        if (!CollectionUtils.nullOrEmpty(inboxes)) {
            for (Inbox<?> inbox : inboxes) {
                inbox.context().execute(inbox::close, inbox::onError);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Stale inbox cancel message received: [nodeId=" + nodeId + ", queryId=" + msg.queryId() + ", fragmentId=" + msg.fragmentId() + ", exchangeId=" + msg.exchangeId() + "]", new Object[0]);
        }
    }

    protected void onMessage(String nodeId, OutboxCloseMessage msg) {
        Collection<Outbox<?>> outboxes = this.mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
        if (!CollectionUtils.nullOrEmpty(outboxes)) {
            for (Outbox<?> outbox : outboxes) {
                outbox.context().execute(outbox::close, outbox::onError);
            }
            for (Outbox<?> outbox : outboxes) {
                outbox.context().execute(outbox.context()::cancel, outbox::onError);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Stale outbox cancel message received: [nodeId=" + nodeId + ", queryId=" + msg.queryId() + ", fragmentId=" + msg.fragmentId() + ", exchangeId=" + msg.exchangeId() + "]", new Object[0]);
        }
    }

    protected void onMessage(String nodeId, QueryBatchAcknowledgeMessage msg) {
        Outbox<?> outbox = this.mailboxRegistry.outbox(msg.queryId(), msg.exchangeId());
        if (outbox != null) {
            try {
                outbox.onAcknowledge(nodeId, msg.batchId());
            }
            catch (Throwable e) {
                outbox.onError(e);
                throw new IgniteInternalException("Unexpected exception", e);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Stale acknowledge message received: [nodeId=" + nodeId + ", queryId=" + msg.queryId() + ", fragmentId=" + msg.fragmentId() + ", exchangeId=" + msg.exchangeId() + ", batchId=" + msg.batchId() + "]", new Object[0]);
        }
    }

    protected void onMessage(String nodeId, QueryBatchMessage msg) {
        Inbox<?> inbox = this.mailboxRegistry.inbox(msg.queryId(), msg.exchangeId());
        if (inbox == null && msg.batchId() == 0) {
            Inbox newInbox = new Inbox(this.baseInboxContext(nodeId, msg.queryId(), msg.fragmentId()), this, this.mailboxRegistry, msg.exchangeId(), msg.exchangeId());
            inbox = this.mailboxRegistry.register(newInbox);
        }
        if (inbox != null) {
            try {
                inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows()));
            }
            catch (Throwable e) {
                inbox.onError(e);
                throw new IgniteInternalException("Unexpected exception", e);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Stale batch message received: [nodeId=" + nodeId + ", queryId=" + msg.queryId() + ", fragmentId=" + msg.fragmentId() + ", exchangeId=" + msg.exchangeId() + ", batchId=" + msg.batchId() + "]", new Object[0]);
        }
    }

    private ExecutionContext<?> baseInboxContext(String nodeId, UUID qryId, long fragmentId) {
        return new ExecutionContext(BaseQueryContext.builder().logger(LOG).build(), this.taskExecutor, qryId, this.localNodeId, nodeId, -1L, new FragmentDescription(fragmentId, null, null, null), null, Map.of());
    }

    @Override
    public void stop() {
    }
}

