package io.datakernel.remotefs;

import io.datakernel.async.function.AsyncSupplier;
import io.datakernel.async.function.AsyncSuppliers;
import io.datakernel.async.service.EventloopService;
import io.datakernel.async.util.LogUtils;
import io.datakernel.common.Initializable;
import io.datakernel.common.Preconditions;
import io.datakernel.common.collection.Try;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.process.ChannelSplitter;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.jmx.api.JmxOperation;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import io.datakernel.promise.SettablePromise;
import io.datakernel.promise.jmx.PromiseStats;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Redistributes the files held by the local partition of a {@link RemoteFsClusterClient}.
 *
 * <p>For every local file matched by the configured glob (and not excluded by the negative glob)
 * the controller asks the cluster's {@link ServerSelector} which partitions should hold the file,
 * uploads it to the selected partitions that lack it, and removes the local copy when the local
 * partition is not among the selected ones.
 *
 * <p>{@link #doRepartition()} checks that it runs in the eventloop thread; the mutable state of
 * this class is not otherwise synchronized.
 */
public final class RemoteFsRepartitionController implements Initializable<RemoteFsRepartitionController>, EventloopJmxMBeanEx, EventloopService {
    private static final Logger logger = LoggerFactory.getLogger(RemoteFsRepartitionController.class);

    private final Eventloop eventloop;
    private final Object localPartitionId;
    private final RemoteFsClusterClient cluster;
    private final LocalFsClient localStorage;
    private final ServerSelector serverSelector;
    private final Map<Object, FsClient> clients;
    private final int replicationCount;

    /** Set by {@link #stop()} while a run is in progress; completed when that run finishes. */
    @Nullable
    private SettablePromise<Void> closeCallback;
    private boolean isRepartitioning;
    private String glob = "**";
    private String negativeGlob = "";

    // Counters of the most recent run, exposed through JMX.
    // allFiles counts every file matched by glob, i.e. before negativeGlob is applied.
    private int allFiles = 0;
    private int ensuredFiles = 0;
    private int failedFiles = 0;

    private final PromiseStats repartitionPromiseStats = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats singleFileRepartitionPromiseStats = PromiseStats.create(Duration.ofMinutes(5));

    // NOTE(review): AsyncSuppliers.reuse presumably hands out the promise of a run that is
    // already in progress instead of starting a second one - confirm against its documentation.
    private final AsyncSupplier<Void> repartition = AsyncSuppliers.reuse(this::doRepartition);

    private RemoteFsRepartitionController(Eventloop eventloop, Object localPartitionId, RemoteFsClusterClient cluster,
            LocalFsClient localStorage, ServerSelector serverSelector, Map<Object, FsClient> clients, int replicationCount) {
        this.eventloop = eventloop;
        this.localPartitionId = localPartitionId;
        this.cluster = cluster;
        this.localStorage = localStorage;
        this.serverSelector = serverSelector;
        this.clients = clients;
        this.replicationCount = replicationCount;
    }

    /**
     * Creates a controller for one partition of the given cluster.
     *
     * @param localPartitionId id under which the cluster client registers the local partition
     * @param cluster cluster client supplying the eventloop, server selector, alive clients and replication count
     * @return a new controller
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when the client
     *         registered under {@code localPartitionId} is not a {@link LocalFsClient}
     */
    public static RemoteFsRepartitionController create(Object localPartitionId, RemoteFsClusterClient cluster) {
        FsClient local = cluster.getClients().get(localPartitionId);
        Preconditions.checkState(local instanceof LocalFsClient, "Local partition should be actually local and be an instance of LocalFsClient");
        return new RemoteFsRepartitionController(
                cluster.getEventloop(),
                localPartitionId,
                cluster,
                (LocalFsClient) local,
                cluster.getServerSelector(),
                cluster.getAliveClients(),
                cluster.getReplicationCount());
    }

    /** Sets the glob used to list local files for repartitioning (default {@code "**"}). */
    public RemoteFsRepartitionController withGlob(@NotNull String glob) {
        this.glob = glob;
        return this;
    }

    /** Sets a glob (or plain file name) of files to leave untouched; an empty string excludes nothing. */
    public RemoteFsRepartitionController withNegativeGlob(@NotNull String negativeGlob) {
        this.negativeGlob = negativeGlob;
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    public Object getLocalPartitionId() {
        return this.localPartitionId;
    }

    public RemoteFsClusterClient getCluster() {
        return this.cluster;
    }

    public FsClient getLocalStorage() {
        return this.localStorage;
    }

    /** Requests a repartition run through the {@code repartition} supplier and returns its promise. */
    @NotNull
    public Promise<Void> repartition() {
        return this.repartition.get();
    }

    /**
     * Lists the local files and repartitions them one after another.
     *
     * <p>The returned promise always completes normally: a failure of the run is logged and
     * forwarded to a pending {@link #stop()} callback, but not to the caller.
     */
    @NotNull
    private Promise<Void> doRepartition() {
        Preconditions.checkState(this.eventloop.inEventloopThread(), "Should be called from eventloop thread");
        this.isRepartitioning = true;
        // Start every run from zero: the counters back the "last run" JMX attributes and the
        // "untouched" figure below, which would otherwise mix in results of earlier runs.
        this.allFiles = 0;
        this.ensuredFiles = 0;
        this.failedFiles = 0;
        return this.localStorage.list(this.glob)
                .then(files -> {
                    this.allFiles = files.size();
                    return Promises.sequence(filterNot(files.stream(), this.negativeGlob)
                            .map(file -> () -> repartitionAndCount(file)));
                })
                .whenComplete(() -> {
                    this.isRepartitioning = false;
                })
                .whenComplete(this.repartitionPromiseStats.recordStats())
                .thenEx((result, e) -> {
                    if (e != null) {
                        logger.warn("forced repartition finish, {} files ensured, {} errored, {} untouched",
                                this.ensuredFiles, this.failedFiles, this.allFiles - this.ensuredFiles - this.failedFiles);
                    } else {
                        logger.info("repartition finished, {} files ensured, {} errored", this.ensuredFiles, this.failedFiles);
                    }
                    if (this.closeCallback != null) {
                        this.closeCallback.accept(result, e);
                    }
                    return Promise.complete();
                });
    }

    /** Repartitions one file, records per-file stats and bumps the ensured/failed counter. */
    private Promise<Void> repartitionAndCount(FileMetadata file) {
        return repartitionFile(file)
                .whenComplete(this.singleFileRepartitionPromiseStats.recordStats())
                .then(success -> {
                    if (success) {
                        this.ensuredFiles++;
                    } else {
                        this.failedFiles++;
                    }
                    return Promise.complete();
                });
    }

    /**
     * Drops from {@code stream} the files matching {@code excludedGlob}.
     *
     * @param stream files to filter
     * @param excludedGlob empty string (nothing excluded), an exact file name, or a glob pattern
     * @return the filtered stream
     */
    private Stream<FileMetadata> filterNot(Stream<FileMetadata> stream, String excludedGlob) {
        if (excludedGlob.isEmpty()) {
            return stream;
        }
        if (!RemoteFsUtils.isWildcard(excludedGlob)) {
            // Not a wildcard: a plain name comparison is enough.
            return stream.filter(file -> !file.getName().equals(excludedGlob));
        }
        PathMatcher excluded = FileSystems.getDefault().getPathMatcher("glob:" + excludedGlob);
        return stream.filter(file -> !excluded.matches(Paths.get(file.getName())));
    }

    /** Tells whether {@code partitionId} denotes the local partition (by equality, not identity). */
    private boolean isLocalPartition(Object partitionId) {
        return partitionId == this.localPartitionId
                || (partitionId != null && partitionId.equals(this.localPartitionId));
    }

    /**
     * Makes sure a single local file ends up on the partitions selected for it.
     *
     * @param fileMetadata metadata of the local file
     * @return promise of {@code true} when the file was handled, {@code false} when it was skipped
     *         because some partition could not be queried or uploaded to
     */
    private Promise<Boolean> repartitionFile(FileMetadata fileMetadata) {
        HashSet<Object> partitionIds = new HashSet<>(this.clients.keySet());
        // The local partition must be selectable as well.
        partitionIds.add(this.localPartitionId);
        List<Object> selected = this.serverSelector.selectFrom(fileMetadata.getName(), partitionIds, this.replicationCount);
        return getPartitionsThatNeedOurFile(fileMetadata, selected)
                .then(uploadTargets -> {
                    if (uploadTargets == null) {
                        // null marks a failure to query the partitions
                        return Promise.of(false);
                    }
                    if (uploadTargets.isEmpty()) {
                        // Every selected partition has the file and the local one is not selected.
                        logger.trace("deleting file {} locally", fileMetadata);
                        return this.localStorage.remove(fileMetadata.getName()).map(ignored -> {
                            logger.info("handled file {} (ensured on {})", fileMetadata, selected);
                            return true;
                        });
                    }
                    if (uploadTargets.size() == 1 && isLocalPartition(uploadTargets.get(0))) {
                        // Only the local partition is listed: nothing to upload, keep the local copy.
                        logger.info("handled file {} (ensured on {})", fileMetadata, selected);
                        return Promise.of(true);
                    }
                    return uploadToTargets(fileMetadata, selected, uploadTargets);
                })
                .whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "repartitionFile", new Object[]{fileMetadata}));
    }

    /**
     * Streams the local file to every non-local upload target and afterwards removes the local
     * copy unless the local partition is itself among the targets.
     *
     * @param fileMetadata metadata of the local file
     * @param selected partitions selected for the file (used for logging only)
     * @param uploadTargets partitions reported by {@link #getPartitionsThatNeedOurFile}
     * @return promise of {@code true} on success, {@code false} when any upload failed
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the splitter's element type is not imported by this file, so it stays raw
    private Promise<Boolean> uploadToTargets(FileMetadata fileMetadata, List<Object> selected, List<Object> uploadTargets) {
        String name = fileMetadata.getName();
        long revision = fileMetadata.getRevision();
        logger.trace("uploading file {} to partitions {}...", fileMetadata, uploadTargets);
        ChannelSplitter unfedSplitter = ChannelSplitter.create();
        ChannelSplitter splitter = (ChannelSplitter) unfedSplitter.withInput(ChannelSupplier.ofPromise(this.localStorage.download(name)));
        return Promises.toList(uploadTargets.stream().map(partitionId -> {
                    if (isLocalPartition(partitionId)) {
                        // Nothing to upload to ourselves; report success so the all-succeeded check stays uniform.
                        return Promise.of(Try.of(null));
                    }
                    return ChannelConsumer.getAcknowledgement(ack -> {
                                // NOTE(review): the 0L argument is presumably the upload offset - confirm against FsClient.upload.
                                splitter.addOutput().set(ChannelConsumer.ofPromise(this.clients.get(partitionId).upload(name, 0L, revision)).withAcknowledgement(ack));
                            })
                            .whenException(e -> {
                                logger.warn("failed uploading to partition " + partitionId + " (" + e + ')');
                                this.cluster.markDead(partitionId, e);
                            })
                            .whenResult(ignored -> {
                                logger.trace("file {} uploaded to '{}'", fileMetadata, partitionId);
                            })
                            .toTry();
                }))
                .then(tries -> {
                    if (!tries.stream().allMatch(Try::isSuccess)) {
                        logger.warn("failed uploading file {}, skipping", fileMetadata);
                        return Promise.of(false);
                    }
                    if (uploadTargets.contains(this.localPartitionId)) {
                        // The local partition is one of the selected holders: keep the local copy.
                        logger.info("handled file {} (ensured on {}, uploaded to {})", fileMetadata, selected, uploadTargets);
                        return Promise.of(true);
                    }
                    logger.trace("deleting file {} on {}", fileMetadata, this.localPartitionId);
                    return this.localStorage.remove(name).map(ignored -> {
                        logger.info("handled file {} (ensured on {}, uploaded to {})", fileMetadata, selected, uploadTargets);
                        return true;
                    });
                });
    }

    /**
     * Determines which of the selected partitions still need the given file.
     *
     * <p>The local partition, when selected, is always put into the result so that the caller
     * can tell it must keep the local copy. A remote partition is put into the result when it
     * lists no entity for the file name or when its first listed entity compares lower than
     * {@code fileMetadata} under {@code FileMetadata.COMPARATOR}.
     *
     * @param fileMetadata metadata of the local file
     * @param selected partitions selected for the file
     * @return promise of the partitions needing the file, or of {@code null} when any partition
     *         could not be queried (such partitions are also marked dead in the cluster)
     */
    private Promise<List<Object>> getPartitionsThatNeedOurFile(FileMetadata fileMetadata, List<Object> selected) {
        List<Object> uploadTargets = new ArrayList<>();
        return Promises.toList(selected.stream().map(partitionId -> {
                    if (isLocalPartition(partitionId)) {
                        uploadTargets.add(partitionId);
                        return Promise.of(Try.of(null));
                    }
                    return this.clients.get(partitionId).listEntities(fileMetadata.getName())
                            .whenComplete((remoteEntities, e) -> {
                                if (e != null) {
                                    logger.warn("failed connecting to partition " + partitionId + " (" + e + ')');
                                    this.cluster.markDead(partitionId, e);
                                } else if (remoteEntities.isEmpty() || FileMetadata.COMPARATOR.compare(remoteEntities.get(0), fileMetadata) < 0) {
                                    uploadTargets.add(partitionId);
                                }
                            })
                            .toTry();
                }))
                .then(tries -> {
                    if (tries.stream().allMatch(Try::isSuccess)) {
                        return Promise.of(uploadTargets);
                    }
                    logger.warn("failed figuring out partitions for file " + fileMetadata + ", skipping");
                    // null marks the failure without raising an exception
                    return Promise.of(null);
                });
    }

    /** Nothing to initialise; completes immediately. */
    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    /**
     * Stops the service.
     *
     * @return an already completed promise when no run is in progress; otherwise a promise that
     *         is completed with the outcome of the current run once it finishes
     */
    @NotNull
    public Promise<Void> stop() {
        if (!isRepartitioning()) {
            return Promise.complete();
        }
        return Promise.ofCallback(callback -> {
            this.closeCallback = callback;
        });
    }

    @JmxOperation(description = "start repartitioning")
    public void startRepartition() {
        repartition();
    }

    @JmxAttribute
    public boolean isRepartitioning() {
        return this.isRepartitioning;
    }

    @JmxAttribute
    public PromiseStats getRepartitionPromiseStats() {
        return this.repartitionPromiseStats;
    }

    @JmxAttribute
    public PromiseStats getSingleFileRepartitionPromiseStats() {
        return this.singleFileRepartitionPromiseStats;
    }

    /** Number of local files matched by the glob in the most recent run (before the negative glob is applied). */
    @JmxAttribute
    public int getLastFilesToRepartition() {
        return this.allFiles;
    }

    /** Number of files successfully handled in the most recent run. */
    @JmxAttribute
    public int getLastEnsuredFiles() {
        return this.ensuredFiles;
    }

    /** Number of files skipped because of a failure in the most recent run. */
    @JmxAttribute
    public int getLastFailedFiles() {
        return this.failedFiles;
    }
}
