package io.datakernel.remotefs;

import io.datakernel.async.Promise;
import io.datakernel.async.Promises;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.process.ChannelSplitter;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.EventloopService;
import io.datakernel.exception.StacklessException;
import io.datakernel.functional.Try;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.PromiseStats;
import io.datakernel.util.Initializable;
import io.datakernel.util.LogUtils;
import io.datakernel.util.Preconditions;
import io.datakernel.util.Tuple2;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/remotefs/RemoteFsClusterClient.class */
public final class RemoteFsClusterClient implements FsClient, Initializable<RemoteFsClusterClient>, EventloopService, EventloopJmxMBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(RemoteFsClusterClient.class);
    private final Eventloop eventloop;
    private final Map<Object, FsClient> clients;
    private final Map<Object, FsClient> aliveClients = new HashMap();
    private final Map<Object, FsClient> deadClients = new HashMap();
    private int replicationCount = 1;
    private ServerSelector serverSelector = ServerSelector.RENDEZVOUS_HASH_SHARDER;
    private final PromiseStats connectPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats movePromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats copyPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats listPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats deletePromise = PromiseStats.create(Duration.ofMinutes(5));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.datakernel.remotefs.RemoteFsClusterClient$1ConsumerWithId, reason: invalid class name */
    /* loaded from: input_file:io/datakernel/remotefs/RemoteFsClusterClient$1ConsumerWithId.class */
    public class C1ConsumerWithId {
        final Object id;
        final ChannelConsumer<ByteBuf> consumer;

        C1ConsumerWithId(Object obj, ChannelConsumer<ByteBuf> channelConsumer) {
            this.id = obj;
            this.consumer = channelConsumer;
        }
    }

    private RemoteFsClusterClient(Eventloop eventloop, Map<Object, FsClient> map) {
        this.eventloop = eventloop;
        this.clients = map;
        this.aliveClients.putAll(map);
    }

    public static RemoteFsClusterClient create(Eventloop eventloop) {
        return new RemoteFsClusterClient(eventloop, new HashMap());
    }

    public static RemoteFsClusterClient create(Eventloop eventloop, Map<Object, FsClient> map) {
        return new RemoteFsClusterClient(eventloop, map);
    }

    public RemoteFsClusterClient withPartition(Object obj, FsClient fsClient) {
        this.clients.put(obj, fsClient);
        this.aliveClients.put(obj, fsClient);
        return this;
    }

    public RemoteFsClusterClient withReplicationCount(int i) {
        Preconditions.checkArgument(1 <= i && i <= this.clients.size(), "Replication count cannot be less than one or more than number of clients");
        this.replicationCount = i;
        return this;
    }

    public RemoteFsClusterClient withServerSelector(ServerSelector serverSelector) {
        Preconditions.checkNotNull(serverSelector, "serverSelector");
        this.serverSelector = serverSelector;
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    public Map<Object, FsClient> getClients() {
        return Collections.unmodifiableMap(this.clients);
    }

    public Map<Object, FsClient> getAliveClients() {
        return Collections.unmodifiableMap(this.aliveClients);
    }

    public Map<Object, FsClient> getDeadClients() {
        return Collections.unmodifiableMap(this.deadClients);
    }

    public ServerSelector getServerSelector() {
        return this.serverSelector;
    }

    public Promise<Void> checkAllPartitions() {
        return Promises.all(this.clients.entrySet().stream().map(entry -> {
            Object key = entry.getKey();
            return ((FsClient) entry.getValue()).ping().mapEx((r6, th) -> {
                if (th == null) {
                    markAlive(key);
                    return null;
                }
                markDead(key, th);
                return null;
            });
        })).whenComplete(LogUtils.toLogger(logger, "checkAllPartitions", new Object[0]));
    }

    public Promise<Void> checkDeadPartitions() {
        return Promises.all(this.deadClients.entrySet().stream().map(entry -> {
            return ((FsClient) entry.getValue()).ping().mapEx((r5, th) -> {
                if (th != null) {
                    return null;
                }
                markAlive(entry.getKey());
                return null;
            });
        })).whenComplete(LogUtils.toLogger(logger, "checkDeadPartitions", new Object[0]));
    }

    private void markAlive(Object obj) {
        FsClient remove = this.deadClients.remove(obj);
        if (remove != null) {
            logger.info("Partition " + obj + " is alive again!");
            this.aliveClients.put(obj, remove);
        }
    }

    public boolean markDead(Object obj, @Nullable Throwable th) {
        FsClient remove = this.aliveClients.remove(obj);
        if (remove == null) {
            return false;
        }
        logger.warn("marking " + obj + " as dead (" + th + ')');
        this.deadClients.put(obj, remove);
        return true;
    }

    private void markIfDead(Object obj, Throwable th) {
        if (th.getClass() != StacklessException.class) {
            markDead(obj, th);
        }
    }

    private <T> BiFunction<T, Throwable, Promise<T>> wrapDeath(Object obj) {
        return (obj2, th) -> {
            if (th == null) {
                return Promise.of(obj2);
            }
            markIfDead(obj, th);
            return Promise.ofException(new StacklessException(RemoteFsClusterClient.class, "Node failed with exception", th));
        };
    }

    private static <T, U> Promise<T> ofFailure(String str, List<Try<U>> list) {
        StacklessException stacklessException = new StacklessException(RemoteFsClusterClient.class, str);
        Stream filter = list.stream().map((v0) -> {
            return v0.getExceptionOrNull();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        });
        stacklessException.getClass();
        filter.forEach(stacklessException::addSuppressed);
        return Promise.ofException(stacklessException);
    }

    private Promise<ChannelConsumer<ByteBuf>> upload(String str, long j, @Nullable Long l) {
        Preconditions.checkNotNull(str, "fileName");
        List<Object> selectFrom = this.serverSelector.selectFrom(str, this.aliveClients.keySet(), this.replicationCount);
        Preconditions.checkState(!selectFrom.isEmpty(), "Selected no servers to upload file " + str);
        Preconditions.checkState(this.aliveClients.keySet().containsAll(selectFrom), "Selected an id that is not one of client ids");
        return Promises.toList(selectFrom.stream().map(obj -> {
            FsClient fsClient = this.aliveClients.get(obj);
            return (l == null ? fsClient.upload(str, j) : fsClient.upload(str, j, l.longValue())).thenEx(wrapDeath(obj)).map(channelConsumer -> {
                return new C1ConsumerWithId(obj, channelConsumer.withAcknowledgement(promise -> {
                    return promise.whenException(th -> {
                        markIfDead(obj, th);
                    });
                }));
            }).toTry();
        })).then(list -> {
            List list = (List) list.stream().filter((v0) -> {
                return v0.isSuccess();
            }).map((v0) -> {
                return v0.get();
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                return ofFailure("Couldn't connect to any partition to upload file " + str, list);
            }
            ChannelSplitter lenient = ChannelSplitter.create().lenient();
            Promise list2 = Promises.toList(list.stream().map(c1ConsumerWithId -> {
                return ChannelConsumer.getAcknowledgement(function -> {
                    lenient.addOutput().set(c1ConsumerWithId.consumer.withAcknowledgement(function));
                }).toTry();
            }));
            if (logger.isTraceEnabled()) {
                logger.trace("uploading file {} to {}, {}", new Object[]{str, list.stream().map(c1ConsumerWithId2 -> {
                    return c1ConsumerWithId2.id.toString();
                }).collect(Collectors.joining(", ", "[", "]")), this});
            }
            return Promise.of(lenient.getInput().getConsumer().withAcknowledgement(promise -> {
                return promise.then(r3 -> {
                    return list2;
                }).then(list3 -> {
                    long count = list3.stream().filter((v0) -> {
                        return v0.isSuccess();
                    }).count();
                    return list3.size() < this.replicationCount ? ofFailure("Didn't connect to enough partitions uploading " + str + ", only " + count + " finished uploads", list3) : count < ((long) this.replicationCount) ? ofFailure("Couldn't finish uploadind file " + str + ", only " + count + " acknowlegdes received", list3) : Promise.complete();
                }).whenComplete(this.uploadFinishPromise.recordStats());
            }));
        }).whenComplete(this.uploadStartPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<ChannelConsumer<ByteBuf>> upload(String str, long j) {
        return upload(str, j, (Long) null);
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<ChannelConsumer<ByteBuf>> upload(String str, long j, long j2) {
        return upload(str, j, Long.valueOf(j2));
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<ChannelSupplier<ByteBuf>> download(String str, long j, long j2) {
        Preconditions.checkNotNull(str, "fileName");
        return this.deadClients.size() >= this.replicationCount ? ofFailure("There are more dead partitions than replication count(" + this.deadClients.size() + " dead, replication count is " + this.replicationCount + "), aborting", Collections.emptyList()) : Promises.toList(this.aliveClients.entrySet().stream().map(entry -> {
            Object key = entry.getKey();
            return ((FsClient) entry.getValue()).getMetadata(str).map(fileMetadata -> {
                if (fileMetadata != null) {
                    return new Tuple2(key, fileMetadata);
                }
                return null;
            }).thenEx(wrapDeath(key)).toTry();
        })).then(list -> {
            List list = (List) list.stream().filter((v0) -> {
                return v0.isSuccess();
            }).map((v0) -> {
                return v0.get();
            }).collect(Collectors.toList());
            if (this.deadClients.size() >= this.replicationCount) {
                return ofFailure("There are more dead partitions than replication count(" + this.deadClients.size() + " dead, replication count is " + this.replicationCount + "), aborting", list);
            }
            List list2 = (List) list.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
            Optional max = list2.stream().max(Comparator.comparing((v0) -> {
                return v0.getValue2();
            }, FileMetadata.COMPARATOR));
            if (!max.isPresent()) {
                return ofFailure("File not found: " + str, list);
            }
            Tuple2 tuple2 = (Tuple2) max.get();
            return Promises.any(list2.stream().filter(tuple22 -> {
                return ((FileMetadata) tuple22.getValue2()).getRevision() == ((FileMetadata) tuple2.getValue2()).getRevision();
            }).map(tuple23 -> {
                FsClient fsClient = this.aliveClients.get(tuple23.getValue1());
                if (fsClient == null) {
                    return Promise.ofException(new StacklessException(RemoteFsClusterClient.class, "Client " + tuple23.getValue1() + " is not alive"));
                }
                logger.trace("downloading file {} from {}", str, tuple23.getValue1());
                return fsClient.download(str, j, j2).whenException(th -> {
                    logger.warn("Failed to connect to server with key " + tuple23.getValue1() + " to download file " + str, th);
                }).thenEx(wrapDeath(tuple23.getValue1())).map(channelSupplier -> {
                    return channelSupplier.withEndOfStream(promise -> {
                        return promise.whenException(th2 -> {
                            markIfDead(tuple23.getValue1(), th2);
                        }).whenComplete(this.downloadFinishPromise.recordStats());
                    });
                });
            }), (v0) -> {
                v0.cancel();
            });
        }).whenComplete(this.downloadStartPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> move(String str, String str2, long j, long j2) {
        Preconditions.checkNotNull(str, "name");
        Preconditions.checkNotNull(str2, "target");
        return this.deadClients.size() >= this.replicationCount ? ofFailure("There are more dead partitions than replication count(" + this.deadClients.size() + " dead, replication count is " + this.replicationCount + "), aborting", Collections.emptyList()) : Promises.all(this.aliveClients.entrySet().stream().map(entry -> {
            return ((FsClient) entry.getValue()).move(str, str2, j, j2).thenEx(wrapDeath(entry.getKey()));
        })).whenComplete(this.movePromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> copy(String str, String str2, long j) {
        Preconditions.checkNotNull(str, "name");
        Preconditions.checkNotNull(str2, "target");
        return this.deadClients.size() >= this.replicationCount ? ofFailure("There are more dead partitions than replication count(" + this.deadClients.size() + " dead, replication count is " + this.replicationCount + "), aborting", Collections.emptyList()) : Promises.all(this.aliveClients.entrySet().stream().map(entry -> {
            return ((FsClient) entry.getValue()).copy(str, str2, j).thenEx(wrapDeath(entry.getKey()));
        })).whenComplete(this.copyPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> delete(String str, long j) {
        Preconditions.checkNotNull(str, "name");
        return Promises.toList(this.aliveClients.entrySet().stream().map(entry -> {
            return ((FsClient) entry.getValue()).delete(str).thenEx(wrapDeath(entry.getKey())).toTry();
        })).then(list -> {
            return list.stream().anyMatch((v0) -> {
                return v0.isSuccess();
            }) ? Promise.complete() : ofFailure("Couldn't delete on any partition", list);
        }).whenComplete(this.deletePromise.recordStats());
    }

    private Promise<List<FileMetadata>> doList(String str, BiFunction<FsClient, String, Promise<List<FileMetadata>>> biFunction) {
        Preconditions.checkNotNull(str, "glob");
        return this.deadClients.size() >= this.replicationCount ? ofFailure("There are more dead partitions than replication count(" + this.deadClients.size() + " dead, replication count is " + this.replicationCount + "), aborting", Collections.emptyList()) : Promises.toList(this.aliveClients.entrySet().stream().map(entry -> {
            return ((Promise) biFunction.apply(entry.getValue(), str)).thenEx(wrapDeath(entry.getKey())).toTry();
        })).then(list -> {
            return this.deadClients.size() >= this.replicationCount ? ofFailure("There are more dead partitions than replication count(" + this.deadClients.size() + " dead, replication count is " + this.replicationCount + "), aborting", list) : Promise.of(FileMetadata.flatten(list.stream().filter((v0) -> {
                return v0.isSuccess();
            }).map((v0) -> {
                return v0.get();
            })));
        }).whenComplete(this.listPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<List<FileMetadata>> listEntities(String str) {
        return doList(str, (v0, v1) -> {
            return v0.listEntities(v1);
        });
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<List<FileMetadata>> list(String str) {
        return doList(str, (v0, v1) -> {
            return v0.list(v1);
        });
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> ping() {
        return checkAllPartitions();
    }

    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    public String toString() {
        return "RemoteFsClusterClient{clients=" + this.clients + ", dead=" + this.deadClients.keySet() + '}';
    }

    @JmxAttribute
    public int getReplicationCount() {
        return this.replicationCount;
    }

    @JmxAttribute
    public void setReplicationCount(int i) {
        withReplicationCount(i);
    }

    @JmxAttribute
    public int getAlivePartitionCount() {
        return this.aliveClients.size();
    }

    @JmxAttribute
    public int getDeadPartitionCount() {
        return this.deadClients.size();
    }

    @JmxAttribute
    public String[] getAlivePartitions() {
        return (String[]) this.aliveClients.keySet().stream().map((v0) -> {
            return v0.toString();
        }).toArray(i -> {
            return new String[i];
        });
    }

    @JmxAttribute
    public String[] getDeadPartitions() {
        return (String[]) this.deadClients.keySet().stream().map((v0) -> {
            return v0.toString();
        }).toArray(i -> {
            return new String[i];
        });
    }

    @JmxAttribute
    public PromiseStats getConnectPromise() {
        return this.connectPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadStartPromise() {
        return this.uploadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadFinishPromise() {
        return this.uploadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadStartPromise() {
        return this.downloadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadFinishPromise() {
        return this.downloadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getMovePromise() {
        return this.movePromise;
    }

    @JmxAttribute
    public PromiseStats getCopyPromise() {
        return this.copyPromise;
    }

    @JmxAttribute
    public PromiseStats getListPromise() {
        return this.listPromise;
    }

    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return this.deletePromise;
    }
}
