package io.activej.crdt;

import io.activej.async.util.LogUtils;
import io.activej.crdt.messaging.CrdtRequest;
import io.activej.crdt.messaging.CrdtResponse;
import io.activej.crdt.messaging.Version;
import io.activej.crdt.storage.ICrdtStorage;
import io.activej.crdt.util.CrdtDataBinarySerializer;
import io.activej.crdt.util.Utils;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.binary.codec.ByteBufsCodecs;
import io.activej.csp.net.IMessaging;
import io.activej.csp.net.Messaging;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumers;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.net.AbstractReactiveServer;
import io.activej.net.socket.tcp.ITcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.nio.NioReactor;
import io.activej.serializer.BinarySerializer;
import java.lang.Comparable;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Function;

/* loaded from: input_file:io/activej/crdt/CrdtServer.class */
public final class CrdtServer<K extends Comparable<K>, S> extends AbstractReactiveServer {
    public static final Version VERSION = new Version(1, 0);
    private static final ByteBufsCodec<CrdtRequest, CrdtResponse> SERIALIZER = ByteBufsCodecs.ofStreamCodecs(Utils.CRDT_REQUEST_CODEC, Utils.CRDT_RESPONSE_CODEC);
    private Function<CrdtRequest.Handshake, CrdtResponse.Handshake> handshakeHandler;
    private final ICrdtStorage<K, S> storage;
    private final CrdtDataBinarySerializer<K, S> serializer;
    private final BinarySerializer<CrdtTombstone<K>> tombstoneSerializer;
    private boolean detailedStats;
    private final BasicStreamStats<CrdtData<K, S>> uploadStats;
    private final DetailedStreamStats<CrdtData<K, S>> uploadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> downloadStats;
    private final DetailedStreamStats<CrdtData<K, S>> downloadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> takeStats;
    private final DetailedStreamStats<CrdtData<K, S>> takeStatsDetailed;
    private final BasicStreamStats<CrdtTombstone<K>> removeStats;
    private final DetailedStreamStats<CrdtTombstone<K>> removeStatsDetailed;
    private final PromiseStats handshakePromise;
    private final PromiseStats downloadBeginPromise;
    private final PromiseStats downloadFinishedPromise;
    private final PromiseStats uploadBeginPromise;
    private final PromiseStats uploadFinishedPromise;
    private final PromiseStats removeBeginPromise;
    private final PromiseStats removeFinishedPromise;
    private final PromiseStats takeBeginPromise;
    private final PromiseStats takeFinishedPromise;
    private final PromiseStats pingPromise;

    /* loaded from: input_file:io/activej/crdt/CrdtServer$Builder.class */
    public final class Builder extends AbstractReactiveServer.Builder<CrdtServer<K, S>.Builder, CrdtServer<K, S>> {
        private Builder() {
            super(CrdtServer.this);
        }

        public CrdtServer<K, S>.Builder withHandshakeHandler(Function<CrdtRequest.Handshake, CrdtResponse.Handshake> function) {
            checkNotBuilt(this);
            CrdtServer.this.handshakeHandler = function;
            return this;
        }
    }

    private CrdtServer(NioReactor nioReactor, ICrdtStorage<K, S> iCrdtStorage, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        super(nioReactor);
        this.handshakeHandler = handshake -> {
            return new CrdtResponse.Handshake(null);
        };
        this.uploadStats = StreamStats.basic();
        this.uploadStatsDetailed = StreamStats.detailed();
        this.downloadStats = StreamStats.basic();
        this.downloadStatsDetailed = StreamStats.detailed();
        this.takeStats = StreamStats.basic();
        this.takeStatsDetailed = StreamStats.detailed();
        this.removeStats = StreamStats.basic();
        this.removeStatsDetailed = StreamStats.detailed();
        this.handshakePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.downloadBeginPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.downloadFinishedPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.uploadBeginPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.uploadFinishedPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.removeBeginPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.removeFinishedPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.takeBeginPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.takeFinishedPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.pingPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.storage = iCrdtStorage;
        this.serializer = crdtDataBinarySerializer;
        this.tombstoneSerializer = crdtDataBinarySerializer.getTombstoneSerializer();
    }

    public static <K extends Comparable<K>, S> CrdtServer<K, S>.Builder builder(NioReactor nioReactor, ICrdtStorage<K, S> iCrdtStorage, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        return new Builder();
    }

    public static <K extends Comparable<K>, S> CrdtServer<K, S>.Builder builder(NioReactor nioReactor, ICrdtStorage<K, S> iCrdtStorage, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2) {
        return new Builder();
    }

    protected void serve(ITcpSocket iTcpSocket, InetAddress inetAddress) {
        Messaging create = Messaging.create(iTcpSocket, SERIALIZER);
        Promise then = create.receive().then(crdtRequest -> {
            return crdtRequest instanceof CrdtRequest.Handshake ? handleHandshake(create, (CrdtRequest.Handshake) crdtRequest) : Promise.ofException(new CrdtException("Handshake expected"));
        });
        Objects.requireNonNull(create);
        then.then(create::receive).then(crdtRequest2 -> {
            return dispatch(create, crdtRequest2);
        }).whenException(exc -> {
            this.logger.warn("got an error while handling message {}", this, exc);
            Promise send = create.send(new CrdtResponse.ServerError(exc.getClass().getSimpleName() + ": " + exc.getMessage()));
            Objects.requireNonNull(create);
            Promise then2 = send.then(create::sendEndOfStream);
            Objects.requireNonNull(create);
            then2.whenResult(create::close);
        });
    }

    private Promise<Void> dispatch(Messaging<CrdtRequest, CrdtResponse> messaging, CrdtRequest crdtRequest) {
        if (crdtRequest instanceof CrdtRequest.Download) {
            return handleDownload(messaging, (CrdtRequest.Download) crdtRequest);
        }
        if (crdtRequest instanceof CrdtRequest.Upload) {
            return handleUpload(messaging, (CrdtRequest.Upload) crdtRequest);
        }
        if (crdtRequest instanceof CrdtRequest.Remove) {
            return handleRemove(messaging, (CrdtRequest.Remove) crdtRequest);
        }
        if (crdtRequest instanceof CrdtRequest.Ping) {
            return handlePing(messaging, (CrdtRequest.Ping) crdtRequest);
        }
        if (crdtRequest instanceof CrdtRequest.Take) {
            return handleTake(messaging, (CrdtRequest.Take) crdtRequest);
        }
        if (crdtRequest instanceof CrdtRequest.Handshake) {
            return Promise.ofException(new CrdtException("Handshake was already performed"));
        }
        throw new AssertionError();
    }

    private Promise<Void> handleHandshake(IMessaging<CrdtRequest, CrdtResponse> iMessaging, CrdtRequest.Handshake handshake) {
        return iMessaging.send(this.handshakeHandler.apply(handshake)).whenComplete(this.handshakePromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{iMessaging, handshake, this}));
    }

    private Promise<Void> handleTake(Messaging<CrdtRequest, CrdtResponse> messaging, CrdtRequest.Take take) {
        return this.storage.take().whenComplete(this.takeBeginPromise.recordStats()).whenResult(() -> {
            messaging.send(new CrdtResponse.TakeStarted());
        }).then(streamSupplier -> {
            return ((ChannelSupplier) ((StreamSupplier) ((StreamSupplier) streamSupplier.transformWith(Utils.ackTransformer(promise -> {
                Objects.requireNonNull(messaging);
                return promise.then(messaging::receive).thenCallback((crdtRequest, settableCallback) -> {
                    if (crdtRequest instanceof CrdtRequest.TakeAck) {
                        settableCallback.set((Object) null);
                    } else {
                        settableCallback.setException(new CrdtException("Received message " + crdtRequest + " instead of " + CrdtRequest.TakeAck.class));
                    }
                });
            }))).transformWith(this.detailedStats ? this.takeStatsDetailed : this.takeStats)).transformWith(Utils.createSerializer(this.serializer))).streamTo(messaging.sendBinaryStream());
        }).whenComplete(this.takeFinishedPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{messaging, take, this}));
    }

    private Promise<Void> handlePing(Messaging<CrdtRequest, CrdtResponse> messaging, CrdtRequest.Ping ping) {
        Promise send = messaging.send(new CrdtResponse.Pong());
        Objects.requireNonNull(messaging);
        Promise then = send.then(messaging::sendEndOfStream);
        Objects.requireNonNull(messaging);
        return then.whenResult(messaging::close).whenComplete(this.pingPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{messaging, ping, this}));
    }

    private Promise<Void> handleRemove(Messaging<CrdtRequest, CrdtResponse> messaging, CrdtRequest.Remove remove) {
        Promise then = ((StreamSupplier) messaging.receiveBinaryStream().transformWith(Utils.createDeserializer(this.tombstoneSerializer))).streamTo(StreamConsumers.ofPromise(this.storage.remove().map(streamConsumer -> {
            return (StreamConsumer) streamConsumer.transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats);
        }).whenComplete(this.removeBeginPromise.recordStats()))).then(() -> {
            return messaging.send(new CrdtResponse.RemoveAck());
        });
        Objects.requireNonNull(messaging);
        Promise then2 = then.then(messaging::sendEndOfStream);
        Objects.requireNonNull(messaging);
        return then2.whenResult(messaging::close).whenComplete(this.removeFinishedPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{messaging, remove, this}));
    }

    private Promise<Void> handleUpload(Messaging<CrdtRequest, CrdtResponse> messaging, CrdtRequest.Upload upload) {
        Promise then = ((StreamSupplier) messaging.receiveBinaryStream().transformWith(Utils.createDeserializer(this.serializer))).streamTo(StreamConsumers.ofPromise(this.storage.upload().map(streamConsumer -> {
            return (StreamConsumer) streamConsumer.transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats);
        }).whenComplete(this.uploadBeginPromise.recordStats()))).then(() -> {
            return messaging.send(new CrdtResponse.UploadAck());
        });
        Objects.requireNonNull(messaging);
        Promise then2 = then.then(messaging::sendEndOfStream);
        Objects.requireNonNull(messaging);
        return then2.whenResult(messaging::close).whenComplete(this.uploadFinishedPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{messaging, upload, this}));
    }

    private Promise<Void> handleDownload(Messaging<CrdtRequest, CrdtResponse> messaging, CrdtRequest.Download download) {
        return this.storage.download(download.token()).map(streamSupplier -> {
            return (StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats);
        }).whenComplete(this.downloadBeginPromise.recordStats()).whenResult(() -> {
            messaging.send(new CrdtResponse.DownloadStarted());
        }).then(streamSupplier2 -> {
            return ((ChannelSupplier) streamSupplier2.transformWith(Utils.createSerializer(this.serializer))).streamTo(messaging.sendBinaryStream());
        }).whenComplete(this.downloadFinishedPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{messaging, download, this}));
    }

    @JmxAttribute
    public boolean isDetailedStats() {
        return this.detailedStats;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public BasicStreamStats getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public DetailedStreamStats getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public BasicStreamStats getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public DetailedStreamStats getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public BasicStreamStats getTakeStats() {
        return this.takeStats;
    }

    @JmxAttribute
    public DetailedStreamStats getTakeStatsDetailed() {
        return this.takeStatsDetailed;
    }

    @JmxAttribute
    public BasicStreamStats getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public DetailedStreamStats getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }

    @JmxAttribute
    public PromiseStats getHandshakePromise() {
        return this.handshakePromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadBeginPromise() {
        return this.downloadBeginPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadFinishedPromise() {
        return this.downloadFinishedPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadBeginPromise() {
        return this.uploadBeginPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadFinishedPromise() {
        return this.uploadFinishedPromise;
    }

    @JmxAttribute
    public PromiseStats getRemoveBeginPromise() {
        return this.removeBeginPromise;
    }

    @JmxAttribute
    public PromiseStats getRemoveFinishedPromise() {
        return this.removeFinishedPromise;
    }

    @JmxAttribute
    public PromiseStats getTakeBeginPromise() {
        return this.takeBeginPromise;
    }

    @JmxAttribute
    public PromiseStats getTakeFinishedPromise() {
        return this.takeFinishedPromise;
    }

    @JmxAttribute
    public PromiseStats getPingPromise() {
        return this.pingPromise;
    }
}
