/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlDdl;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
import org.apache.ignite.internal.sql.engine.ResultSetMetadata;
import org.apache.ignite.internal.sql.engine.SqlCursor;
import org.apache.ignite.internal.sql.engine.exec.ClosableIteratorsHolder;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Node;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.exec.rel.RootNode;
import org.apache.ignite.internal.sql.engine.extension.SqlExtension;
import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
import org.apache.ignite.internal.sql.engine.message.ErrorMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.metadata.AffinityService;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.metadata.FragmentMapping;
import org.apache.ignite.internal.sql.engine.metadata.MappingService;
import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
import org.apache.ignite.internal.sql.engine.prepare.CacheKey;
import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.FragmentPlan;
import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepDmlPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepQueryPlan;
import org.apache.ignite.internal.sql.engine.prepare.PlannerHelper;
import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlanCache;
import org.apache.ignite.internal.sql.engine.prepare.QueryTemplate;
import org.apache.ignite.internal.sql.engine.prepare.ResultSetMetadataImpl;
import org.apache.ignite.internal.sql.engine.prepare.ResultSetMetadataInternal;
import org.apache.ignite.internal.sql.engine.prepare.Splitter;
import org.apache.ignite.internal.sql.engine.prepare.ValidationResult;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.NodeLeaveHandler;
import org.apache.ignite.internal.sql.engine.util.TransformingIterator;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.Cancellable;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class ExecutionServiceImpl<RowT>
implements ExecutionService {
    private static final IgniteLogger LOG = IgniteLogger.forClass(ExecutionServiceImpl.class);
    private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
    private final TopologyService topSrvc;
    private final MessageService msgSrvc;
    private final String locNodeId;
    private final QueryPlanCache qryPlanCache;
    private final SqlSchemaManager sqlSchemaManager;
    private final QueryTaskExecutor taskExecutor;
    private final AffinityService affSrvc;
    private final MailboxRegistry mailboxRegistry;
    private final MappingService mappingSrvc;
    private final ExchangeService exchangeSrvc;
    private final ClosableIteratorsHolder iteratorsHolder;
    private final Map<UUID, QueryInfo> running;
    private final RowHandler<RowT> handler;
    private final DdlSqlToCommandConverter ddlConverter;
    private final Map<String, SqlExtension> extensions;
    private final TableManager tableManager;
    private final DdlCommandHandler ddlCmdHnd;

    public ExecutionServiceImpl(TopologyService topSrvc, MessageService msgSrvc, QueryPlanCache planCache, SqlSchemaManager sqlSchemaManager, TableManager tblManager, QueryTaskExecutor taskExecutor, RowHandler<RowT> handler, Map<String, SqlExtension> extensions) {
        this.topSrvc = topSrvc;
        this.handler = handler;
        this.msgSrvc = msgSrvc;
        this.sqlSchemaManager = sqlSchemaManager;
        this.taskExecutor = taskExecutor;
        this.extensions = extensions;
        this.tableManager = tblManager;
        this.ddlCmdHnd = new DdlCommandHandler(this.tableManager);
        this.locNodeId = topSrvc.localMember().id();
        this.qryPlanCache = planCache;
        this.running = new ConcurrentHashMap<UUID, QueryInfo>();
        this.ddlConverter = new DdlSqlToCommandConverter();
        this.iteratorsHolder = new ClosableIteratorsHolder(topSrvc.localMember().name(), LOG);
        this.mailboxRegistry = new MailboxRegistryImpl(topSrvc);
        this.exchangeSrvc = new ExchangeServiceImpl(this.locNodeId, taskExecutor, this.mailboxRegistry, msgSrvc);
        this.mappingSrvc = new MappingServiceImpl(topSrvc);
        this.affSrvc = cacheId -> Objects::hashCode;
    }

    @Override
    public void start() {
        this.iteratorsHolder.start();
        this.mailboxRegistry.start();
        this.exchangeSrvc.start();
        this.topSrvc.addEventHandler((TopologyEventHandler)new NodeLeaveHandler(this::onNodeLeft));
        this.msgSrvc.register((n, m) -> this.onMessage(n, (QueryStartRequest)m), (short)0);
        this.msgSrvc.register((n, m) -> this.onMessage(n, (QueryStartResponse)m), (short)1);
        this.msgSrvc.register((n, m) -> this.onMessage(n, (ErrorMessage)m), (short)2);
    }

    @Override
    public List<SqlCursor<List<?>>> executeQuery(String schema, String qry, Object[] params) {
        QueryPlan plan = this.qryPlanCache.queryPlan(new CacheKey(this.sqlSchemaManager.schema(schema).getName(), qry));
        if (plan != null) {
            PlanningContext pctx = this.createContext(Contexts.empty(), schema, qry, params);
            return Collections.singletonList(this.executePlan(UUID.randomUUID(), pctx, plan));
        }
        SqlNodeList qryList = Commons.parse(qry, Commons.FRAMEWORK_CONFIG.getParserConfig());
        ArrayList cursors = new ArrayList(qryList.size());
        for (SqlNode qry0 : qryList) {
            PlanningContext pctx = this.createContext(Contexts.empty(), schema, qry0.toString(), params);
            plan = qryList.size() == 1 ? this.qryPlanCache.queryPlan(new CacheKey(pctx.schemaName(), pctx.query()), () -> this.prepareSingle(qry0, pctx)) : this.prepareSingle(qry0, pctx);
            cursors.add(this.executePlan(UUID.randomUUID(), pctx, plan));
        }
        return cursors;
    }

    private SqlCursor<List<?>> mapAndExecutePlan(UUID qryId, MultiStepPlan plan, BaseQueryContext qctx, Object[] params) {
        plan.init(this.mappingSrvc, new MappingQueryContext(qctx, this.locNodeId, this.topologyVersion()));
        List<Fragment> fragments = plan.fragments();
        Fragment fragment = (Fragment)CollectionUtils.first(fragments);
        if (IgniteUtils.assertionsEnabled()) {
            assert (fragment != null);
            FragmentMapping mapping = plan.mapping(fragment);
            assert (mapping != null);
            List<String> nodes = mapping.nodeIds();
            assert (nodes != null && nodes.size() == 1 && ((String)CollectionUtils.first(nodes)).equals(this.locNodeId));
        }
        FragmentDescription fragmentDesc = new FragmentDescription(fragment.fragmentId(), plan.mapping(fragment), plan.target(fragment), plan.remotes(fragment));
        ExecutionContext ectx = new ExecutionContext(qctx, this.taskExecutor, qryId, this.locNodeId, this.locNodeId, this.topologyVersion(), fragmentDesc, this.handler, Commons.parametersMap(params));
        Object node = new LogicalRelImplementor<RowT>(ectx, this.affSrvc, this.mailboxRegistry, this.exchangeSrvc).go(fragment.root());
        QueryInfo info = new QueryInfo(ectx, plan, node);
        this.register(info);
        for (int i = 1; i < fragments.size(); ++i) {
            fragment = fragments.get(i);
            fragmentDesc = new FragmentDescription(fragment.fragmentId(), plan.mapping(fragment), plan.target(fragment), plan.remotes(fragment));
            Throwable ex = null;
            for (String nodeId : fragmentDesc.nodeIds()) {
                if (ex != null) {
                    info.onResponse(nodeId, fragment.fragmentId(), ex);
                    continue;
                }
                try {
                    QueryStartRequest req = FACTORY.queryStartRequest().queryId(qryId).fragmentId(fragment.fragmentId()).schema(qctx.schemaName()).root(fragment.serialized()).topologyVersion(ectx.topologyVersion()).fragmentDescription(fragmentDesc).parameters(params).build();
                    this.msgSrvc.send(nodeId, req);
                }
                catch (Throwable e) {
                    ex = e;
                    info.onResponse(nodeId, fragment.fragmentId(), ex);
                }
            }
        }
        return Commons.createCursor(new TransformingIterator<Object, List>(info.iterator(), row -> {
            int rowSize = ectx.rowHandler().columnCount(row);
            ArrayList<Object> res = new ArrayList<Object>(rowSize);
            for (int i = 0; i < rowSize; ++i) {
                res.add(ectx.rowHandler().get(i, row));
            }
            return res;
        }), (QueryPlan)plan);
    }

    @NotNull
    public List<SqlCursor<List<?>>> executePlans(Collection<QueryPlan> qryPlans, PlanningContext pctx) {
        ArrayList cursors = new ArrayList(qryPlans.size());
        for (QueryPlan plan : qryPlans) {
            UUID qryId = UUID.randomUUID();
            SqlCursor<List<?>> cur = this.executePlan(qryId, pctx, plan);
            cursors.add(cur);
        }
        return cursors;
    }

    @Override
    public void cancelQuery(UUID qryId) {
        QueryInfo info = this.running.get(qryId);
        if (info != null) {
            info.cancel();
        }
    }

    protected long topologyVersion() {
        return 1L;
    }

    private BaseQueryContext createQueryContext(Context parent, @Nullable String schema) {
        return BaseQueryContext.builder().parentContext(parent).frameworkConfig(Frameworks.newConfigBuilder((FrameworkConfig)Commons.FRAMEWORK_CONFIG).defaultSchema(this.sqlSchemaManager.schema(schema)).build()).logger(LOG).extensions(this.extensions).build();
    }

    private PlanningContext createContext(Context parent, @Nullable String schema, String qry, Object[] params) {
        return PlanningContext.builder().parentContext(this.createQueryContext(parent, schema)).query(qry).parameters(params).build();
    }

    private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
        IgnitePlanner planner = ctx.planner();
        ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
        sqlNode = validated.sqlNode();
        IgniteRel igniteRel = PlannerHelper.optimize(sqlNode, planner);
        List<Fragment> fragments = new Splitter().go(igniteRel);
        QueryTemplate template = new QueryTemplate(fragments);
        return new MultiStepQueryPlan(template, this.resultSetMetadata(ctx, validated.dataType(), validated.origins()));
    }

    private QueryPlan prepareFragment(String jsonFragment) {
        return new FragmentPlan((IgniteRel)RelJsonReader.fromJson(this.sqlSchemaManager, jsonFragment));
    }

    private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
        try {
            assert (this.single(sqlNode));
            ctx.planner().reset();
            if (SqlKind.DDL.contains(sqlNode.getKind())) {
                return this.prepareDdl(sqlNode, ctx);
            }
            switch (sqlNode.getKind()) {
                case SELECT: 
                case ORDER_BY: 
                case WITH: 
                case VALUES: 
                case UNION: 
                case EXCEPT: 
                case INTERSECT: {
                    return this.prepareQuery(sqlNode, ctx);
                }
                case INSERT: 
                case DELETE: 
                case UPDATE: {
                    return this.prepareDml(sqlNode, ctx);
                }
                case EXPLAIN: {
                    return this.prepareExplain(sqlNode, ctx);
                }
            }
            throw new IgniteInternalException("Unsupported operation [sqlNodeKind=" + sqlNode.getKind() + "; querySql=\"" + ctx.query() + "\"]");
        }
        catch (ValidationException e) {
            throw new IgniteInternalException("Failed to validate query", (Throwable)e);
        }
    }

    private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
        IgnitePlanner planner = ctx.planner();
        sqlNode = planner.validate(sqlNode);
        IgniteRel igniteRel = PlannerHelper.optimize(sqlNode, planner);
        List<Fragment> fragments = new Splitter().go(igniteRel);
        QueryTemplate template = new QueryTemplate(fragments);
        return new MultiStepDmlPlan(template, this.resultSetMetadata(ctx, igniteRel.getRowType(), null));
    }

    private QueryPlan prepareDdl(SqlNode sqlNode, PlanningContext ctx) {
        assert (sqlNode instanceof SqlDdl) : sqlNode == null ? "null" : sqlNode.getClass().getName();
        SqlDdl ddlNode = (SqlDdl)sqlNode;
        return new DdlPlan(this.ddlConverter.convert(ddlNode, ctx));
    }

    private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
        IgnitePlanner planner = ctx.planner();
        SqlNode sql = ((SqlExplain)explain).getExplicandum();
        sql = planner.validate(sql);
        IgniteRel igniteRel = PlannerHelper.optimize(sql, planner);
        String plan = RelOptUtil.toString((RelNode)igniteRel, (SqlExplainLevel)SqlExplainLevel.ALL_ATTRIBUTES);
        return new ExplainPlan(plan, this.explainFieldsMetadata(ctx));
    }

    private ResultSetMetadata explainFieldsMetadata(PlanningContext ctx) {
        IgniteTypeFactory factory = ctx.typeFactory();
        RelDataType planStrDataType = factory.createSqlType(SqlTypeName.VARCHAR, -1);
        IgniteBiTuple planField = new IgniteBiTuple((Object)"PLAN", (Object)planStrDataType);
        RelDataType planDataType = factory.createStructType(Collections.singletonList(planField));
        return this.resultSetMetadata(ctx, planDataType, null);
    }

    private SqlCursor<List<?>> executePlan(UUID qryId, PlanningContext pctx, QueryPlan plan) {
        switch (plan.type()) {
            case DML: 
            case QUERY: {
                return this.mapAndExecutePlan(qryId, (MultiStepPlan)plan, pctx.unwrap(BaseQueryContext.class), pctx.parameters());
            }
            case EXPLAIN: {
                return this.executeExplain((ExplainPlan)plan);
            }
            case DDL: {
                return this.executeDdl((DdlPlan)plan, pctx);
            }
        }
        throw new AssertionError((Object)("Unexpected plan type: " + plan));
    }

    private SqlCursor<List<?>> executeDdl(DdlPlan plan, PlanningContext pctx) {
        try {
            this.ddlCmdHnd.handle(plan.command(), pctx);
        }
        catch (IgniteInternalCheckedException e) {
            throw new IgniteInternalException("Failed to execute DDL statement [stmt=" + pctx.query() + ", err=" + e.getMessage() + "]", (Throwable)e);
        }
        return Commons.createCursor(Collections.emptyIterator(), (QueryPlan)plan);
    }

    private SqlCursor<List<?>> executeExplain(ExplainPlan plan) {
        SqlCursor<List<?>> cur = Commons.createCursor(Collections.singletonList(Collections.singletonList(plan.plan())), (QueryPlan)plan);
        return cur;
    }

    private void executeFragment(UUID qryId, FragmentPlan plan, ExecutionContext<RowT> ectx) {
        String origNodeId = ectx.originatingNodeId();
        Outbox node = (Outbox)new LogicalRelImplementor<RowT>(ectx, this.affSrvc, this.mailboxRegistry, this.exchangeSrvc).go(plan.root());
        try {
            this.msgSrvc.send(origNodeId, FACTORY.queryStartResponse().queryId(qryId).fragmentId(ectx.fragmentId()).build());
        }
        catch (IgniteInternalCheckedException e) {
            IgniteInternalException wrpEx = new IgniteInternalException("Failed to send reply. [nodeId=" + origNodeId + "]", (Throwable)e);
            throw wrpEx;
        }
        node.init();
    }

    private void register(QueryInfo info) {
        UUID qryId = info.ctx.queryId();
        this.running.put(qryId, info);
    }

    private ResultSetMetadataInternal resultSetMetadata(PlanningContext ctx, RelDataType sqlType, @Nullable List<List<String>> origins) {
        return new ResultSetMetadataImpl(TypeUtils.getResultType(ctx.typeFactory(), (RelOptSchema)ctx.catalogReader(), sqlType, origins), origins);
    }

    private boolean single(SqlNode sqlNode) {
        return !(sqlNode instanceof SqlNodeList);
    }

    private void onMessage(String nodeId, QueryStartRequest msg) {
        assert (nodeId != null && msg != null);
        try {
            QueryPlan qryPlan = this.qryPlanCache.queryPlan(new CacheKey(msg.schema(), msg.root()), () -> this.prepareFragment(msg.root()));
            FragmentPlan plan = (FragmentPlan)qryPlan;
            BaseQueryContext qctx = this.createQueryContext(Contexts.empty(), msg.schema());
            ExecutionContext<RowT> ectx = new ExecutionContext<RowT>(qctx, this.taskExecutor, msg.queryId(), this.locNodeId, nodeId, msg.topologyVersion(), msg.fragmentDescription(), this.handler, Commons.parametersMap(msg.parameters()));
            this.executeFragment(msg.queryId(), plan, ectx);
        }
        catch (Throwable ex) {
            LOG.error("Failed to start query fragment", ex);
            this.mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1L).forEach(AbstractNode::close);
            this.mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1L).forEach(AbstractNode::close);
            try {
                this.msgSrvc.send(nodeId, FACTORY.queryStartResponse().queryId(msg.queryId()).fragmentId(msg.fragmentId()).error(ex).build());
            }
            catch (Exception e) {
                LOG.error("Error occurred during send error message", (Throwable)e);
                IgniteInternalException wrpEx = new IgniteInternalException("Error occurred during send error message", (Throwable)e);
                e.addSuppressed(ex);
                throw wrpEx;
            }
            throw ex;
        }
    }

    private void onMessage(String nodeId, QueryStartResponse msg) {
        assert (nodeId != null && msg != null);
        QueryInfo info = this.running.get(msg.queryId());
        if (info != null) {
            info.onResponse(nodeId, msg.fragmentId(), msg.error());
        }
    }

    private void onMessage(String nodeId, ErrorMessage msg) {
        assert (nodeId != null && msg != null);
        QueryInfo info = this.running.get(msg.queryId());
        if (info != null) {
            info.onError(new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error()));
        }
    }

    private void onNodeLeft(ClusterNode node) {
        this.running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(node.id()));
    }

    @Override
    public void stop() throws Exception {
        AutoCloseable[] autoCloseableArray = new AutoCloseable[4];
        autoCloseableArray[0] = this.qryPlanCache::stop;
        autoCloseableArray[1] = this.iteratorsHolder::stop;
        autoCloseableArray[2] = this.mailboxRegistry::stop;
        autoCloseableArray[3] = this.exchangeSrvc::stop;
        IgniteUtils.closeAll((AutoCloseable[])autoCloseableArray);
    }

    private final class QueryInfo
    implements Cancellable {
        private final ExecutionContext<RowT> ctx;
        private final RootNode<RowT> root;
        private final Set<String> remotes;
        private final Set<RemoteFragmentKey> waiting;
        private volatile QueryState state;

        private QueryInfo(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
            this.ctx = ctx;
            RootNode rootNode = new RootNode(ctx, plan.metadata().rowType(), this::tryClose);
            rootNode.register(root);
            this.root = rootNode;
            this.remotes = new HashSet<String>();
            this.waiting = new HashSet<RemoteFragmentKey>();
            for (int i = 1; i < plan.fragments().size(); ++i) {
                Fragment fragment = plan.fragments().get(i);
                List<String> nodes = plan.mapping(fragment).nodeIds();
                this.remotes.addAll(nodes);
                for (String node : nodes) {
                    this.waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
                }
            }
            this.state = QueryState.RUNNING;
        }

        public Iterator<RowT> iterator() {
            return ExecutionServiceImpl.this.iteratorsHolder.iterator(this.root);
        }

        public void cancel() {
            this.root.close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void tryClose() {
            QueryState state0 = null;
            QueryInfo queryInfo = this;
            synchronized (queryInfo) {
                if (this.state == QueryState.CLOSED) {
                    return;
                }
                if (this.state == QueryState.RUNNING) {
                    state0 = this.state = QueryState.CLOSING;
                }
                this.root.closeInternal();
                if (this.state == QueryState.CLOSING && this.waiting.isEmpty()) {
                    state0 = this.state = QueryState.CLOSED;
                }
            }
            if (state0 == QueryState.CLOSED) {
                ExecutionServiceImpl.this.running.remove(this.ctx.queryId());
                IgniteInternalException wrpEx = null;
                for (String nodeId : this.remotes) {
                    try {
                        ExecutionServiceImpl.this.exchangeSrvc.closeOutbox(nodeId, this.ctx.queryId(), -1L, -1L);
                    }
                    catch (IgniteInternalCheckedException e) {
                        if (wrpEx == null) {
                            wrpEx = new IgniteInternalException("Failed to send cancel message. [nodeId=" + nodeId + "]", (Throwable)e);
                            continue;
                        }
                        wrpEx.addSuppressed((Throwable)e);
                    }
                }
                this.root.context().execute(this.ctx::cancel, this.root::onError);
                if (wrpEx != null) {
                    throw wrpEx;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void onNodeLeft(String nodeId) {
            ArrayList<RemoteFragmentKey> fragments = null;
            QueryInfo queryInfo = this;
            synchronized (queryInfo) {
                for (RemoteFragmentKey fragment : this.waiting) {
                    if (!fragment.nodeId.equals(nodeId)) continue;
                    if (fragments == null) {
                        fragments = new ArrayList<RemoteFragmentKey>();
                    }
                    fragments.add(fragment);
                }
            }
            if (!CollectionUtils.nullOrEmpty(fragments)) {
                IgniteInternalCheckedException ex = new IgniteInternalCheckedException("Failed to start query, node left. nodeId=" + nodeId);
                for (RemoteFragmentKey fragment : fragments) {
                    this.onResponse(fragment, ex);
                }
            }
        }

        private void onResponse(String nodeId, long fragmentId, Throwable error) {
            this.onResponse(new RemoteFragmentKey(nodeId, fragmentId), error);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void onResponse(RemoteFragmentKey fragment, Throwable error) {
            QueryState state;
            QueryInfo queryInfo = this;
            synchronized (queryInfo) {
                this.waiting.remove(fragment);
                state = this.state;
            }
            if (error != null) {
                this.onError(error);
            } else if (state == QueryState.CLOSING) {
                this.tryClose();
            }
        }

        private void onError(Throwable error) {
            this.root.onError(error);
            this.tryClose();
        }
    }

    private static final class RemoteFragmentKey {
        private final String nodeId;
        private final long fragmentId;

        private RemoteFragmentKey(String nodeId, long fragmentId) {
            this.nodeId = nodeId;
            this.fragmentId = fragmentId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            RemoteFragmentKey that = (RemoteFragmentKey)o;
            if (this.fragmentId != that.fragmentId) {
                return false;
            }
            return this.nodeId.equals(that.nodeId);
        }

        public int hashCode() {
            int res = this.nodeId.hashCode();
            res = 31 * res + (int)(this.fragmentId ^ this.fragmentId >>> 32);
            return res;
        }
    }

    private static enum QueryState {
        RUNNING,
        CLOSING,
        CLOSED;

    }
}

