package io.datakernel.crdt;

import io.datakernel.common.exception.StacklessException;
import io.datakernel.crdt.CrdtMessaging;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.csp.net.MessagingWithBinaryStreaming;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.net.AbstractServer;
import io.datakernel.net.AsyncTcpSocket;
import io.datakernel.promise.Promise;
import io.datakernel.serializer.BinarySerializer;
import java.lang.Comparable;
import java.net.InetAddress;

/* loaded from: input_file:io/datakernel/crdt/CrdtServer.class */
/**
 * Server that exposes a {@link CrdtStorage} over a socket using the message and response
 * codecs declared in {@link CrdtMessaging}.
 *
 * <p>For every accepted connection exactly one request message is read and dispatched:
 * <ul>
 *   <li>{@code UPLOAD} - the incoming binary stream is deserialized with the data serializer
 *       and streamed into {@link CrdtStorage#upload()};</li>
 *   <li>{@code REMOVE} - the incoming binary stream is deserialized with the key serializer
 *       and streamed into {@link CrdtStorage#remove()};</li>
 *   <li>{@code Download} - the stream obtained from the storage for the requested token is
 *       serialized and written back as a binary stream.</li>
 * </ul>
 * Any failure is logged and reported to the peer as a {@code ServerError} response.
 *
 * @param <K> key type
 * @param <S> state type
 */
public final class CrdtServer<K extends Comparable<K>, S> extends AbstractServer<CrdtServer<K, S>> {
    /** Storage that all requests are delegated to. */
    private final CrdtStorage<K, S> client;
    /** Serializer used for the upload and download data streams. */
    private final CrdtDataSerializer<K, S> serializer;
    /** Serializer used for the stream of keys of a remove request; taken from {@link #serializer}. */
    private final BinarySerializer<K> keySerializer;

    private CrdtServer(Eventloop eventloop, CrdtStorage<K, S> crdtStorage, CrdtDataSerializer<K, S> crdtDataSerializer) {
        super(eventloop);
        this.client = crdtStorage;
        this.serializer = crdtDataSerializer;
        this.keySerializer = crdtDataSerializer.getKeySerializer();
    }

    /**
     * Creates a server that serves the given storage.
     *
     * @param eventloop          eventloop passed to the {@link AbstractServer} constructor
     * @param crdtStorage        storage that requests are delegated to
     * @param crdtDataSerializer serializer for the data streams; its key serializer is used for removals
     * @return a new server instance
     */
    public static <K extends Comparable<K>, S> CrdtServer<K, S> create(Eventloop eventloop, CrdtStorage<K, S> crdtStorage, CrdtDataSerializer<K, S> crdtDataSerializer) {
        return new CrdtServer<>(eventloop, crdtStorage, crdtDataSerializer);
    }

    /**
     * Creates a server that serves the given storage, building the {@link CrdtDataSerializer}
     * from the two supplied serializers.
     *
     * @param eventloop         eventloop passed to the {@link AbstractServer} constructor
     * @param crdtStorage       storage that requests are delegated to
     * @param binarySerializer  serializer for keys
     * @param binarySerializer2 serializer for states
     * @return a new server instance
     */
    public static <K extends Comparable<K>, S> CrdtServer<K, S> create(Eventloop eventloop, CrdtStorage<K, S> crdtStorage, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2) {
        return new CrdtServer<>(eventloop, crdtStorage, new CrdtDataSerializer<>(binarySerializer, binarySerializer2));
    }

    /**
     * Handles one connection: reads a single request message, dispatches it to the storage and
     * answers with the matching response or, on failure, with a {@code ServerError}.
     *
     * @param asyncTcpSocket socket of the accepted connection
     * @param inetAddress    address of the remote peer (not used by this implementation)
     */
    protected void serve(AsyncTcpSocket asyncTcpSocket, InetAddress inetAddress) {
        // NOTE(review): the messaging object is kept raw because the message/response type
        // arguments are not visible from this file; the raw casts below exist only for that
        // reason. Parameterize it once those types are confirmed.
        MessagingWithBinaryStreaming messaging = MessagingWithBinaryStreaming.create(asyncTcpSocket, ByteBufSerializer.ofJsonCodec(CrdtMessaging.MESSAGE_CODEC, CrdtMessaging.RESPONSE_CODEC));
        messaging.receive().then(message -> {
            if (message == null) {
                return Promise.ofException(new StacklessException(CrdtServer.class, "Unexpected end of stream"));
            }
            if (message == CrdtMessaging.CrdtMessages.UPLOAD) {
                return ((StreamSupplier) messaging.receiveBinaryStream().transformWith(ChannelDeserializer.create(this.serializer)))
                        .streamTo(StreamConsumer.ofPromise(this.client.upload()))
                        .then(ignored -> messaging.send(CrdtMessaging.CrdtResponses.UPLOAD_FINISHED))
                        .then(ignored -> messaging.sendEndOfStream())
                        .whenResult(ignored -> messaging.close());
            }
            if (message == CrdtMessaging.CrdtMessages.REMOVE) {
                return ((StreamSupplier) messaging.receiveBinaryStream().transformWith(ChannelDeserializer.create(this.keySerializer)))
                        .streamTo(StreamConsumer.ofPromise(this.client.remove()))
                        .then(ignored -> messaging.send(CrdtMessaging.CrdtResponses.REMOVE_FINISHED))
                        .then(ignored -> messaging.sendEndOfStream())
                        .whenResult(ignored -> messaging.close());
            }
            if (message instanceof CrdtMessaging.Download) {
                // The DownloadStarted response is chained in front of the binary stream so that
                // a failed send is reported through the error handler instead of being dropped.
                return this.client.download(((CrdtMessaging.Download) message).getToken())
                        .then(supplier -> messaging.send(new CrdtMessaging.DownloadStarted())
                                .then(ignored -> ((ChannelSupplier) supplier.transformWith(ChannelSerializer.create(this.serializer)))
                                        .streamTo(messaging.sendBinaryStream())));
            }
            return Promise.ofException(new StacklessException(CrdtServer.class, "Message type was added, but no handling code for it"));
        }).whenComplete((result, error) -> {
            if (error == null) {
                return;
            }
            this.logger.warn("got an error while handling message (" + error + ") : " + this);
            // Plain StacklessExceptions are reported by message only; any other exception type
            // gets its simple class name as a prefix.
            String prefix = error.getClass() != StacklessException.class ? error.getClass().getSimpleName() + ": " : "";
            messaging.send(new CrdtMessaging.ServerError(prefix + error.getMessage()))
                    .then(ignored -> messaging.sendEndOfStream())
                    // Close even when the error response itself could not be delivered,
                    // otherwise the messaging would never be closed on this path.
                    .whenComplete((ignored, sendError) -> messaging.close());
        });
    }
}
