package io.trino.plugin.exchange.filesystem;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import io.trino.spi.metrics.Metrics;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

/* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.class */
/**
 * {@link ExchangeSource} that reads exchange data from files through a
 * {@link FileSystemExchangeStorage}.
 *
 * <p>Files announced via {@link #addSourceHandles(List)} are queued and handed out in batches to
 * at most {@code exchangeSourceConcurrentReaders} readers at a time. Readers are only created once
 * a final {@link ExchangeSourceOutputSelector} has been set. The source closes itself when no
 * active reader remains, the file queue is drained and {@link #noMoreSourceHandles()} was called.
 *
 * <p>Mutable state is either guarded by the instance monitor or kept in atomic/concurrent
 * containers so that {@link #isBlocked()}, {@link #read()} and {@link #getMemoryUsage()} can take
 * a snapshot of the readers without locking.
 */
public class FileSystemExchangeSource implements ExchangeSource {
    private final FileSystemExchangeStorage exchangeStorage;
    private final FileSystemExchangeStats stats;
    private final int maxPageStorageSize;
    private final int exchangeSourceConcurrentReaders;
    private final int maxFilesPerReader;

    @GuardedBy("this")
    private boolean noMoreFiles;

    @GuardedBy("this")
    private ExchangeSourceOutputSelector currentSelector;

    /** Files that have been announced but not yet assigned to a reader. */
    private final Queue<ExchangeSourceFile> files = new ConcurrentLinkedQueue<>();

    /** Completed whenever at least one reader exists (or the source is finished). */
    @GuardedBy("this")
    private SettableFuture<Void> blockedOnFiles = SettableFuture.create();

    /** Immutable snapshot of the current readers; replaced as a whole. */
    private final AtomicReference<List<ExchangeStorageReader>> readers = new AtomicReference<>(ImmutableList.of());

    /** Last future handed out by {@link #isBlocked()}, reused while it is still pending. */
    private final AtomicReference<ListenableFuture<Void>> blocked = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final MetricsBuilder metricsBuilder = new MetricsBuilder();
    private final MetricsBuilder.CounterMetricBuilder totalFilesMetric = this.metricsBuilder.getCounterMetric(MetricsBuilder.SOURCE_FILES_TOTAL);

    /**
     * Creates a source with no files and no readers.
     *
     * @param exchangeStorage storage used to create readers; must not be null
     * @param stats stats sink used to record blocked futures; must not be null
     * @param maxPageStorageSize passed to every created reader and used (together with the storage
     *        write buffer size) as the per-reader batch size limit
     * @param exchangeSourceConcurrentReaders maximum number of readers kept active at once
     * @param maxFilesPerReader maximum number of files assigned to a single reader
     */
    public FileSystemExchangeSource(
            FileSystemExchangeStorage exchangeStorage,
            FileSystemExchangeStats stats,
            int maxPageStorageSize,
            int exchangeSourceConcurrentReaders,
            int maxFilesPerReader) {
        this.exchangeStorage = Objects.requireNonNull(exchangeStorage, "exchangeStorage is null");
        this.stats = Objects.requireNonNull(stats, "stats is null");
        this.maxPageStorageSize = maxPageStorageSize;
        this.exchangeSourceConcurrentReaders = exchangeSourceConcurrentReaders;
        this.maxFilesPerReader = maxFilesPerReader;
    }

    /**
     * Queues the files of the given handles and, if possible, starts readers for them.
     * Does nothing once the source is closed.
     *
     * @param handles handles that must all be {@code FileSystemExchangeSourceHandle} instances
     */
    public synchronized void addSourceHandles(List<ExchangeSourceHandle> handles) {
        if (this.closed.get()) {
            return;
        }
        List<ExchangeSourceFile> newFiles = getFiles(handles);
        this.files.addAll(newFiles);
        this.totalFilesMetric.add(newFiles.size());
        closeAndCreateReadersIfNecessary();
    }

    /** Marks the set of files as complete so the source can finish once everything is read. */
    public synchronized void noMoreSourceHandles() {
        this.noMoreFiles = true;
        closeAndCreateReadersIfNecessary();
    }

    /**
     * Installs a newer output selector. A selector whose version is not greater than the current
     * one is ignored; otherwise the transition is validated against the current selector first.
     *
     * @param selector the selector to install
     */
    public synchronized void setOutputSelector(ExchangeSourceOutputSelector selector) {
        if (this.currentSelector != null) {
            if (this.currentSelector.getVersion() >= selector.getVersion()) {
                return;
            }
            this.currentSelector.checkValidTransition(selector);
        }
        this.currentSelector = selector;
        closeAndCreateReadersIfNecessary();
    }

    /**
     * Returns a future that completes when {@link #read()} may make progress.
     *
     * @return {@code NOT_BLOCKED} if the source is closed or some reader is ready; otherwise a
     *         future tied to the arrival of files or to the first reader becoming unblocked
     */
    public CompletableFuture<Void> isBlocked() {
        if (this.closed.get()) {
            return NOT_BLOCKED;
        }
        // Reuse the previously returned future for as long as it is still pending.
        ListenableFuture<Void> cached = this.blocked.get();
        if (cached != null && !cached.isDone()) {
            return nonCancellationPropagatingCompletableFuture(cached);
        }
        List<ExchangeStorageReader> currentReaders = this.readers.get();
        // Indexed loop over the immutable snapshot avoids allocating an iterator.
        for (int i = 0; i < currentReaders.size(); i++) {
            if (currentReaders.get(i).isBlocked().isDone()) {
                return NOT_BLOCKED;
            }
        }
        ListenableFuture<Void> recorded;
        synchronized (this) {
            ListenableFuture<Void> pending;
            if (!this.blockedOnFiles.isDone()) {
                pending = this.blockedOnFiles;
            } else if (currentReaders.isEmpty()) {
                // Neither waiting for files nor for readers: nothing to block on.
                pending = Futures.immediateVoidFuture();
            } else {
                pending = MoreFutures.whenAnyComplete(
                        currentReaders.stream()
                                .map(ExchangeStorageReader::isBlocked)
                                .collect(ImmutableList.toImmutableList()));
            }
            recorded = this.stats.getExchangeSourceBlocked().record(pending);
            this.blocked.set(recorded);
        }
        return nonCancellationPropagatingCompletableFuture(recorded);
    }

    /**
     * Mirrors the outcome of {@code future} into a fresh {@link CompletableFuture}. The returned
     * future holds no reference back to {@code future}, so cancelling it leaves the source
     * future untouched.
     */
    private static CompletableFuture<Void> nonCancellationPropagatingCompletableFuture(ListenableFuture<Void> future) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        Futures.addCallback(future, new FutureCallback<Void>() {
            public void onSuccess(Void value) {
                result.complete(value);
            }

            public void onFailure(Throwable failure) {
                result.completeExceptionally(failure);
            }
        }, MoreExecutors.directExecutor());
        return result;
    }

    /** Returns {@code true} once the source has been closed. */
    public boolean isFinished() {
        return this.closed.get();
    }

    /**
     * Reads from the first reader that is neither blocked nor finished, then refreshes the set of
     * readers.
     *
     * @return the slice returned by that reader, or {@code null} if the source is closed or no
     *         reader was ready
     * @throws UncheckedIOException if the reader fails with an {@link IOException}
     */
    @Nullable
    public Slice read() {
        if (this.closed.get()) {
            return null;
        }
        Slice data = null;
        List<ExchangeStorageReader> currentReaders = this.readers.get();
        for (int i = 0; i < currentReaders.size(); i++) {
            ExchangeStorageReader reader = currentReaders.get(i);
            if (reader.isBlocked().isDone() && !reader.isFinished()) {
                try {
                    data = reader.read();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                break;
            }
        }
        closeAndCreateReadersIfNecessary();
        return data;
    }

    /** Returns the sum of the retained sizes reported by the current readers. */
    public long getMemoryUsage() {
        long total = 0;
        List<ExchangeStorageReader> currentReaders = this.readers.get();
        for (int i = 0; i < currentReaders.size(); i++) {
            total += currentReaders.get(i).getRetainedSize();
        }
        return total;
    }

    /**
     * Closes the source: drops all queued files and closes every current reader. Only the first
     * call has any effect.
     *
     * @throws UncheckedIOException if closing a reader fails with an {@link IOException}
     */
    public synchronized void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.files.clear();
        Closer closer = Closer.create();
        for (ExchangeStorageReader reader : this.readers.getAndSet(ImmutableList.of())) {
            closer.register(reader);
        }
        try {
            closer.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Closes finished readers and creates new ones for queued files, up to
     * {@code exchangeSourceConcurrentReaders} active readers. Closes the whole source when no
     * reader remains and no more files are expected.
     */
    private void closeAndCreateReadersIfNecessary() {
        int activeReaderCount = getNumberOfActiveReaders();
        if (activeReaderCount == this.exchangeSourceConcurrentReaders) {
            return;
        }
        // Some readers are still busy and there is nothing new to hand out.
        if (activeReaderCount > 0 && this.files.isEmpty()) {
            return;
        }

        SettableFuture<Void> toUnblock = null;
        synchronized (this) {
            if (this.closed.get()) {
                return;
            }
            // Files can only be validated against a final selector.
            if (this.currentSelector == null || !this.currentSelector.isFinal()) {
                return;
            }

            List<ExchangeStorageReader> activeReaders = new ArrayList<>();
            for (ExchangeStorageReader reader : this.readers.get()) {
                if (reader.isFinished()) {
                    reader.close();
                } else {
                    activeReaders.add(reader);
                }
            }

            while (activeReaders.size() < this.exchangeSourceConcurrentReaders && !this.files.isEmpty()) {
                activeReaders.add(this.exchangeStorage.createExchangeStorageReader(
                        pollNextReaderFiles(), this.maxPageStorageSize, this.metricsBuilder));
            }

            if (activeReaders.isEmpty()) {
                if (this.noMoreFiles) {
                    // Everything has been read: finish and release anyone waiting for files.
                    toUnblock = this.blockedOnFiles;
                    close();
                } else if (this.blockedOnFiles.isDone()) {
                    // Out of readers but more files may arrive: block again.
                    this.blockedOnFiles = SettableFuture.create();
                }
            } else if (!this.blockedOnFiles.isDone()) {
                toUnblock = this.blockedOnFiles;
            }
            this.readers.set(ImmutableList.copyOf(activeReaders));
        }
        // Complete outside the monitor so direct-executor callbacks do not run under the lock.
        if (toUnblock != null) {
            toUnblock.set(null);
        }
    }

    /**
     * Removes the next batch of files from the queue for a single reader. The first file is always
     * taken; further files are added while the batch stays within
     * {@code maxPageStorageSize + writeBufferSize} bytes and {@code maxFilesPerReader} files.
     * Every file taken is verified to be marked as included by the current selector.
     */
    @GuardedBy("this")
    private List<ExchangeSourceFile> pollNextReaderFiles() {
        ImmutableList.Builder<ExchangeSourceFile> batch = ImmutableList.builder();
        int fileCount = 0;
        long batchSize = 0;
        while (!this.files.isEmpty()) {
            ExchangeSourceFile file = this.files.peek();
            ExchangeSourceOutputSelector.Selection selection = this.currentSelector.getSelection(
                    file.getExchangeId(), file.getSourceTaskPartitionId(), file.getSourceTaskAttemptId());
            Verify.verify(
                    selection == ExchangeSourceOutputSelector.Selection.INCLUDED,
                    "%s.%s.%s is not marked as included by the engine",
                    file.getExchangeId(),
                    file.getSourceTaskPartitionId(),
                    file.getSourceTaskAttemptId());
            boolean fits = batchSize + file.getFileSize() <= this.maxPageStorageSize + this.exchangeStorage.getWriteBufferSize()
                    && fileCount < this.maxFilesPerReader;
            if (fileCount > 0 && !fits) {
                // Leave the file in the queue for the next reader.
                break;
            }
            batch.add(file);
            batchSize += file.getFileSize();
            fileCount++;
            this.files.poll();
        }
        return batch.build();
    }

    /** Counts the readers in the current snapshot that are not finished. */
    private int getNumberOfActiveReaders() {
        List<ExchangeStorageReader> currentReaders = this.readers.get();
        int active = 0;
        for (int i = 0; i < currentReaders.size(); i++) {
            if (!currentReaders.get(i).isFinished()) {
                active++;
            }
        }
        return active;
    }

    /** Returns the metrics accumulated by this source and its readers' shared builder. */
    public Optional<Metrics> getMetrics() {
        return Optional.of(this.metricsBuilder.buildMetrics());
    }

    /**
     * Flattens the given handles into the files they reference.
     *
     * @throws ClassCastException if a handle is not a {@code FileSystemExchangeSourceHandle}
     */
    private static List<ExchangeSourceFile> getFiles(List<ExchangeSourceHandle> handles) {
        return handles.stream()
                .map(FileSystemExchangeSourceHandle.class::cast)
                .flatMap(handle -> handle.getFiles().stream()
                        .map(sourceFile -> new ExchangeSourceFile(
                                URI.create(sourceFile.getFilePath()),
                                sourceFile.getFileSize(),
                                handle.getExchangeId(),
                                sourceFile.getSourceTaskPartitionId(),
                                sourceFile.getSourceTaskAttemptId())))
                .collect(ImmutableList.toImmutableList());
    }
}
