package io.datakernel.remotefs;

import io.datakernel.async.Promise;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelConsumers;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.csp.net.MessagingWithBinaryStreaming;
import io.datakernel.eventloop.AsyncTcpSocketImpl;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.EventloopService;
import io.datakernel.exception.StacklessException;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.PromiseStats;
import io.datakernel.net.SocketSettings;
import io.datakernel.remotefs.RemoteFsCommands;
import io.datakernel.remotefs.RemoteFsResponses;
import io.datakernel.util.LogUtils;
import io.datakernel.util.Preconditions;
import io.datakernel.util.ref.RefLong;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/remotefs/RemoteFsClient.class */
public final class RemoteFsClient implements FsClient, EventloopService {
    private static final Logger logger = LoggerFactory.getLogger(RemoteFsClient.class);
    public static final StacklessException INVALID_MESSAGE = new StacklessException(RemoteFsClient.class, "Invalid or unexpected message received");
    public static final StacklessException TOO_MUCH_DATA = new StacklessException(RemoteFsClient.class, "Received more bytes than expected");
    public static final StacklessException UNEXPECTED_END_OF_STREAM = new StacklessException(RemoteFsClient.class, "Unexpected end of stream");
    public static final StacklessException UNKNOWN_SERVER_ERROR = new StacklessException(RemoteFsClient.class, "Unknown server error occured");
    private static final ByteBufSerializer<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand> SERIALIZER = ByteBufSerializer.ofJsonCodec(RemoteFsResponses.CODEC, RemoteFsCommands.CODEC);
    private final Eventloop eventloop;
    private final InetSocketAddress address;
    private SocketSettings socketSettings = SocketSettings.create();
    private final PromiseStats connectPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats movePromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats copyPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats listPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats deletePromise = PromiseStats.create(Duration.ofMinutes(5));

    private RemoteFsClient(Eventloop eventloop, InetSocketAddress inetSocketAddress) {
        this.eventloop = eventloop;
        this.address = inetSocketAddress;
    }

    public static RemoteFsClient create(Eventloop eventloop, InetSocketAddress inetSocketAddress) {
        return new RemoteFsClient(eventloop, inetSocketAddress);
    }

    public RemoteFsClient withSocketSettings(SocketSettings socketSettings) {
        this.socketSettings = socketSettings;
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<ChannelConsumer<ByteBuf>> upload(String str, long j, long j2) {
        Preconditions.checkNotNull(str, "fileName");
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(new RemoteFsCommands.Upload(str, j, j2)).then(r3 -> {
                return messagingWithBinaryStreaming.receive();
            }).then(fsResponse -> {
                return !(fsResponse instanceof RemoteFsResponses.UploadAck) ? handleInvalidResponse(fsResponse) : !((RemoteFsResponses.UploadAck) fsResponse).isOk() ? Promise.of(ChannelConsumers.recycling()) : Promise.of(messagingWithBinaryStreaming.sendBinaryStream().withAcknowledgement(promise -> {
                    return promise.then(r32 -> {
                        return messagingWithBinaryStreaming.receive();
                    }).then(fsResponse -> {
                        messagingWithBinaryStreaming.close();
                        return fsResponse instanceof RemoteFsResponses.UploadFinished ? Promise.complete() : handleInvalidResponse(fsResponse);
                    }).whenException(th -> {
                        messagingWithBinaryStreaming.close(th);
                        logger.warn("Cancelled while trying to upload file " + str + " (" + th + "): " + this);
                    }).whenComplete(this.uploadFinishPromise.recordStats());
                }));
            }).whenException(th -> {
                messagingWithBinaryStreaming.close(th);
                logger.warn("Error while trying to upload file " + str + " (" + th + "): " + this);
            });
        }).whenComplete(LogUtils.toLogger(logger, "upload", new Object[]{str, this})).whenComplete(this.uploadStartPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<ChannelSupplier<ByteBuf>> download(String str, long j, long j2) {
        Preconditions.checkNotNull(str, "fileName");
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(new RemoteFsCommands.Download(str, j, j2)).then(r3 -> {
                return messagingWithBinaryStreaming.receive();
            }).then(fsResponse -> {
                if (!(fsResponse instanceof RemoteFsResponses.DownloadSize)) {
                    return handleInvalidResponse(fsResponse);
                }
                long size = ((RemoteFsResponses.DownloadSize) fsResponse).getSize();
                logger.trace("download size for file {} is {}: {}", new Object[]{str, Long.valueOf(size), this});
                RefLong refLong = new RefLong(0L);
                return Promise.of(messagingWithBinaryStreaming.receiveBinaryStream().peek(byteBuf -> {
                    refLong.inc(byteBuf.readRemaining());
                }).withEndOfStream(promise -> {
                    return promise.then(r32 -> {
                        return messagingWithBinaryStreaming.sendEndOfStream();
                    }).then(r13 -> {
                        if (refLong.get() == size) {
                            return Promise.of(r13);
                        }
                        logger.error("invalid stream size for file " + str + " (offset " + j + ", length " + j2 + "), expected: " + size + " actual: " + refLong.get());
                        return Promise.ofException(refLong.get() < size ? UNEXPECTED_END_OF_STREAM : TOO_MUCH_DATA);
                    }).whenComplete(this.downloadFinishPromise.recordStats()).whenResult(r33 -> {
                        messagingWithBinaryStreaming.close();
                    });
                }));
            }).whenException(th -> {
                messagingWithBinaryStreaming.close(th);
                logger.warn("error trying to download file " + str + " (offset=" + j + ", length=" + j2 + ") (" + th + "): " + this);
            });
        }).whenComplete(LogUtils.toLogger(logger, "download", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this})).whenComplete(this.downloadStartPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> move(String str, String str2, long j, long j2) {
        return simpleCommand(new RemoteFsCommands.Move(str, str2, j, j2), RemoteFsResponses.MoveFinished.class, moveFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "move", new Object[]{str, str2, Long.valueOf(j), Long.valueOf(j2), this})).whenComplete(this.movePromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> copy(String str, String str2, long j) {
        return simpleCommand(new RemoteFsCommands.Copy(str, str2, j), RemoteFsResponses.CopyFinished.class, copyFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "copy", new Object[]{str, str2, Long.valueOf(j), this})).whenComplete(this.copyPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<Void> delete(String str, long j) {
        return simpleCommand(new RemoteFsCommands.Delete(str, j), RemoteFsResponses.DeleteFinished.class, deleteFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "delete", new Object[]{str, Long.valueOf(j), this})).whenComplete(this.deletePromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<List<FileMetadata>> listEntities(String str) {
        Preconditions.checkNotNull(str, "glob");
        return simpleCommand(new RemoteFsCommands.List(str, true), RemoteFsResponses.ListFinished.class, (v0) -> {
            return v0.getFiles();
        }).whenComplete(LogUtils.toLogger(logger, "listEntities", new Object[]{str, this})).whenComplete(this.listPromise.recordStats());
    }

    @Override // io.datakernel.remotefs.FsClient
    public Promise<List<FileMetadata>> list(String str) {
        Preconditions.checkNotNull(str, "glob");
        return simpleCommand(new RemoteFsCommands.List(str, false), RemoteFsResponses.ListFinished.class, (v0) -> {
            return v0.getFiles();
        }).whenComplete(LogUtils.toLogger(logger, "list", new Object[]{str, this})).whenComplete(this.listPromise.recordStats());
    }

    private Promise<MessagingWithBinaryStreaming<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand>> connect(InetSocketAddress inetSocketAddress) {
        return AsyncTcpSocketImpl.connect(inetSocketAddress, 0L, this.socketSettings).map(asyncTcpSocketImpl -> {
            return MessagingWithBinaryStreaming.create(asyncTcpSocketImpl, SERIALIZER);
        }).whenResult(messagingWithBinaryStreaming -> {
            logger.trace("connected to [{}]: {}", inetSocketAddress, this);
        }).whenException(th -> {
            logger.warn("failed connecting to [" + inetSocketAddress + "] (" + th + "): " + this);
        }).whenComplete(this.connectPromise.recordStats());
    }

    private <T> Promise<T> handleInvalidResponse(@Nullable RemoteFsResponses.FsResponse fsResponse) {
        if (fsResponse == null) {
            logger.warn(this + ": Received unexpected end of stream");
            return Promise.ofException(UNEXPECTED_END_OF_STREAM);
        }
        if (!(fsResponse instanceof RemoteFsResponses.ServerError)) {
            return Promise.ofException(INVALID_MESSAGE);
        }
        int code = ((RemoteFsResponses.ServerError) fsResponse).getCode();
        return Promise.ofException((code < 1 || code > RemoteFsUtils.KNOWN_ERRORS.length) ? UNKNOWN_SERVER_ERROR : RemoteFsUtils.KNOWN_ERRORS[code - 1]);
    }

    private <T, R extends RemoteFsResponses.FsResponse> Promise<T> simpleCommand(RemoteFsCommands.FsCommand fsCommand, Class<R> cls, Function<R, T> function) {
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(fsCommand).then(r3 -> {
                return messagingWithBinaryStreaming.receive();
            }).then(fsResponse -> {
                messagingWithBinaryStreaming.close();
                return (fsResponse == null || fsResponse.getClass() != cls) ? handleInvalidResponse(fsResponse) : Promise.of(function.apply(cls.cast(fsResponse)));
            }).whenException(th -> {
                messagingWithBinaryStreaming.close(th);
                logger.warn("Error while processing command " + fsCommand + " (" + th + ") : " + this);
            });
        });
    }

    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    public String toString() {
        return "RemoteFsClient{address=" + this.address + '}';
    }

    @JmxAttribute
    public PromiseStats getConnectPromise() {
        return this.connectPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadStartPromise() {
        return this.uploadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadFinishPromise() {
        return this.uploadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadStartPromise() {
        return this.downloadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadFinishPromise() {
        return this.downloadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getMovePromise() {
        return this.movePromise;
    }

    @JmxAttribute
    public PromiseStats getCopyPromise() {
        return this.copyPromise;
    }

    @JmxAttribute
    public PromiseStats getListPromise() {
        return this.listPromise;
    }

    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return this.deletePromise;
    }
}
