package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.SplitAssigner;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.split.RemoteSplit;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:io/trino/execution/scheduler/EventDrivenTaskSource.class */
class EventDrivenTaskSource implements Closeable {
    // Collaborators and settings captured by the constructor; none of them is reassigned afterwards.
    private final QueryId queryId;
    private final TableExecuteContextManager tableExecuteContextManager;
    // Exchanges keyed by source fragment id; initialize() reads the source handles of each one.
    private final Map<PlanFragmentId, Exchange> sourceExchanges;
    // Plan node id -> ids of the fragments registered as remote sources of that node.
    private final SetMultimap<PlanNodeId, PlanFragmentId> remoteSources;
    // Invoked by initialize() to obtain the split sources keyed by plan node id.
    private final Supplier<Map<PlanNodeId, SplitSource>> splitSourceSupplier;
    private final SplitAssigner assigner;
    // Executor on which each received split batch is handed to the assigner.
    private final Executor executor;
    private final int splitBatchSize;
    private final long targetExchangeSplitSizeInBytes;
    // Used to resolve the partition of splits that are not RemoteSplit instances.
    private final FaultTolerantPartitioningScheme sourcePartitioningScheme;
    private final LongConsumer getSplitTimeRecorder;

    // Set by process() once initialize() has returned.
    @GuardedBy("this")
    private boolean initialized;

    // Built by initialize(); null until then.
    @GuardedBy("this")
    private List<IdempotentSplitSource> splitSources;

    // Future returned by the most recent process() call; null before the first call.
    @GuardedBy("this")
    private ListenableFuture<SplitAssigner.AssignmentResult> future;

    @GuardedBy("this")
    private boolean closed;

    // Source fragments whose last batch has already been processed.
    @GuardedBy("this")
    private final Set<PlanFragmentId> completedFragments = new HashSet<>();

    // Collects every resource registered by initialize(); released in close().
    @GuardedBy("this")
    private final Closer closer = Closer.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/EventDrivenTaskSource$CallbackProxyFuture.class */
    public static class CallbackProxyFuture<T> extends ForwardingListenableFuture<T> {
        private final ListenableFuture<T> delegate;

        @GuardedBy("listeners")
        private final Set<SettableFuture<T>> listeners = Sets.newIdentityHashSet();

        private CallbackProxyFuture(ListenableFuture<T> listenableFuture) {
            this.delegate = (ListenableFuture) Objects.requireNonNull(listenableFuture, "delegate is null");
            listenableFuture.addListener(this::propagateIfNecessary, MoreExecutors.directExecutor());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: delegate, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public ListenableFuture<T> m165delegate() {
            return this.delegate;
        }

        public void addListener(Runnable runnable, Executor executor) {
            throw new UnsupportedOperationException();
        }

        public ListenableFuture<T> addListener() {
            SettableFuture<T> create = SettableFuture.create();
            synchronized (this.listeners) {
                this.listeners.add(create);
            }
            create.addListener(() -> {
                if (create.isCancelled()) {
                    synchronized (this.listeners) {
                        this.listeners.remove(create);
                    }
                }
            }, MoreExecutors.directExecutor());
            propagateIfNecessary();
            return create;
        }

        private void propagateIfNecessary() {
            ImmutableList copyOf;
            if (this.delegate.isDone()) {
                synchronized (this.listeners) {
                    copyOf = ImmutableList.copyOf(this.listeners);
                    this.listeners.clear();
                }
                Iterator it = copyOf.iterator();
                while (it.hasNext()) {
                    ((SettableFuture) it.next()).setFuture(this.delegate);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/EventDrivenTaskSource$ExchangeSplitSource.class */
    /**
     * {@link SplitSource} backed by an {@link ExchangeSourceHandleSource}: each batch of exchange
     * source handles is grouped by partition id and packed into remote splits.
     */
    public static class ExchangeSplitSource implements SplitSource {
        private final ExchangeSourceHandleSource handleSource;
        private final long targetSplitSizeInBytes;

        /**
         * @param exchangeSourceHandleSource source of exchange handles; must not be null
         * @param j target accumulated data size, in bytes, of the handles packed into one split
         */
        private ExchangeSplitSource(ExchangeSourceHandleSource exchangeSourceHandleSource, long j) {
            this.handleSource = Objects.requireNonNull(exchangeSourceHandleSource, "handleSource is null");
            this.targetSplitSizeInBytes = j;
        }

        @Override // io.trino.split.SplitSource
        public CatalogHandle getCatalogHandle() {
            return ExchangeOperator.REMOTE_CATALOG_HANDLE;
        }

        /**
         * Requests the next batch of handles and converts it into a batch of remote splits.
         *
         * @param i requested batch size; not used by this implementation
         * @return future producing the splits built from the next handle batch, flagged as last
         *         when the handle batch is the last one
         */
        @Override // io.trino.split.SplitSource
        public ListenableFuture<SplitSource.SplitBatch> getNextBatch(int i) {
            return Futures.transform(MoreFutures.toListenableFuture(handleSource.getNextBatch()), handleBatch -> {
                ListMultimap<Integer, ExchangeSourceHandle> handlesByPartition = handleBatch.handles().stream()
                        .collect(ImmutableListMultimap.toImmutableListMultimap(ExchangeSourceHandle::getPartitionId, Function.identity()));
                ImmutableList.Builder<Split> splits = ImmutableList.builder();
                // Handles of different partitions are never mixed inside one split.
                for (Integer partitionId : handlesByPartition.keySet()) {
                    splits.addAll(createRemoteSplits(handlesByPartition.get(partitionId)));
                }
                return new SplitSource.SplitBatch(splits.build(), handleBatch.lastBatch());
            }, MoreExecutors.directExecutor());
        }

        /**
         * Packs the handles, in order, into splits. A split is closed when adding the next handle
         * would push its accumulated size over the target; a split always holds at least one
         * handle, so a single handle larger than the target still gets a split of its own.
         *
         * @param list handles to pack
         * @return the resulting splits; empty when {@code list} is empty
         */
        private List<Split> createRemoteSplits(List<ExchangeSourceHandle> list) {
            ImmutableList.Builder<Split> splits = ImmutableList.builder();
            ImmutableList.Builder<ExchangeSourceHandle> currentHandles = ImmutableList.builder();
            long currentSizeInBytes = 0;
            long currentCount = 0;
            for (ExchangeSourceHandle handle : list) {
                long handleSizeInBytes = handle.getDataSizeInBytes();
                if (currentCount > 0 && currentSizeInBytes + handleSizeInBytes > targetSplitSizeInBytes) {
                    splits.add(createRemoteSplit(currentHandles.build()));
                    currentHandles = ImmutableList.builder();
                    currentSizeInBytes = 0;
                    currentCount = 0;
                }
                currentHandles.add(handle);
                currentSizeInBytes += handleSizeInBytes;
                currentCount++;
            }
            // Flush the trailing, partially filled split.
            if (currentCount > 0) {
                splits.add(createRemoteSplit(currentHandles.build()));
            }
            return splits.build();
        }

        /** Wraps the given handles in a single remote split of the remote catalog. */
        private static Split createRemoteSplit(List<ExchangeSourceHandle> list) {
            return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(list, Optional.empty())));
        }

        /** Closes the underlying handle source. */
        @Override // io.trino.split.SplitSource, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            handleSource.close();
        }

        /**
         * Not supported by this split source.
         *
         * @throws UnsupportedOperationException always
         */
        @Override // io.trino.split.SplitSource
        public boolean isFinished() {
            throw new UnsupportedOperationException();
        }

        @Override // io.trino.split.SplitSource
        public Optional<List<Object>> getTableExecuteSplitsInfo() {
            return Optional.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/EventDrivenTaskSource$IdempotentSplitSource.class */
    /**
     * Wrapper around a {@link SplitSource} whose {@link #getNext()} can be called repeatedly
     * without requesting a new batch: the pending batch request is cached until the caller
     * consumes it through {@link SplitBatchReference#getSplitBatchAndAdvance()}.
     */
    public static class IdempotentSplitSource implements Closeable {
        private final QueryId queryId;
        private final TableExecuteContextManager tableExecuteContextManager;
        private final PlanNodeId planNodeId;
        private final Optional<PlanFragmentId> sourceFragmentId;
        private final SplitSource splitSource;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;

        // The batch request currently in flight (or completed but not yet consumed), if any.
        @GuardedBy("this")
        private Optional<CallbackProxyFuture<SplitBatchReference>> future = Optional.empty();

        @GuardedBy("this")
        private boolean closed;

        // True once the consumed batch was flagged as the last one.
        @GuardedBy("this")
        private boolean finished;

        /** A received split batch together with the identity of the source it came from. */
        public class SplitBatchReference {
            private final SplitSource.SplitBatch splitBatch;

            public SplitBatchReference(SplitSource.SplitBatch splitBatch) {
                this.splitBatch = Objects.requireNonNull(splitBatch, "splitBatch is null");
            }

            public PlanNodeId getPlanNodeId() {
                return planNodeId;
            }

            public Optional<PlanFragmentId> getSourceFragmentId() {
                return sourceFragmentId;
            }

            /**
             * Returns the batch and marks it as consumed, so that the next
             * {@link IdempotentSplitSource#getNext()} call requests a new batch (or reports
             * completion if this batch was the last one).
             */
            public SplitSource.SplitBatch getSplitBatchAndAdvance() {
                advance(splitBatch.isLastBatch());
                return splitBatch;
            }
        }

        private IdempotentSplitSource(QueryId queryId, TableExecuteContextManager tableExecuteContextManager, PlanNodeId planNodeId, Optional<PlanFragmentId> optional, SplitSource splitSource, int i, LongConsumer longConsumer) {
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
            this.planNodeId = Objects.requireNonNull(planNodeId, "planNodeId is null");
            this.sourceFragmentId = Objects.requireNonNull(optional, "sourceFragmentId is null");
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.splitBatchSize = i;
            this.getSplitTimeRecorder = Objects.requireNonNull(longConsumer, "getSplitTimeRecorder is null");
        }

        /**
         * Returns a future for the pending batch, requesting one from the split source only when
         * no request is cached and the source is not finished.
         *
         * @return a fresh listener future on the cached request, or empty when the last batch
         *         has already been consumed
         */
        public synchronized Optional<ListenableFuture<SplitBatchReference>> getNext() {
            if (future.isEmpty() && !finished) {
                long start = System.nanoTime();
                ListenableFuture<SplitBatchReference> batchReference = Futures.transform(
                        splitSource.getNextBatch(splitBatchSize),
                        splitBatch -> {
                            // The recorder receives the nanoTime value captured before the request.
                            getSplitTimeRecorder.accept(start);
                            if (splitBatch.isLastBatch()) {
                                splitSource.getTableExecuteSplitsInfo().ifPresent(splitsInfo ->
                                        tableExecuteContextManager.getTableExecuteContextForQuery(queryId).setSplitsInfo(splitsInfo));
                            }
                            return new SplitBatchReference(splitBatch);
                        },
                        MoreExecutors.directExecutor());
                future = Optional.of(new CallbackProxyFuture<>(batchReference));
            }
            // Every caller gets its own listener future on the shared request.
            return future.map(pending -> pending.addListener());
        }

        /** Cancels the pending request, if one is still running, and closes the split source. Idempotent. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            if (closed) {
                return;
            }
            closed = true;
            future.filter(pending -> !pending.isDone())
                    .ifPresent(pending -> pending.cancel(true));
            splitSource.close();
        }

        /** Drops the cached request and records whether the consumed batch was the last one. */
        private synchronized void advance(boolean z) {
            finished = z;
            future = Optional.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a task source. No split source is opened here; that is deferred to the first
     * {@link #process()} call.
     *
     * @param queryId query identifier; must not be null
     * @param tableExecuteContextManager passed on to every wrapped split source; must not be null
     * @param map source exchanges keyed by fragment id; copied, must not be null
     * @param setMultimap remote sources: plan node id to fragment ids; copied, must not be null
     * @param supplier supplies the split sources keyed by plan node id; must not be null
     * @param splitAssigner receives the split batches; must not be null
     * @param executor executor on which received batches are handed to the assigner; must not be null
     * @param i batch size requested from split sources
     * @param j target size in bytes of splits built from exchange source handles
     * @param faultTolerantPartitioningScheme resolves the partition of non-remote splits; must not be null
     * @param longConsumer recorder called with the start timestamp of each batch request; must not be null
     * @throws NullPointerException if any of the reference arguments is null
     */
    public EventDrivenTaskSource(QueryId queryId, TableExecuteContextManager tableExecuteContextManager, Map<PlanFragmentId, Exchange> map, SetMultimap<PlanNodeId, PlanFragmentId> setMultimap, Supplier<Map<PlanNodeId, SplitSource>> supplier, SplitAssigner splitAssigner, Executor executor, int i, long j, FaultTolerantPartitioningScheme faultTolerantPartitioningScheme, LongConsumer longConsumer) {
        this.queryId = Objects.requireNonNull(queryId, "queryId is null");
        this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        // Defensive immutable copies: later changes to the caller's collections are not observed.
        this.sourceExchanges = ImmutableMap.copyOf(Objects.requireNonNull(map, "sourceExchanges is null"));
        this.remoteSources = ImmutableSetMultimap.copyOf(Objects.requireNonNull(setMultimap, "remoteSources is null"));
        this.splitSourceSupplier = Objects.requireNonNull(supplier, "splitSourceSupplier is null");
        this.assigner = Objects.requireNonNull(splitAssigner, "assigner is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.splitBatchSize = i;
        this.targetExchangeSplitSizeInBytes = j;
        this.sourcePartitioningScheme = Objects.requireNonNull(faultTolerantPartitioningScheme, "sourcePartitioningScheme is null");
        this.getSplitTimeRecorder = Objects.requireNonNull(longConsumer, "getSplitTimeRecorder is null");
    }

    /**
     * Starts the next processing round and returns its future.
     *
     * <p>The split sources are set up lazily on the first call. A new round may only be started
     * once the future returned by the previous call is done.
     *
     * @return future holding the assignment result of this round
     * @throws IllegalStateException if this task source is closed or the previous round is still running
     */
    public synchronized ListenableFuture<SplitAssigner.AssignmentResult> process() {
        Preconditions.checkState(!closed, "closed");
        boolean previousRoundDone = future == null || future.isDone();
        Preconditions.checkState(previousRoundDone, "still in process");

        if (!initialized) {
            initialize();
            initialized = true;
        }

        ListenableFuture<SplitAssigner.AssignmentResult> nextRound = processNext();
        future = nextRound;
        return nextRound;
    }

    /**
     * Builds {@code splitSources}: one wrapped source per source exchange, followed by one per
     * entry returned by the split source supplier. Every resource created or obtained here is
     * registered with {@code closer}.
     *
     * @throws com.google.common.base.VerifyException if a source exchange fragment has no remote source node
     */
    @GuardedBy("this")
    private void initialize() {
        // Invert remoteSources so the plan node can be looked up by fragment id.
        Map<PlanFragmentId, PlanNodeId> remoteSourceNodeIds = new HashMap<>();
        remoteSources.forEach((planNodeId, fragmentId) -> remoteSourceNodeIds.put(fragmentId, planNodeId));

        ImmutableList.Builder<IdempotentSplitSource> sources = ImmutableList.builder();

        for (Map.Entry<PlanFragmentId, Exchange> exchangeEntry : sourceExchanges.entrySet()) {
            PlanFragmentId fragmentId = exchangeEntry.getKey();
            PlanNodeId remoteSourceNodeId = remoteSourceNodeIds.get(fragmentId);
            Verify.verify(remoteSourceNodeId != null, "remote source not found for fragment: %s", fragmentId);

            // Registration order (handle source, split source, wrapper) matches the original nesting.
            ExchangeSourceHandleSource handleSource = closer.register(exchangeEntry.getValue().getSourceHandles());
            ExchangeSplitSource exchangeSplitSource = closer.register(new ExchangeSplitSource(handleSource, targetExchangeSplitSizeInBytes));
            sources.add(closer.register(new IdempotentSplitSource(
                    queryId,
                    tableExecuteContextManager,
                    remoteSourceNodeId,
                    Optional.of(fragmentId),
                    exchangeSplitSource,
                    splitBatchSize,
                    getSplitTimeRecorder)));
        }

        for (Map.Entry<PlanNodeId, SplitSource> sourceEntry : splitSourceSupplier.get().entrySet()) {
            SplitSource splitSource = closer.register(sourceEntry.getValue());
            sources.add(closer.register(new IdempotentSplitSource(
                    queryId,
                    tableExecuteContextManager,
                    sourceEntry.getKey(),
                    Optional.empty(),
                    splitSource,
                    splitBatchSize,
                    getSplitTimeRecorder)));
        }

        splitSources = sources.build();
    }

    /**
     * Asks every split source for its pending batch and arranges for one received batch to be
     * handed to the assigner on {@code executor}.
     *
     * <p>NOTE(review): which batch wins, and what happens to the remaining futures, is decided by
     * {@code MoreFutures.whenAnyCompleteCancelOthers}; by its name it completes with the first
     * finished future and cancels the rest — confirm against the airlift documentation.
     *
     * @return an already completed future holding {@code assigner.finish()} when no source has a
     *         pending batch, otherwise a future holding the assignment result of one batch
     */
    @GuardedBy("this")
    private ListenableFuture<SplitAssigner.AssignmentResult> processNext() {
        List<ListenableFuture<IdempotentSplitSource.SplitBatchReference>> pendingBatches = splitSources.stream()
                .map(IdempotentSplitSource::getNext)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(ImmutableList.toImmutableList());

        if (pendingBatches.isEmpty()) {
            // No source returned a pending batch, so the assigner is told to finish.
            return Futures.immediateFuture(assigner.finish());
        }
        return Futures.transform(MoreFutures.whenAnyCompleteCancelOthers(pendingBatches), this::process, executor);
    }

    /**
     * Consumes one received batch and forwards its splits, grouped by partition, to the assigner.
     *
     * <p>The "no more splits" flag passed to the assigner is set when the batch is the last one
     * and either the batch has no source fragment, or every fragment registered as a remote
     * source of the plan node has now delivered its last batch.
     *
     * @param splitBatchReference the received batch; it is marked as consumed by this call
     * @return the result returned by the assigner
     */
    private synchronized SplitAssigner.AssignmentResult process(IdempotentSplitSource.SplitBatchReference splitBatchReference) {
        PlanNodeId planNodeId = splitBatchReference.getPlanNodeId();
        Optional<PlanFragmentId> sourceFragmentId = splitBatchReference.getSourceFragmentId();
        SplitSource.SplitBatch batch = splitBatchReference.getSplitBatchAndAdvance();

        final boolean noMoreSplits;
        if (!batch.isLastBatch()) {
            noMoreSplits = false;
        }
        else if (sourceFragmentId.isEmpty()) {
            noMoreSplits = true;
        }
        else {
            completedFragments.add(sourceFragmentId.get());
            noMoreSplits = completedFragments.containsAll(remoteSources.get(planNodeId));
        }

        ListMultimap<Integer, Split> splitsByPartition = batch.getSplits().stream()
                .collect(ImmutableListMultimap.toImmutableListMultimap(this::getSplitPartition, Function.identity()));
        return assigner.assign(planNodeId, splitsByPartition, noMoreSplits);
    }

    /**
     * Resolves the partition a split belongs to.
     *
     * @param split the split to inspect
     * @return for a {@link RemoteSplit}, the partition id of its first exchange source handle;
     *         for any other split, the partition reported by the source partitioning scheme
     */
    private int getSplitPartition(Split split) {
        ConnectorSplit connectorSplit = split.getConnectorSplit();
        if (!(connectorSplit instanceof RemoteSplit)) {
            return sourcePartitioningScheme.getPartition(split);
        }
        RemoteSplit remoteSplit = (RemoteSplit) connectorSplit;
        SpoolingExchangeInput exchangeInput = (SpoolingExchangeInput) remoteSplit.getExchangeInput();
        // Only the first handle is consulted for the partition id.
        return exchangeInput.getExchangeSourceHandles().get(0).getPartitionId();
    }

    /**
     * Releases every resource registered with {@code closer}. Only the first call has an effect.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while closing, if any
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (!closed) {
            // Flag first, so a failing close is not retried by a later call.
            closed = true;
            try {
                closer.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
