package io.datakernel.remotefs;

import io.datakernel.async.util.LogUtils;
import io.datakernel.common.exception.StacklessException;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.RecyclingChannelConsumer;
import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.csp.net.Messaging;
import io.datakernel.csp.net.MessagingWithBinaryStreaming;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.net.AbstractServer;
import io.datakernel.net.AsyncTcpSocket;
import io.datakernel.promise.Promise;
import io.datakernel.promise.jmx.PromiseStats;
import io.datakernel.remotefs.RemoteFsCommands;
import io.datakernel.remotefs.RemoteFsResponses;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;

/* loaded from: input_file:io/datakernel/remotefs/RemoteFsServer.class */
public final class RemoteFsServer extends AbstractServer<RemoteFsServer> {
    private static final ByteBufSerializer<RemoteFsCommands.FsCommand, RemoteFsResponses.FsResponse> SERIALIZER = ByteBufSerializer.ofJsonCodec(RemoteFsCommands.CODEC, RemoteFsResponses.CODEC);
    public static final StacklessException NO_HANDLER_FOR_MESSAGE = new StacklessException(RemoteFsServer.class, "No handler for received message type");
    private final Map<Class<?>, MessagingHandler<RemoteFsCommands.FsCommand>> handlers;
    private final FsClient client;
    private final PromiseStats handleRequestPromise;
    private final PromiseStats uploadPromise;
    private final PromiseStats downloadPromise;
    private final PromiseStats movePromise;
    private final PromiseStats copyPromise;
    private final PromiseStats listPromise;
    private final PromiseStats deletePromise;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:io/datakernel/remotefs/RemoteFsServer$MessagingHandler.class */
    public interface MessagingHandler<T extends RemoteFsCommands.FsCommand> {
        Promise<Void> onMessage(Messaging<RemoteFsCommands.FsCommand, RemoteFsResponses.FsResponse> messaging, T t);
    }

    private RemoteFsServer(Eventloop eventloop, FsClient fsClient) {
        super(eventloop);
        this.handlers = new HashMap();
        this.handleRequestPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.uploadPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.downloadPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.movePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.copyPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.listPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.deletePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.client = fsClient;
        addHandlers();
    }

    public static RemoteFsServer create(Eventloop eventloop, Executor executor, Path path) {
        return new RemoteFsServer(eventloop, LocalFsClient.create(eventloop, executor, path));
    }

    public static RemoteFsServer create(Eventloop eventloop, FsClient fsClient) {
        return new RemoteFsServer(eventloop, fsClient);
    }

    public FsClient getClient() {
        return this.client;
    }

    protected void serve(AsyncTcpSocket asyncTcpSocket, InetAddress inetAddress) {
        MessagingWithBinaryStreaming create = MessagingWithBinaryStreaming.create(asyncTcpSocket, SERIALIZER);
        create.receive().then(fsCommand -> {
            if (fsCommand == null) {
                this.logger.warn("unexpected end of stream: {}", this);
                create.close();
                return Promise.complete();
            }
            MessagingHandler<RemoteFsCommands.FsCommand> messagingHandler = this.handlers.get(fsCommand.getClass());
            if (messagingHandler != null) {
                return messagingHandler.onMessage(create, fsCommand);
            }
            this.logger.warn("received a message with no associated handler, type: " + fsCommand.getClass());
            return Promise.ofException(NO_HANDLER_FOR_MESSAGE);
        }).whenComplete(this.handleRequestPromise.recordStats()).whenException(th -> {
            this.logger.warn("got an error while handling message (" + th + ") : " + this);
            create.send(new RemoteFsResponses.ServerError(RemoteFsUtils.getErrorCode(th))).then(r3 -> {
                return create.sendEndOfStream();
            }).whenResult(r32 -> {
                create.close();
            });
        });
    }

    private void addHandlers() {
        onMessage(RemoteFsCommands.Upload.class, (messaging, upload) -> {
            return this.client.upload(upload.getName(), upload.getOffset(), upload.getRevision()).then(channelConsumer -> {
                return channelConsumer instanceof RecyclingChannelConsumer ? messaging.send(new RemoteFsResponses.UploadAck(false)) : messaging.send(new RemoteFsResponses.UploadAck(true)).then(r5 -> {
                    return messaging.receiveBinaryStream().streamTo(channelConsumer);
                });
            }).then(r5 -> {
                return messaging.send(new RemoteFsResponses.UploadFinished());
            }).then(r3 -> {
                return messaging.sendEndOfStream();
            }).whenResult(r32 -> {
                messaging.close();
            }).whenComplete(this.uploadPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, "receiving data", new Object[]{upload, this})).toVoid();
        });
        onMessage(RemoteFsCommands.Download.class, (messaging2, download) -> {
            String name = download.getName();
            return this.client.getMetadata(name).then(fileMetadata -> {
                if (fileMetadata == null) {
                    return Promise.ofException(FsClient.FILE_NOT_FOUND);
                }
                long size = fileMetadata.getSize();
                long offset = download.getOffset();
                long length = download.getLength();
                RemoteFsUtils.checkRange(size, offset, length);
                long j = length == -1 ? size - offset : length;
                return messaging2.send(new RemoteFsResponses.DownloadSize(j)).then(r14 -> {
                    return ChannelSupplier.ofPromise(this.client.download(name, offset, j)).streamTo(messaging2.sendBinaryStream());
                }).whenComplete(LogUtils.toLogger(this.logger, "sending data", new Object[]{fileMetadata, Long.valueOf(offset), Long.valueOf(j), this}));
            }).whenComplete(this.downloadPromise.recordStats());
        });
        onMessage(RemoteFsCommands.Move.class, simpleHandler(move -> {
            return this.client.move(move.getName(), move.getTarget(), move.getTargetRevision(), move.getRemoveRevision());
        }, r3 -> {
            return new RemoteFsResponses.MoveFinished();
        }, this.movePromise));
        onMessage(RemoteFsCommands.Copy.class, simpleHandler(copy -> {
            return this.client.copy(copy.getName(), copy.getTarget(), copy.getRevision());
        }, r32 -> {
            return new RemoteFsResponses.CopyFinished();
        }, this.copyPromise));
        onMessage(RemoteFsCommands.Delete.class, simpleHandler(delete -> {
            return this.client.delete(delete.getName(), delete.getRevision());
        }, r33 -> {
            return new RemoteFsResponses.DeleteFinished();
        }, this.deletePromise));
        onMessage(RemoteFsCommands.List.class, simpleHandler(list -> {
            return list.needTombstones() ? this.client.listEntities(list.getGlob()) : this.client.list(list.getGlob());
        }, RemoteFsResponses.ListFinished::new, this.listPromise));
    }

    private <T extends RemoteFsCommands.FsCommand, R> MessagingHandler<T> simpleHandler(Function<T, Promise<R>> function, Function<R, RemoteFsResponses.FsResponse> function2, PromiseStats promiseStats) {
        return (messaging, fsCommand) -> {
            return ((Promise) function.apply(fsCommand)).then(obj -> {
                return messaging.send((RemoteFsResponses.FsResponse) function2.apply(obj));
            }).then(r3 -> {
                return messaging.sendEndOfStream();
            }).whenComplete(promiseStats.recordStats());
        };
    }

    private <T extends RemoteFsCommands.FsCommand> void onMessage(Class<T> cls, MessagingHandler<T> messagingHandler) {
        this.handlers.put(cls, messagingHandler);
    }

    public String toString() {
        return "RemoteFsServer(" + this.client + ')';
    }

    @JmxAttribute
    public PromiseStats getUploadPromise() {
        return this.uploadPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadPromise() {
        return this.downloadPromise;
    }

    @JmxAttribute
    public PromiseStats getMovePromise() {
        return this.movePromise;
    }

    @JmxAttribute
    public PromiseStats getListPromise() {
        return this.listPromise;
    }

    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return this.deletePromise;
    }

    @JmxAttribute
    public PromiseStats getHandleRequestPromise() {
        return this.handleRequestPromise;
    }
}
