package org.apache.ignite.internal.sql.engine.exec;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
import org.apache.ignite.internal.sql.engine.message.ErrorMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryCloseMessage;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.metadata.MappingService;
import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.FragmentPlan;
import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.class */
public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEventHandler {
    /** Class logger; assigned in the static initializer. */
    private static final IgniteLogger LOG;
    /** Factory for the SQL query network messages built by this service; assigned in the static initializer. */
    private static final SqlQueryMessagesFactory FACTORY;
    /** Messaging facade used to send fragment start requests/responses and to register message listeners. */
    private final MessageService msgSrvc;
    /** Identifier of the local node. */
    private final String locNodeId;
    /** Schema manager used to resolve the default schema and to deserialize fragments. */
    private final SqlSchemaManager sqlSchemaManager;
    /** Executor on which query start-up and close sequences are scheduled. */
    private final QueryTaskExecutor taskExecutor;
    /** Mapping service handed to a plan when it is initialized. */
    private final MappingService mappingSrvc;
    /** Exchange service used to check node liveness and to send close-query notifications. */
    private final ExchangeService exchangeSrvc;
    /** Row handler passed to every execution context created by this service. */
    private final RowHandler<RowT> handler;
    /** Handler that applies DDL commands. */
    private final DdlCommandHandler ddlCmdHnd;
    /** Factory of relational-tree implementors, one per execution context. */
    private final ImplementorFactory<RowT> implementorFactory;
    /** Query managers known to this node, keyed by query id. */
    private final Map<UUID, ExecutionServiceImpl<RowT>.DistributedQueryManager> queryManagerMap = new ConcurrentHashMap<>();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl$DistributedQueryManager.class */
    public class DistributedQueryManager {
        /** Context of the query this manager is responsible for. */
        private final BaseQueryContext ctx;
        /** Completed with the local root node once a non-outbox fragment is started on this node. */
        private final CompletableFuture<AsyncRootNode<RowT, List<Object>>> root;
        static final /* synthetic */ boolean $assertionsDisabled;
        /** Completed at the end of the close sequence; every caller of close(boolean) gets a stage derived from it. */
        private final CompletableFuture<Void> cancelFut = new CompletableFuture<>();
        /** Guards the close sequence so that it is started at most once. */
        private final AtomicBoolean cancelled = new AtomicBoolean();
        /** One future per (node, fragment) pair a start request was sent to; completed when the fragment is acknowledged. */
        private final Map<RemoteFragmentKey, CompletableFuture<Void>> remoteFragmentInitCompletion = new ConcurrentHashMap<>();
        /** Execution tree roots of the fragments started on the local node. */
        private final Queue<AbstractNode<RowT>> localFragments = new LinkedBlockingQueue<>();
        /** Id of the root fragment; stays null until execute(MultiStepPlan) reaches the root fragment. */
        private volatile Long rootFragmentId = null;

        /**
         * Creates a manager for the query described by the given context.
         *
         * <p>The root future is created here together with a callback that closes the manager
         * (with cancellation) as soon as that future completes exceptionally.
         *
         * @param baseQueryContext context of the query to manage
         */
        private DistributedQueryManager(BaseQueryContext baseQueryContext) {
            this.ctx = baseQueryContext;
            this.root = new CompletableFuture<>();
            // The returned dependent stage is intentionally dropped: only the side effect matters.
            this.root.exceptionally(failure -> {
                close(true);
                return null;
            });
        }

        /**
         * Returns an unmodifiable snapshot of the fragment roots started on the local node.
         *
         * @return copy of the local fragment queue taken at call time
         */
        private List<AbstractNode<?>> localFragments() {
            List<AbstractNode<?>> snapshot = List.copyOf(this.localFragments);
            return snapshot;
        }

        /**
         * Sends a start request for the given fragment to the given node and registers a future
         * that tracks the node's acknowledgement.
         *
         * @param str id of the target node
         * @param fragment fragment to start remotely
         * @param fragmentDescription description shipped along with the fragment
         * @throws IgniteInternalCheckedException rethrown if the message could not be sent
         */
        private void sendFragment(String str, Fragment fragment, FragmentDescription fragmentDescription) throws IgniteInternalCheckedException {
            QueryStartRequest request = ExecutionServiceImpl.FACTORY.queryStartRequest()
                    .queryId(this.ctx.queryId())
                    .fragmentId(fragment.fragmentId())
                    .schema(this.ctx.schemaName())
                    .root(fragment.serialized())
                    .fragmentDescription(fragmentDescription)
                    .parameters(this.ctx.parameters())
                    .build();

            CompletableFuture<Void> initCompletion = new CompletableFuture<>();
            RemoteFragmentKey key = new RemoteFragmentKey(str, fragment.fragmentId());
            this.remoteFragmentInitCompletion.put(key, initCompletion);

            try {
                ExecutionServiceImpl.this.msgSrvc.send(str, request);
            } catch (Exception sendFailure) {
                // Nothing will ever acknowledge this fragment, so release anyone waiting on it.
                initCompletion.complete(null);
                if (fragment.rootFragment()) {
                    this.root.completeExceptionally(sendFailure);
                }
                throw sendFailure;
            }
        }

        /**
         * Handles a start response for a fragment previously sent to a node.
         *
         * <p>On failure of the root fragment the root future is failed directly; on failure of any
         * other fragment the error is forwarded to the root node (once available) and the query is
         * closed. In every case the init-completion future of the fragment is completed.
         *
         * @param str id of the node that answered
         * @param j id of the acknowledged fragment
         * @param th error reported by the node, or {@code null} on success
         */
        private void acknowledgeFragment(String str, long j, @Nullable Throwable th) {
            if (th != null) {
                Long rootId = this.rootFragmentId;
                boolean rootFragmentFailed = rootId != null && j == rootId.longValue();
                if (rootFragmentFailed) {
                    this.root.completeExceptionally(th);
                } else {
                    this.root.thenAccept(rootNode -> {
                        rootNode.onError(th);
                        close(true);
                    });
                }
            }
            RemoteFragmentKey key = new RemoteFragmentKey(str, j);
            this.remoteFragmentInitCompletion.get(key).complete(null);
        }

        /**
         * Propagates an error reported by a remote node: once the root node is available it is
         * notified about the error and the query is closed with cancellation.
         *
         * @param remoteException error received from a remote node
         */
        private void onError(RemoteException remoteException) {
            this.root.thenAccept(rootNode -> {
                rootNode.onError(remoteException);
                close(true);
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Fails the init-completion futures of every fragment that was sent to the given node.
         *
         * @param str id of the node that left the topology
         */
        public void onNodeLeft(String str) {
            for (Map.Entry<RemoteFragmentKey, CompletableFuture<Void>> entry : this.remoteFragmentInitCompletion.entrySet()) {
                if (str.equals(entry.getKey().nodeId())) {
                    // The original message was a meaningless placeholder; tell the waiter what actually happened.
                    entry.getValue().completeExceptionally(
                            new IgniteInternalException("Node left the cluster [nodeId=" + str + "]"));
                }
            }
        }

        /**
         * Builds the execution tree for a fragment on the local node, registers it as a local
         * fragment and acknowledges the start to the originating node.
         *
         * @param fragmentPlan plan of the fragment to execute
         * @param executionContext context the fragment is executed in
         * @throws IgniteInternalException if the start response could not be sent
         */
        private void executeFragment(FragmentPlan fragmentPlan, ExecutionContext<RowT> executionContext) {
            String originatingNodeId = executionContext.originatingNodeId();
            AbstractNode<RowT> abstractNode = (AbstractNode) ExecutionServiceImpl.this.implementorFactory.create(executionContext).go(fragmentPlan.root());
            this.localFragments.add(abstractNode);
            // A tree that does not end in an Outbox is wrapped into an AsyncRootNode and published through the root future.
            if (!(abstractNode instanceof Outbox)) {
                Function resultTypeConverter = TypeUtils.resultTypeConverter(executionContext, fragmentPlan.root().getRowType());
                AsyncRootNode<RowT, List<Object>> asyncRootNode = new AsyncRootNode<>(abstractNode, obj -> {
                    // Convert the row, then copy its columns one by one into a plain list.
                    Object apply = resultTypeConverter.apply(obj);
                    int columnCount = executionContext.rowHandler().columnCount(apply);
                    ArrayList arrayList = new ArrayList(columnCount);
                    for (int i = 0; i < columnCount; i++) {
                        arrayList.add(executionContext.rowHandler().get(i, apply));
                    }
                    return arrayList;
                });
                abstractNode.onRegister(asyncRootNode);
                this.root.complete(asyncRootNode);
            }
            try {
                // Acknowledge the start first; an Outbox is initialized only after the response was sent.
                ExecutionServiceImpl.this.msgSrvc.send(originatingNodeId, ExecutionServiceImpl.FACTORY.queryStartResponse().queryId(executionContext.queryId()).fragmentId(executionContext.fragmentId()).build());
                if (abstractNode instanceof Outbox) {
                    ((Outbox) abstractNode).init();
                }
            } catch (IgniteInternalCheckedException e) {
                throw new IgniteInternalException("Failed to send reply. [nodeId=" + originatingNodeId + "]", e);
            }
        }

        /**
         * Creates the execution context for a fragment that is about to run on the local node.
         *
         * @param str id of the node the fragment request originated from
         * @param fragmentDescription description of the fragment
         * @return new execution context bound to this query
         */
        private ExecutionContext<RowT> createContext(String str, FragmentDescription fragmentDescription) {
            ExecutionServiceImpl<RowT> service = ExecutionServiceImpl.this;
            return new ExecutionContext<>(
                    this.ctx,
                    service.taskExecutor,
                    this.ctx.queryId(),
                    service.locNodeId,
                    str,
                    fragmentDescription,
                    service.handler,
                    Commons.parametersMap(this.ctx.parameters()));
        }

        /**
         * Deserializes and starts a fragment requested by another node. Any failure is logged and
         * reported back to the requester; if even that report cannot be sent, the query is closed.
         *
         * @param str id of the node that requested the fragment
         * @param str2 serialized fragment
         * @param fragmentDescription description of the fragment
         */
        private void submitFragment(String str, String str2, FragmentDescription fragmentDescription) {
            try {
                FragmentPlan plan = (FragmentPlan) ExecutionServiceImpl.this.prepareFragment(str2);
                ExecutionContext<RowT> fragmentCtx = createContext(str, fragmentDescription);
                executeFragment(plan, fragmentCtx);
            } catch (Throwable startFailure) {
                ExecutionServiceImpl.LOG.error("Failed to start query fragment", startFailure);
                try {
                    ExecutionServiceImpl.this.msgSrvc.send(
                            str,
                            ExecutionServiceImpl.FACTORY.queryStartResponse()
                                    .queryId(this.ctx.queryId())
                                    .fragmentId(fragmentDescription.fragmentId())
                                    .error(startFailure)
                                    .build());
                } catch (Exception replyFailure) {
                    ExecutionServiceImpl.LOG.error("Error occurred during send error message", replyFailure);
                    close(true);
                }
            }
        }

        /**
         * Starts distributed execution of the given plan and returns a cursor over its result.
         *
         * <p>Fragment distribution is scheduled on the task executor; the returned cursor delegates
         * to the root node once the root future completes.
         *
         * @param multiStepPlan plan to execute
         * @return cursor backed by the root future of this manager
         */
        private AsyncCursor<List<Object>> execute(MultiStepPlan multiStepPlan) {
            ExecutionServiceImpl.this.taskExecutor.execute(() -> {
                // NOTE(review): init runs outside the try below, so a failure here is not routed to the root node — confirm the executor handles it.
                multiStepPlan.init(ExecutionServiceImpl.this.mappingSrvc, new MappingQueryContext(ExecutionServiceImpl.this.locNodeId));
                List<Fragment> fragments = multiStepPlan.fragments();
                // Decompiled assert: the fragment list must be non-empty and start with the root fragment.
                if (!$assertionsDisabled && (CollectionUtils.nullOrEmpty(fragments) || !fragments.get(0).rootFragment())) {
                    throw new AssertionError(fragments);
                }
                try {
                    for (Fragment fragment : fragments) {
                        if (fragment.rootFragment()) {
                            // Decompiled assert: the root fragment id may be assigned only once.
                            if (!$assertionsDisabled && this.rootFragmentId != null) {
                                throw new AssertionError();
                            }
                            this.rootFragmentId = Long.valueOf(fragment.fragmentId());
                        }
                        FragmentDescription fragmentDescription = new FragmentDescription(fragment.fragmentId(), multiStepPlan.mapping(fragment), multiStepPlan.target(fragment), multiStepPlan.remotes(fragment));
                        // Send the fragment to every node listed in its description.
                        Iterator<String> it = fragmentDescription.nodeIds().iterator();
                        while (it.hasNext()) {
                            sendFragment(it.next(), fragment, fragmentDescription);
                        }
                    }
                } catch (Throwable th) {
                    // Forward the failure to the root node once it becomes available.
                    this.root.thenAccept(asyncRootNode -> {
                        asyncRootNode.onError(th);
                    });
                }
            });
            return new AsyncCursor<List<Object>>() { // from class: org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl.DistributedQueryManager.1
                @Override // org.apache.ignite.internal.sql.engine.AsyncCursor
                public CompletionStage<AsyncCursor.BatchedResult<List<Object>>> requestNextAsync(int i) {
                    return DistributedQueryManager.this.root.thenCompose(asyncRootNode -> {
                        CompletionStage requestNextAsync = asyncRootNode.requestNextAsync(i);
                        // When the batch reports no more rows, close the query without cancellation.
                        requestNextAsync.thenAccept(batchedResult -> {
                            if (batchedResult.hasMore()) {
                                return;
                            }
                            DistributedQueryManager.this.close(false);
                        });
                        return requestNextAsync;
                    });
                }

                @Override // org.apache.ignite.internal.sql.engine.AsyncCursor
                public CompletableFuture<Void> closeAsync() {
                    // Closing waits for the root future before starting the close sequence.
                    return DistributedQueryManager.this.root.thenCompose(asyncRootNode -> {
                        return DistributedQueryManager.this.close(false);
                    });
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Starts the close sequence of this query at most once and returns a stage derived from
         * the shared cancel future.
         *
         * <p>The sequence is kicked off on the task executor and runs in two steps: first the root
         * is dealt with (failed with {@link ExecutionCancelledException} if not completed yet,
         * otherwise notified about cancellation or closed, depending on {@code z}); then a
         * close-query notification is sent to every alive node that received a fragment, local
         * fragments are failed when {@code z} is set, and finally the manager is removed from the
         * registry and the cancel future is completed.
         *
         * <p>NOTE(review): the final step is attached with {@code thenRun}, so if any close-query
         * stage fails the registry entry is not removed and the cancel future is not completed
         * here — confirm this is intended.
         *
         * @param z {@code true} to cancel the query (errors are pushed to the root and local
         *     fragments), {@code false} to close the root normally
         * @return stage that completes together with the cancel future
         */
        public CompletableFuture<Void> close(boolean z) {
            // Only the first caller runs the sequence; everybody else just waits for its outcome.
            if (!this.cancelled.compareAndSet(false, true)) {
                return this.cancelFut.thenApply(Function.identity());
            }
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.thenCompose(r6 -> {
                // completeExceptionally returns false when the root future was already completed.
                return !this.root.completeExceptionally(new ExecutionCancelledException()) ? z ? this.root.thenAccept(asyncRootNode -> {
                    asyncRootNode.onError(new ExecutionCancelledException());
                }) : this.root.thenCompose((v0) -> {
                    return v0.closeAsync();
                }) : CompletableFuture.completedFuture(null);
            }).thenCompose(r7 -> {
                // Group the init-completion futures by the node the fragments were sent to.
                HashMap hashMap = new HashMap();
                for (Map.Entry<RemoteFragmentKey, CompletableFuture<Void>> entry : this.remoteFragmentInitCompletion.entrySet()) {
                    ((List) hashMap.computeIfAbsent(entry.getKey().nodeId(), str -> {
                        return new ArrayList();
                    })).add(entry.getValue());
                }
                ArrayList arrayList = new ArrayList();
                for (Map.Entry entry2 : hashMap.entrySet()) {
                    String str2 = (String) entry2.getKey();
                    if (ExecutionServiceImpl.this.exchangeSrvc.alive(str2)) {
                        // The close notification is sent once all fragments of that node finished initialization,
                        // whether successfully or not. The inner lambda parameter must not reuse the name of the
                        // enclosing lambda parameter, otherwise the code does not compile.
                        arrayList.add(CompletableFuture.allOf((CompletableFuture[]) ((List) entry2.getValue()).toArray(new CompletableFuture[0])).handle((ignoredResult, th) -> {
                            try {
                                ExecutionServiceImpl.this.exchangeSrvc.closeQuery(str2, this.ctx.queryId());
                                return null;
                            } catch (IgniteInternalCheckedException e) {
                                throw new IgniteInternalException("Failed to send cancel message. [nodeId=" + str2 + "]", e);
                            }
                        }));
                    }
                }
                if (z) {
                    // On cancellation every local fragment is failed within its own execution context.
                    ExecutionCancelledException executionCancelledException = new ExecutionCancelledException();
                    for (AbstractNode<RowT> abstractNode : this.localFragments) {
                        ExecutionContext<RowT> context = abstractNode.context();
                        ExecutionContext.RunnableX runnableX = () -> {
                            abstractNode.onError(executionCancelledException);
                        };
                        Objects.requireNonNull(abstractNode);
                        context.execute(runnableX, abstractNode::onError);
                    }
                }
                CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0]));
                return allOf.thenCombine((CompletionStage) allOf.thenRun(() -> {
                    ExecutionServiceImpl.this.queryManagerMap.remove(this.ctx.queryId());
                    this.cancelFut.complete(null);
                }), (r2, r3) -> {
                    return null;
                });
            });
            // Trigger the chain above on the task executor.
            completableFuture.completeAsync(() -> {
                return null;
            }, ExecutionServiceImpl.this.taskExecutor);
            return this.cancelFut.thenApply(Function.identity());
        }

        // Decompiler-emitted initializer of the synthetic flag behind the assert statements of this class.
        static {
            $assertionsDisabled = !ExecutionServiceImpl.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl$ImplementorFactory.class */
    /**
     * Factory of {@link LogicalRelImplementor} instances.
     *
     * @param <RowT> row type handled by the created implementor
     */
    public interface ImplementorFactory<RowT> {
        /**
         * Creates an implementor bound to the given execution context.
         *
         * @param executionContext context the implementor will work in
         * @return implementor for that context
         */
        LogicalRelImplementor<RowT> create(ExecutionContext<RowT> executionContext);
    }

    /**
     * Assembles an execution service from the given components.
     *
     * <p>The local node id is taken from the topology service, and a mapping service and a DDL
     * command handler are created on the fly.
     *
     * @param topologyService source of the local node id; also backs the mapping service
     * @param messageService messaging facade
     * @param sqlSchemaManager schema manager
     * @param tableManager table manager handed to the DDL command handler
     * @param queryTaskExecutor executor for query tasks
     * @param rowHandler row handler
     * @param mailboxRegistry mailbox registry handed to every implementor
     * @param exchangeService exchange service
     * @param dataStorageManager data storage manager handed to the DDL command handler
     * @param <RowT> row type
     * @return new execution service
     */
    public static <RowT> ExecutionServiceImpl<RowT> create(TopologyService topologyService, MessageService messageService, SqlSchemaManager sqlSchemaManager, TableManager tableManager, QueryTaskExecutor queryTaskExecutor, RowHandler<RowT> rowHandler, MailboxRegistry mailboxRegistry, ExchangeService exchangeService, DataStorageManager dataStorageManager) {
        String localNodeId = topologyService.localMember().id();
        MappingService mappingService = new MappingServiceImpl(topologyService);
        DdlCommandHandler ddlHandler = new DdlCommandHandler(tableManager, dataStorageManager);

        return new ExecutionServiceImpl<>(
                localNodeId,
                messageService,
                mappingService,
                sqlSchemaManager,
                ddlHandler,
                queryTaskExecutor,
                rowHandler,
                exchangeService,
                executionContext -> new LogicalRelImplementor(executionContext, i -> Objects::hashCode, mailboxRegistry, exchangeService));
    }

    /**
     * Creates an execution service from already constructed components.
     *
     * @param str id of the local node
     * @param messageService messaging facade
     * @param mappingService mapping service
     * @param sqlSchemaManager schema manager
     * @param ddlCommandHandler handler of DDL commands
     * @param queryTaskExecutor executor for query tasks
     * @param rowHandler row handler
     * @param exchangeService exchange service
     * @param implementorFactory factory of relational-tree implementors
     */
    public ExecutionServiceImpl(String str, MessageService messageService, MappingService mappingService, SqlSchemaManager sqlSchemaManager, DdlCommandHandler ddlCommandHandler, QueryTaskExecutor queryTaskExecutor, RowHandler<RowT> rowHandler, ExchangeService exchangeService, ImplementorFactory<RowT> implementorFactory) {
        // Assignments follow the parameter order.
        this.locNodeId = str;
        this.msgSrvc = messageService;
        this.mappingSrvc = mappingService;
        this.sqlSchemaManager = sqlSchemaManager;
        this.ddlCmdHnd = ddlCommandHandler;
        this.taskExecutor = queryTaskExecutor;
        this.handler = rowHandler;
        this.exchangeSrvc = exchangeService;
        this.implementorFactory = implementorFactory;
    }

    /**
     * Registers the listeners for the four message kinds this service reacts to.
     *
     * <p>The numeric ids are inlined constants; presumably they are the message type ids of the
     * SQL query message group — confirm against the message definitions.
     */
    @Override // org.apache.ignite.internal.sql.engine.exec.LifecycleAware
    public void start() {
        this.msgSrvc.register((nodeId, msg) -> onMessage(nodeId, (QueryStartRequest) msg), (short) 0);
        this.msgSrvc.register((nodeId, msg) -> onMessage(nodeId, (QueryStartResponse) msg), (short) 1);
        this.msgSrvc.register((nodeId, msg) -> onMessage(nodeId, (QueryCloseMessage) msg), (short) 6);
        this.msgSrvc.register((nodeId, msg) -> onMessage(nodeId, (ErrorMessage) msg), (short) 2);
    }

    /**
     * Registers a fresh query manager under the query id and starts execution of the plan.
     *
     * @param baseQueryContext context of the query
     * @param multiStepPlan plan to execute
     * @return cursor over the query result
     */
    private AsyncCursor<List<Object>> executeQuery(BaseQueryContext baseQueryContext, MultiStepPlan multiStepPlan) {
        UUID queryId = baseQueryContext.queryId();
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = new DistributedQueryManager(baseQueryContext);
        ExecutionServiceImpl<RowT>.DistributedQueryManager previous = this.queryManagerMap.put(queryId, manager);
        // Decompiled assert: no manager may already be registered for this query id.
        if (!$assertionsDisabled && previous != null) {
            throw new AssertionError();
        }
        return manager.execute(multiStepPlan);
    }

    /**
     * Builds a query context for a fragment requested by another node.
     *
     * @param uuid id of the query
     * @param str name of the schema to use as the default one; may be {@code null}
     * @param objArr query parameters
     * @return context carrying the query id, parameters, framework config and logger
     */
    private BaseQueryContext createQueryContext(UUID uuid, @Nullable String str, Object[] objArr) {
        return BaseQueryContext.builder()
                .queryId(uuid)
                .parameters(objArr)
                .frameworkConfig(
                        Frameworks.newConfigBuilder(Commons.FRAMEWORK_CONFIG)
                                .defaultSchema(this.sqlSchemaManager.schema(str))
                                .build())
                .logger(LOG)
                .build();
    }

    /**
     * Deserializes a fragment from its JSON form and wraps it into a fragment plan.
     *
     * @param str serialized fragment
     * @return plan of the fragment
     */
    private QueryPlan prepareFragment(String str) {
        FragmentPlan plan = new FragmentPlan(RelJsonReader.fromJson(this.sqlSchemaManager, str));
        return plan;
    }

    /**
     * Executes the given plan, dispatching on its type.
     *
     * @param queryPlan plan to execute
     * @param baseQueryContext context of the query; used for DML and QUERY plans only
     * @return cursor over the result
     * @throws AssertionError if the plan type is not one of the handled ones
     */
    @Override // org.apache.ignite.internal.sql.engine.exec.ExecutionService
    public AsyncCursor<List<Object>> executePlan(QueryPlan queryPlan, BaseQueryContext baseQueryContext) {
        AsyncCursor<List<Object>> cursor;
        switch (queryPlan.type()) {
            case DML:
            case QUERY:
                cursor = executeQuery(baseQueryContext, (MultiStepPlan) queryPlan);
                break;
            case EXPLAIN:
                cursor = executeExplain((ExplainPlan) queryPlan);
                break;
            case DDL:
                cursor = executeDdl((DdlPlan) queryPlan);
                break;
            default:
                throw new AssertionError("Unexpected plan type: " + queryPlan);
        }
        return cursor;
    }

    /**
     * Cancels the query with the given id, if this node knows it.
     *
     * @param uuid id of the query to cancel
     * @return stage of the close sequence, or an already completed stage when the query is unknown
     */
    public CompletionStage<?> cancel(UUID uuid) {
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = this.queryManagerMap.get(uuid);
        if (manager == null) {
            return CompletableFuture.completedFuture(null);
        }
        return manager.close(true);
    }

    /**
     * Applies a DDL command and returns a single-row, single-column cursor holding the
     * handler's boolean result.
     *
     * @param ddlPlan plan carrying the DDL command
     * @return cursor over the one-row result
     * @throws IgniteInternalException if the handler fails with a checked exception
     */
    private AsyncCursor<List<Object>> executeDdl(DdlPlan ddlPlan) {
        try {
            Boolean handled = Boolean.valueOf(this.ddlCmdHnd.handle(ddlPlan.command()));
            List<Boolean> row = Collections.singletonList(handled);
            return new AsyncWrapper(Collections.singletonList(row).iterator());
        } catch (IgniteInternalCheckedException ddlFailure) {
            throw new IgniteInternalException("Failed to execute DDL statement [stmt=, err=" + ddlFailure.getMessage() + "]", ddlFailure);
        }
    }

    /**
     * Returns a single-row, single-column cursor holding the textual plan.
     *
     * @param explainPlan plan to explain
     * @return cursor over the one-row result
     */
    private AsyncCursor<List<Object>> executeExplain(ExplainPlan explainPlan) {
        Iterator<? extends List<?>> rows = List.of(List.of(explainPlan.plan())).iterator();
        return new AsyncWrapper(rows);
    }

    /**
     * Handles a request to start a fragment on this node: obtains (or creates) the manager of
     * the query and submits the fragment to it.
     *
     * @param str id of the requesting node
     * @param queryStartRequest the start request
     */
    private void onMessage(String str, QueryStartRequest queryStartRequest) {
        boolean argsPresent = str != null && queryStartRequest != null;
        if (!$assertionsDisabled && !argsPresent) {
            throw new AssertionError();
        }
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = this.queryManagerMap.computeIfAbsent(
                queryStartRequest.queryId(),
                queryId -> new DistributedQueryManager(
                        createQueryContext(queryId, queryStartRequest.schema(), queryStartRequest.parameters())));
        manager.submitFragment(str, queryStartRequest.root(), queryStartRequest.fragmentDescription());
    }

    /**
     * Handles a fragment start response; responses for unknown queries are ignored.
     *
     * @param str id of the responding node
     * @param queryStartResponse the start response
     */
    private void onMessage(String str, QueryStartResponse queryStartResponse) {
        boolean argsPresent = str != null && queryStartResponse != null;
        if (!$assertionsDisabled && !argsPresent) {
            throw new AssertionError();
        }
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = this.queryManagerMap.get(queryStartResponse.queryId());
        if (manager == null) {
            return;
        }
        manager.acknowledgeFragment(str, queryStartResponse.fragmentId(), queryStartResponse.error());
    }

    /**
     * Handles an error reported by a remote node; errors for unknown queries are ignored.
     *
     * @param str id of the reporting node
     * @param errorMessage the error message
     */
    private void onMessage(String str, ErrorMessage errorMessage) {
        boolean argsPresent = str != null && errorMessage != null;
        if (!$assertionsDisabled && !argsPresent) {
            throw new AssertionError();
        }
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = this.queryManagerMap.get(errorMessage.queryId());
        if (manager == null) {
            return;
        }
        RemoteException remoteFailure = new RemoteException(str, errorMessage.queryId(), errorMessage.fragmentId(), errorMessage.error());
        manager.onError(remoteFailure);
    }

    /**
     * Handles a request to close a query; the query is closed with cancellation if it is known
     * to this node.
     *
     * @param str id of the requesting node
     * @param queryCloseMessage the close message
     */
    private void onMessage(String str, QueryCloseMessage queryCloseMessage) {
        boolean argsPresent = str != null && queryCloseMessage != null;
        if (!$assertionsDisabled && !argsPresent) {
            throw new AssertionError();
        }
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = this.queryManagerMap.get(queryCloseMessage.queryId());
        if (manager == null) {
            return;
        }
        manager.close(true);
    }

    /**
     * Cancels every query whose root fragment id has been assigned and blocks until all of
     * the close sequences have finished.
     *
     * @throws Exception declared by the lifecycle contract
     */
    @Override // org.apache.ignite.internal.sql.engine.exec.LifecycleAware
    public void stop() throws Exception {
        List<CompletableFuture<Void>> closing = new ArrayList<>();
        for (ExecutionServiceImpl<RowT>.DistributedQueryManager manager : this.queryManagerMap.values()) {
            if (manager.rootFragmentId != null) {
                closing.add(manager.close(true));
            }
        }
        CompletableFuture.allOf(closing.toArray(new CompletableFuture[0])).join();
    }

    /**
     * Topology callback for a node that appeared; intentionally does nothing.
     *
     * @param clusterNode node that appeared
     */
    public void onAppeared(ClusterNode clusterNode) {
    }

    /**
     * Topology callback for a node that disappeared: every registered query manager is told
     * that the node has left.
     *
     * @param clusterNode node that disappeared
     */
    public void onDisappeared(ClusterNode clusterNode) {
        for (ExecutionServiceImpl<RowT>.DistributedQueryManager manager : this.queryManagerMap.values()) {
            manager.onNodeLeft(clusterNode.id());
        }
    }

    /**
     * Returns the fragment roots of the given query that run on the local node.
     *
     * @param uuid id of the query
     * @return snapshot of the local fragments, or an empty list when the query is unknown
     */
    public List<AbstractNode<?>> localFragments(UUID uuid) {
        ExecutionServiceImpl<RowT>.DistributedQueryManager manager = this.queryManagerMap.get(uuid);
        if (manager == null) {
            return List.of();
        }
        return manager.localFragments();
    }

    // Initializes the decompiler-emitted assertion flag, the class logger and the message factory.
    static {
        $assertionsDisabled = !ExecutionServiceImpl.class.desiredAssertionStatus();
        LOG = IgniteLogger.forClass(ExecutionServiceImpl.class);
        FACTORY = new SqlQueryMessagesFactory();
    }
}
