package io.trino.operator.exchange;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.slice.XxHash64;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.operator.BucketPartitionFunction;
import io.trino.operator.InterpretedHashGenerator;
import io.trino.operator.PartitionFunction;
import io.trino.operator.PrecomputedHashGenerator;
import io.trino.operator.output.SkewedPartitionRebalancer;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.planner.MergePartitioningHandle;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.SystemPartitioningHandle;
import java.io.Closeable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

@ThreadSafe
/* loaded from: input_file:io/trino/operator/exchange/LocalExchange.class */
public class LocalExchange {
    public static final int SCALE_WRITERS_MAX_PARTITIONS_PER_WRITER = 128;
    private final Supplier<LocalExchanger> exchangerSupplier;
    private final List<LocalExchangeSource> sources;

    @GuardedBy("this")
    private boolean allSourcesFinished;

    @GuardedBy("this")
    private boolean noMoreSinkFactories;

    @GuardedBy("this")
    private final Set<LocalExchangeSinkFactory> openSinkFactories = new HashSet();

    @GuardedBy("this")
    private final Set<LocalExchangeSink> sinks = new HashSet();

    @GuardedBy("this")
    private int nextSourceIndex;

    @ThreadSafe
    /* loaded from: input_file:io/trino/operator/exchange/LocalExchange$LocalExchangeSinkFactory.class */
    public static class LocalExchangeSinkFactory implements Closeable {
        private final LocalExchange exchange;

        private LocalExchangeSinkFactory(LocalExchange localExchange) {
            this.exchange = (LocalExchange) Objects.requireNonNull(localExchange, "exchange is null");
        }

        public LocalExchangeSink createSink() {
            return this.exchange.createSink(this);
        }

        public LocalExchangeSinkFactory duplicate() {
            return this.exchange.createSinkFactory();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.exchange.sinkFactoryClosed(this);
        }

        public void noMoreSinkFactories() {
            this.exchange.noMoreSinkFactories();
        }
    }

    public LocalExchange(NodePartitioningManager nodePartitioningManager, Session session, int i, PartitioningHandle partitioningHandle, List<Integer> list, List<Type> list2, Optional<Integer> optional, DataSize dataSize, TypeOperators typeOperators, DataSize dataSize2, Supplier<Long> supplier) {
        int computeBufferCount = computeBufferCount(partitioningHandle, i, list);
        if (partitioningHandle.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION) || partitioningHandle.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            LocalExchangeMemoryManager localExchangeMemoryManager = new LocalExchangeMemoryManager(dataSize.toBytes());
            this.sources = (List) IntStream.range(0, computeBufferCount).mapToObj(i2 -> {
                return new LocalExchangeSource(localExchangeMemoryManager, localExchangeSource -> {
                    checkAllSourcesFinished();
                });
            }).collect(ImmutableList.toImmutableList());
            this.exchangerSupplier = () -> {
                return new RandomExchanger(asPageConsumers(this.sources), localExchangeMemoryManager);
            };
            return;
        }
        if (partitioningHandle.equals(SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION)) {
            List list3 = (List) IntStream.range(0, computeBufferCount).mapToObj(i3 -> {
                return new LocalExchangeMemoryManager(dataSize.toBytes() / computeBufferCount);
            }).collect(ImmutableList.toImmutableList());
            this.sources = (List) list3.stream().map(localExchangeMemoryManager2 -> {
                return new LocalExchangeSource(localExchangeMemoryManager2, localExchangeSource -> {
                    checkAllSourcesFinished();
                });
            }).collect(ImmutableList.toImmutableList());
            AtomicInteger atomicInteger = new AtomicInteger();
            this.exchangerSupplier = () -> {
                int andIncrement = atomicInteger.getAndIncrement();
                Preconditions.checkState(andIncrement < this.sources.size(), "no more sources");
                return new PassthroughExchanger(this.sources.get(andIncrement), (LocalExchangeMemoryManager) list3.get(andIncrement));
            };
            return;
        }
        if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
            LocalExchangeMemoryManager localExchangeMemoryManager3 = new LocalExchangeMemoryManager(dataSize.toBytes());
            this.sources = (List) IntStream.range(0, computeBufferCount).mapToObj(i4 -> {
                return new LocalExchangeSource(localExchangeMemoryManager3, localExchangeSource -> {
                    checkAllSourcesFinished();
                });
            }).collect(ImmutableList.toImmutableList());
            AtomicLong atomicLong = new AtomicLong(0L);
            this.exchangerSupplier = () -> {
                return new ScaleWriterExchanger(asPageConsumers(this.sources), localExchangeMemoryManager3, dataSize.toBytes(), atomicLong, dataSize2, supplier, SystemSessionProperties.getQueryMaxMemoryPerNode(session).toBytes());
            };
            return;
        }
        if (PartitioningHandle.isScaledWriterHashDistribution(partitioningHandle)) {
            int i5 = computeBufferCount * SCALE_WRITERS_MAX_PARTITIONS_PER_WRITER;
            SkewedPartitionRebalancer skewedPartitionRebalancer = new SkewedPartitionRebalancer(i5, computeBufferCount, 1, dataSize2.toBytes(), SystemSessionProperties.getSkewedPartitionMinDataProcessedRebalanceThreshold(session).toBytes());
            LocalExchangeMemoryManager localExchangeMemoryManager4 = new LocalExchangeMemoryManager(dataSize.toBytes());
            this.sources = (List) IntStream.range(0, computeBufferCount).mapToObj(i6 -> {
                return new LocalExchangeSource(localExchangeMemoryManager4, localExchangeSource -> {
                    checkAllSourcesFinished();
                });
            }).collect(ImmutableList.toImmutableList());
            this.exchangerSupplier = () -> {
                return new ScaleWriterPartitioningExchanger(asPageConsumers(this.sources), localExchangeMemoryManager4, dataSize.toBytes(), createPartitionPagePreparer(partitioningHandle, list), createPartitionFunction(nodePartitioningManager, session, typeOperators, partitioningHandle, i5, list, list2, optional), i5, skewedPartitionRebalancer, supplier, SystemSessionProperties.getQueryMaxMemoryPerNode(session).toBytes());
            };
            return;
        }
        if (!partitioningHandle.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION) && !partitioningHandle.getCatalogHandle().isPresent() && !(partitioningHandle.getConnectorHandle() instanceof MergePartitioningHandle)) {
            throw new IllegalArgumentException("Unsupported local exchange partitioning " + String.valueOf(partitioningHandle));
        }
        LocalExchangeMemoryManager localExchangeMemoryManager5 = new LocalExchangeMemoryManager(dataSize.toBytes());
        this.sources = (List) IntStream.range(0, computeBufferCount).mapToObj(i7 -> {
            return new LocalExchangeSource(localExchangeMemoryManager5, localExchangeSource -> {
                checkAllSourcesFinished();
            });
        }).collect(ImmutableList.toImmutableList());
        this.exchangerSupplier = () -> {
            return new PartitioningExchanger(asPageConsumers(this.sources), localExchangeMemoryManager5, createPartitionPagePreparer(partitioningHandle, list), createPartitionFunction(nodePartitioningManager, session, typeOperators, partitioningHandle, computeBufferCount, list, list2, optional));
        };
    }

    public int getBufferCount() {
        return this.sources.size();
    }

    public synchronized LocalExchangeSinkFactory createSinkFactory() {
        Preconditions.checkState(!this.noMoreSinkFactories, "No more sink factories already set");
        LocalExchangeSinkFactory localExchangeSinkFactory = new LocalExchangeSinkFactory(this);
        this.openSinkFactories.add(localExchangeSinkFactory);
        return localExchangeSinkFactory;
    }

    public synchronized LocalExchangeSource getNextSource() {
        Preconditions.checkState(this.nextSourceIndex < this.sources.size(), "All operators already created");
        LocalExchangeSource localExchangeSource = this.sources.get(this.nextSourceIndex);
        this.nextSourceIndex++;
        return localExchangeSource;
    }

    private static Function<Page, Page> createPartitionPagePreparer(PartitioningHandle partitioningHandle, List<Integer> list) {
        Function<Page, Page> function;
        if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
            function = Function.identity();
        } else {
            int[] array = Ints.toArray(list);
            function = page -> {
                return page.getColumns(array);
            };
        }
        return function;
    }

    private static PartitionFunction createPartitionFunction(NodePartitioningManager nodePartitioningManager, Session session, TypeOperators typeOperators, PartitioningHandle partitioningHandle, int i, List<Integer> list, List<Type> list2, Optional<Integer> optional) {
        Preconditions.checkArgument(Integer.bitCount(i) == 1, "partitionCount must be a power of 2");
        if (isSystemPartitioning(partitioningHandle)) {
            return new LocalPartitionGenerator(optional.isPresent() ? new PrecomputedHashGenerator(optional.get().intValue()) : InterpretedHashGenerator.createChannelsHashGenerator(list2, Ints.toArray(list), typeOperators), i);
        }
        int bucketCount = getBucketCount(session, nodePartitioningManager, partitioningHandle);
        int[] iArr = new int[bucketCount];
        for (int i2 = 0; i2 < bucketCount; i2++) {
            iArr[i2] = ((int) XxHash64.hash(Long.reverse(i2))) & (i - 1);
        }
        ConnectorPartitioningHandle connectorHandle = partitioningHandle.getConnectorHandle();
        return connectorHandle instanceof MergePartitioningHandle ? ((MergePartitioningHandle) connectorHandle).getPartitionFunction((partitioningScheme, list3) -> {
            return nodePartitioningManager.getPartitionFunction(session, partitioningScheme, list3, iArr);
        }, list2, iArr) : new BucketPartitionFunction(nodePartitioningManager.getBucketFunction(session, partitioningHandle, list2, bucketCount), iArr);
    }

    public static int getBucketCount(Session session, NodePartitioningManager nodePartitioningManager, PartitioningHandle partitioningHandle) {
        return partitioningHandle.getConnectorHandle() instanceof MergePartitioningHandle ? nodePartitioningManager.getNodePartitioningMap(session, partitioningHandle).getBucketToPartition().length : nodePartitioningManager.getBucketNodeMap(session, partitioningHandle).getBucketCount();
    }

    private static boolean isSystemPartitioning(PartitioningHandle partitioningHandle) {
        return partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle;
    }

    private void checkAllSourcesFinished() {
        ImmutableList copyOf;
        checkNotHoldsLock(this);
        if (this.sources.stream().allMatch((v0) -> {
            return v0.isFinished();
        })) {
            synchronized (this) {
                this.allSourcesFinished = true;
                copyOf = ImmutableList.copyOf(this.sinks);
                this.sinks.clear();
            }
            copyOf.forEach((v0) -> {
                v0.finish();
            });
            checkAllSinksComplete();
        }
    }

    private LocalExchangeSink createSink(LocalExchangeSinkFactory localExchangeSinkFactory) {
        checkNotHoldsLock(this);
        synchronized (this) {
            Preconditions.checkState(this.openSinkFactories.contains(localExchangeSinkFactory), "Factory is already closed");
            if (this.allSourcesFinished) {
                return LocalExchangeSink.finishedLocalExchangeSink();
            }
            LocalExchangeSink localExchangeSink = new LocalExchangeSink(this.exchangerSupplier.get(), this::sinkFinished);
            this.sinks.add(localExchangeSink);
            return localExchangeSink;
        }
    }

    private void sinkFinished(LocalExchangeSink localExchangeSink) {
        checkNotHoldsLock(this);
        synchronized (this) {
            this.sinks.remove(localExchangeSink);
        }
        checkAllSinksComplete();
    }

    private void noMoreSinkFactories() {
        checkNotHoldsLock(this);
        synchronized (this) {
            this.noMoreSinkFactories = true;
        }
        checkAllSinksComplete();
    }

    private void sinkFactoryClosed(LocalExchangeSinkFactory localExchangeSinkFactory) {
        checkNotHoldsLock(this);
        synchronized (this) {
            this.openSinkFactories.remove(localExchangeSinkFactory);
        }
        checkAllSinksComplete();
    }

    private void checkAllSinksComplete() {
        checkNotHoldsLock(this);
        synchronized (this) {
            if (this.noMoreSinkFactories && this.openSinkFactories.isEmpty() && this.sinks.isEmpty()) {
                this.sources.forEach((v0) -> {
                    v0.finish();
                });
            }
        }
    }

    @VisibleForTesting
    LocalExchangeSource getSource(int i) {
        return this.sources.get(i);
    }

    private static void checkNotHoldsLock(Object obj) {
        Preconditions.checkState(!Thread.holdsLock(obj), "Cannot execute this method while holding a lock");
    }

    private static int computeBufferCount(PartitioningHandle partitioningHandle, int i, List<Integer> list) {
        int i2;
        if (partitioningHandle.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION)) {
            i2 = 1;
            Preconditions.checkArgument(list.isEmpty(), "Gather exchange must not have partition channels");
        } else if (partitioningHandle.equals(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION)) {
            i2 = i;
            Preconditions.checkArgument(list.isEmpty(), "Broadcast exchange must not have partition channels");
        } else if (partitioningHandle.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            i2 = i;
            Preconditions.checkArgument(list.isEmpty(), "Arbitrary exchange must not have partition channels");
        } else if (partitioningHandle.equals(SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION)) {
            i2 = i;
            Preconditions.checkArgument(list.isEmpty(), "Passthrough exchange must not have partition channels");
        } else if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
            i2 = i;
            Preconditions.checkArgument(list.isEmpty(), "Scaled writer exchange must not have partition channels");
        } else if (PartitioningHandle.isScaledWriterHashDistribution(partitioningHandle)) {
            i2 = i;
        } else {
            if (!partitioningHandle.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION) && !partitioningHandle.getCatalogHandle().isPresent() && !(partitioningHandle.getConnectorHandle() instanceof MergePartitioningHandle)) {
                throw new IllegalArgumentException("Unsupported local exchange partitioning " + String.valueOf(partitioningHandle));
            }
            i2 = i;
        }
        return i2;
    }

    private static List<Consumer<Page>> asPageConsumers(List<LocalExchangeSource> list) {
        return (List) list.stream().map(localExchangeSource -> {
            Objects.requireNonNull(localExchangeSource);
            return localExchangeSource::addPage;
        }).collect(ImmutableList.toImmutableList());
    }
}
