package io.trino.operator.exchange;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.trino.execution.resourcegroups.IndexedPriorityQueue;
import it.unimi.dsi.fastutil.longs.Long2LongMap;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:io/trino/operator/exchange/UniformPartitionRebalancer.class */
public class UniformPartitionRebalancer {
    private static final Logger log = Logger.get(UniformPartitionRebalancer.class);
    private static final double SKEWNESS_THRESHOLD = 0.7d;
    private final List<Supplier<Long>> writerPhysicalWrittenBytesSuppliers;
    private final Supplier<Long2LongMap> partitionRowCountsSupplier;
    private final long writerMinSize;
    private final int numberOfWriters;
    private final long rebalanceThresholdMinPhysicalWrittenBytes;
    private final AtomicLongArray writerPhysicalWrittenBytesAtLastRebalance;
    private final PartitionInfo[] partitionInfos;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/operator/exchange/UniformPartitionRebalancer$PartitionIdWithRowCount.class */
    public static final class PartitionIdWithRowCount extends Record {
        private final int id;
        private final long rowCount;

        private PartitionIdWithRowCount(int i, long j) {
            this.id = i;
            this.rowCount = j;
        }

        @Override // java.lang.Record
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.id == ((PartitionIdWithRowCount) obj).id;
        }

        @Override // java.lang.Record
        public int hashCode() {
            return Objects.hashCode(Integer.valueOf(this.id));
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, PartitionIdWithRowCount.class), PartitionIdWithRowCount.class, "id;rowCount", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$PartitionIdWithRowCount;->id:I", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$PartitionIdWithRowCount;->rowCount:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        public int id() {
            return this.id;
        }

        public long rowCount() {
            return this.rowCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/trino/operator/exchange/UniformPartitionRebalancer$PartitionInfo.class */
    public static class PartitionInfo {
        private final List<Integer> writerAssignments;
        private final AtomicLong physicalWrittenBytesAtLastRebalance = new AtomicLong(0);

        private PartitionInfo(int i) {
            this.writerAssignments = new CopyOnWriteArrayList((Collection) ImmutableList.of(Integer.valueOf(i)));
        }

        private boolean containsWriter(int i) {
            return this.writerAssignments.contains(Integer.valueOf(i));
        }

        private void addWriter(int i) {
            this.writerAssignments.add(Integer.valueOf(i));
        }

        private int getWriterId(int i) {
            return this.writerAssignments.get(Math.floorMod(i, getWriterCount())).intValue();
        }

        private List<Integer> getWriterIds() {
            return ImmutableList.copyOf(this.writerAssignments);
        }

        private int getWriterCount() {
            return this.writerAssignments.size();
        }

        private void resetPhysicalWrittenBytesAtLastRebalance() {
            this.physicalWrittenBytesAtLastRebalance.set(0L);
        }

        private void addToPhysicalWrittenBytesAtLastRebalance(long j) {
            this.physicalWrittenBytesAtLastRebalance.addAndGet(j);
        }

        private long getPhysicalWrittenBytesAtLastRebalancePerWriter() {
            return this.physicalWrittenBytesAtLastRebalance.get() / this.writerAssignments.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/operator/exchange/UniformPartitionRebalancer$RebalanceContext.class */
    public class RebalanceContext {
        private final Set<Integer> rebalancedPartitions = new HashSet();
        private final long[] writerPhysicalWrittenBytesSinceLastRebalance;
        private final long[] writerRowCountSinceLastRebalance;
        private final long[] writerEstimatedWrittenBytes;
        private final List<IndexedPriorityQueue<PartitionIdWithRowCount>> writerMaxPartitions;

        private RebalanceContext(List<Long> list, Long2LongMap long2LongMap) {
            this.writerPhysicalWrittenBytesSinceLastRebalance = new long[UniformPartitionRebalancer.this.numberOfWriters];
            this.writerEstimatedWrittenBytes = new long[UniformPartitionRebalancer.this.numberOfWriters];
            for (int i = 0; i < list.size(); i++) {
                long longValue = list.get(i).longValue() - UniformPartitionRebalancer.this.writerPhysicalWrittenBytesAtLastRebalance.get(i);
                this.writerPhysicalWrittenBytesSinceLastRebalance[i] = longValue;
                this.writerEstimatedWrittenBytes[i] = longValue;
            }
            this.writerRowCountSinceLastRebalance = new long[UniformPartitionRebalancer.this.numberOfWriters];
            this.writerMaxPartitions = new ArrayList(UniformPartitionRebalancer.this.numberOfWriters);
            for (int i2 = 0; i2 < UniformPartitionRebalancer.this.numberOfWriters; i2++) {
                this.writerMaxPartitions.add(new IndexedPriorityQueue<>());
            }
            long2LongMap.forEach((l, l2) -> {
                WriterPartitionId deserialize = WriterPartitionId.deserialize(l.longValue());
                long[] jArr = this.writerRowCountSinceLastRebalance;
                int i3 = deserialize.writerId;
                jArr[i3] = jArr[i3] + l2.longValue();
                this.writerMaxPartitions.get(deserialize.writerId).addOrUpdate(new PartitionIdWithRowCount(deserialize.partitionId, l2.longValue()), l2.longValue());
            });
        }

        private List<WriterId> rebalancePartition(WriterId writerId, WriterId writerId2) {
            IndexedPriorityQueue<PartitionIdWithRowCount> indexedPriorityQueue = this.writerMaxPartitions.get(writerId.id);
            ImmutableList.Builder builder = ImmutableList.builder();
            Iterator<PartitionIdWithRowCount> it = indexedPriorityQueue.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                PartitionIdWithRowCount next = it.next();
                PartitionInfo partitionInfo = UniformPartitionRebalancer.this.partitionInfos[next.id];
                if (!isPartitionRebalanced(next.id) && !partitionInfo.containsWriter(writerId2.id)) {
                    indexedPriorityQueue.remove(next);
                    long estimatePartitionWrittenBytesSinceLastRebalance = estimatePartitionWrittenBytesSinceLastRebalance(writerId, next.rowCount);
                    long physicalWrittenBytesAtLastRebalancePerWriter = estimatePartitionWrittenBytesSinceLastRebalance + partitionInfo.getPhysicalWrittenBytesAtLastRebalancePerWriter();
                    if (partitionInfo.getWriterCount() <= UniformPartitionRebalancer.this.numberOfWriters && physicalWrittenBytesAtLastRebalancePerWriter >= UniformPartitionRebalancer.this.writerMinSize) {
                        partitionInfo.addWriter(writerId2.id);
                        this.rebalancedPartitions.add(Integer.valueOf(next.id));
                        updateWriterEstimatedWrittenBytes(writerId2, estimatePartitionWrittenBytesSinceLastRebalance, partitionInfo);
                        Iterator<Integer> it2 = partitionInfo.getWriterIds().iterator();
                        while (it2.hasNext()) {
                            builder.add(new WriterId(it2.next().intValue()));
                        }
                        UniformPartitionRebalancer.log.debug("Scaled partition (%s) to writer %s with writer count %s", new Object[]{Integer.valueOf(next.id), Integer.valueOf(writerId2.id), Integer.valueOf(partitionInfo.getWriterCount())});
                    }
                }
            }
            return builder.build();
        }

        private void updateWriterEstimatedWrittenBytes(WriterId writerId, long j, PartitionInfo partitionInfo) {
            int writerCount = partitionInfo.getWriterCount();
            int i = writerCount - 1;
            Iterator<Integer> it = partitionInfo.getWriterIds().iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                if (intValue != writerId.id) {
                    long[] jArr = this.writerEstimatedWrittenBytes;
                    jArr[intValue] = jArr[intValue] - (j / writerCount);
                }
            }
            long[] jArr2 = this.writerEstimatedWrittenBytes;
            int i2 = writerId.id;
            jArr2[i2] = jArr2[i2] + ((j * i) / writerCount);
        }

        private long getWriterEstimatedWrittenBytes(WriterId writerId) {
            return this.writerEstimatedWrittenBytes[writerId.id];
        }

        private boolean isPartitionRebalanced(int i) {
            return this.rebalancedPartitions.contains(Integer.valueOf(i));
        }

        private long estimatePartitionWrittenBytesSinceLastRebalance(WriterId writerId, long j) {
            if (this.writerRowCountSinceLastRebalance[writerId.id] == 0) {
                return 0L;
            }
            return (this.writerPhysicalWrittenBytesSinceLastRebalance[writerId.id] * j) / this.writerRowCountSinceLastRebalance[writerId.id];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/operator/exchange/UniformPartitionRebalancer$WriterId.class */
    public static final class WriterId extends Record {
        private final int id;

        private WriterId(int i) {
            this.id = i;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, WriterId.class), WriterId.class, "id", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterId;->id:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, WriterId.class), WriterId.class, "id", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterId;->id:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, WriterId.class, Object.class), WriterId.class, "id", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterId;->id:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public int id() {
            return this.id;
        }
    }

    /* loaded from: input_file:io/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId.class */
    public static final class WriterPartitionId extends Record {
        private final int writerId;
        private final int partitionId;

        public WriterPartitionId(int i, int i2) {
            this.writerId = i;
            this.partitionId = i2;
        }

        public static WriterPartitionId deserialize(long j) {
            return new WriterPartitionId((int) (j >> 32), (int) j);
        }

        public static long serialize(WriterPartitionId writerPartitionId) {
            return (writerPartitionId.writerId << 32) | (writerPartitionId.partitionId & 4294967295L);
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, WriterPartitionId.class), WriterPartitionId.class, "writerId;partitionId", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId;->writerId:I", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId;->partitionId:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, WriterPartitionId.class), WriterPartitionId.class, "writerId;partitionId", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId;->writerId:I", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId;->partitionId:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, WriterPartitionId.class, Object.class), WriterPartitionId.class, "writerId;partitionId", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId;->writerId:I", "FIELD:Lio/trino/operator/exchange/UniformPartitionRebalancer$WriterPartitionId;->partitionId:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public int writerId() {
            return this.writerId;
        }

        public int partitionId() {
            return this.partitionId;
        }
    }

    public UniformPartitionRebalancer(List<Supplier<Long>> list, Supplier<Long2LongMap> supplier, int i, int i2, long j) {
        this.writerPhysicalWrittenBytesSuppliers = (List) Objects.requireNonNull(list, "writerPhysicalWrittenBytesSuppliers is null");
        this.partitionRowCountsSupplier = (Supplier) Objects.requireNonNull(supplier, "partitionRowCountsSupplier is null");
        this.writerMinSize = j;
        this.numberOfWriters = i2;
        this.rebalanceThresholdMinPhysicalWrittenBytes = Math.max(DataSize.of(50L, DataSize.Unit.MEGABYTE).toBytes(), j);
        this.writerPhysicalWrittenBytesAtLastRebalance = new AtomicLongArray(i2);
        this.partitionInfos = new PartitionInfo[i];
        for (int i3 = 0; i3 < i; i3++) {
            this.partitionInfos[i3] = new PartitionInfo(i3 % i2);
        }
    }

    public int getWriterId(int i, int i2) {
        return this.partitionInfos[i].getWriterId(i2);
    }

    @VisibleForTesting
    List<Integer> getWriterIds(int i) {
        return this.partitionInfos[i].getWriterIds();
    }

    public void rebalancePartitions() {
        List<Long> list = (List) this.writerPhysicalWrittenBytesSuppliers.stream().map((v0) -> {
            return v0.get();
        }).collect(ImmutableList.toImmutableList());
        if (getPhysicalWrittenBytesSinceLastRebalance(list) > this.rebalanceThresholdMinPhysicalWrittenBytes) {
            rebalancePartitions(list);
        }
    }

    private int getPhysicalWrittenBytesSinceLastRebalance(List<Long> list) {
        int i = 0;
        for (int i2 = 0; i2 < list.size(); i2++) {
            i = (int) (i + (list.get(i2).longValue() - this.writerPhysicalWrittenBytesAtLastRebalance.get(i2)));
        }
        return i;
    }

    private synchronized void rebalancePartitions(List<Long> list) {
        Long2LongMap long2LongMap = this.partitionRowCountsSupplier.get();
        RebalanceContext rebalanceContext = new RebalanceContext(list, long2LongMap);
        IndexedPriorityQueue indexedPriorityQueue = new IndexedPriorityQueue();
        IndexedPriorityQueue<WriterId> indexedPriorityQueue2 = new IndexedPriorityQueue<>();
        for (int i = 0; i < this.numberOfWriters; i++) {
            WriterId writerId = new WriterId(i);
            indexedPriorityQueue.addOrUpdate(writerId, rebalanceContext.getWriterEstimatedWrittenBytes(writerId));
            indexedPriorityQueue2.addOrUpdate(writerId, Long.MAX_VALUE - rebalanceContext.getWriterEstimatedWrittenBytes(writerId));
        }
        while (true) {
            WriterId writerId2 = (WriterId) indexedPriorityQueue.poll();
            if (writerId2 == null) {
                break;
            }
            List<WriterId> findSkewedMinWriters = findSkewedMinWriters(rebalanceContext, writerId2, indexedPriorityQueue2);
            if (findSkewedMinWriters.isEmpty()) {
                break;
            }
            Iterator<WriterId> it = findSkewedMinWriters.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                List<WriterId> rebalancePartition = rebalanceContext.rebalancePartition(writerId2, it.next());
                if (!rebalancePartition.isEmpty()) {
                    for (WriterId writerId3 : rebalancePartition) {
                        indexedPriorityQueue.addOrUpdate(writerId3, rebalanceContext.getWriterEstimatedWrittenBytes(writerId2));
                        indexedPriorityQueue2.addOrUpdate(writerId3, Long.MAX_VALUE - rebalanceContext.getWriterEstimatedWrittenBytes(writerId2));
                    }
                }
            }
            for (WriterId writerId4 : findSkewedMinWriters) {
                indexedPriorityQueue.addOrUpdate(writerId4, rebalanceContext.getWriterEstimatedWrittenBytes(writerId4));
                indexedPriorityQueue2.addOrUpdate(writerId4, Long.MAX_VALUE - rebalanceContext.getWriterEstimatedWrittenBytes(writerId4));
            }
        }
        resetStateForNextRebalance(rebalanceContext, list, long2LongMap);
    }

    private List<WriterId> findSkewedMinWriters(RebalanceContext rebalanceContext, WriterId writerId, IndexedPriorityQueue<WriterId> indexedPriorityQueue) {
        ImmutableList.Builder builder = ImmutableList.builder();
        long writerEstimatedWrittenBytes = rebalanceContext.getWriterEstimatedWrittenBytes(writerId);
        while (true) {
            WriterId poll = indexedPriorityQueue.poll();
            if (poll != null) {
                double writerEstimatedWrittenBytes2 = (writerEstimatedWrittenBytes - rebalanceContext.getWriterEstimatedWrittenBytes(poll)) / writerEstimatedWrittenBytes;
                if (writerEstimatedWrittenBytes2 <= SKEWNESS_THRESHOLD || Double.isNaN(writerEstimatedWrittenBytes2)) {
                    break;
                }
                builder.add(poll);
            } else {
                break;
            }
        }
        return builder.build();
    }

    private void resetStateForNextRebalance(RebalanceContext rebalanceContext, List<Long> list, Long2LongMap long2LongMap) {
        long2LongMap.forEach((l, l2) -> {
            WriterPartitionId deserialize = WriterPartitionId.deserialize(l.longValue());
            PartitionInfo partitionInfo = this.partitionInfos[deserialize.partitionId];
            if (rebalanceContext.isPartitionRebalanced(deserialize.partitionId)) {
                partitionInfo.resetPhysicalWrittenBytesAtLastRebalance();
            } else {
                partitionInfo.addToPhysicalWrittenBytesAtLastRebalance(rebalanceContext.estimatePartitionWrittenBytesSinceLastRebalance(new WriterId(deserialize.writerId), l2.longValue()));
            }
        });
        for (int i = 0; i < this.numberOfWriters; i++) {
            this.writerPhysicalWrittenBytesAtLastRebalance.set(i, list.get(i).longValue());
        }
    }
}
