package io.datakernel.crdt;

import io.datakernel.async.service.EventloopService;
import io.datakernel.common.exception.StacklessException;
import io.datakernel.crdt.CrdtMessaging;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.csp.net.MessagingWithBinaryStreaming;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.datastream.stats.StreamStats;
import io.datakernel.datastream.stats.StreamStatsBasic;
import io.datakernel.datastream.stats.StreamStatsDetailed;
import io.datakernel.eventloop.ConnectCallback;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.eventloop.net.SocketSettings;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.jmx.api.JmxOperation;
import io.datakernel.net.AsyncTcpSocketImpl;
import io.datakernel.promise.Promise;
import io.datakernel.serializer.BinarySerializer;
import java.lang.Comparable;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/datakernel/crdt/CrdtStorageClient.class */
public final class CrdtStorageClient<K extends Comparable<K>, S> implements CrdtStorage<K, S>, EventloopService, EventloopJmxMBeanEx {
    private final Eventloop eventloop;
    private final InetSocketAddress address;
    private final CrdtDataSerializer<K, S> serializer;
    private final BinarySerializer<K> keySerializer;
    private boolean detailedStats;
    private SocketSettings socketSettings = SocketSettings.create();
    private final StreamStatsBasic<CrdtData<K, S>> uploadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> uploadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<CrdtData<K, S>> downloadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> downloadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();

    private CrdtStorageClient(Eventloop eventloop, InetSocketAddress inetSocketAddress, CrdtDataSerializer<K, S> crdtDataSerializer) {
        this.eventloop = eventloop;
        this.address = inetSocketAddress;
        this.serializer = crdtDataSerializer;
        this.keySerializer = crdtDataSerializer.getKeySerializer();
    }

    public static <K extends Comparable<K>, S> CrdtStorageClient<K, S> create(Eventloop eventloop, InetSocketAddress inetSocketAddress, CrdtDataSerializer<K, S> crdtDataSerializer) {
        return new CrdtStorageClient<>(eventloop, inetSocketAddress, crdtDataSerializer);
    }

    public static <K extends Comparable<K>, S> CrdtStorageClient<K, S> create(Eventloop eventloop, InetSocketAddress inetSocketAddress, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2) {
        return new CrdtStorageClient<>(eventloop, inetSocketAddress, new CrdtDataSerializer(binarySerializer, binarySerializer2));
    }

    public CrdtStorageClient<K, S> withSocketSettings(SocketSettings socketSettings) {
        this.socketSettings = socketSettings;
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        return connect().then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(CrdtMessaging.CrdtMessages.UPLOAD).map(r6 -> {
                ChannelConsumer withAcknowledgement = messagingWithBinaryStreaming.sendBinaryStream().withAcknowledgement(promise -> {
                    return promise.then(r3 -> {
                        return messagingWithBinaryStreaming.receive();
                    }).then(simpleHandler(CrdtMessaging.CrdtResponses.UPLOAD_FINISHED));
                });
                return StreamConsumer.ofSupplier(streamSupplier -> {
                    return ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.uploadStats : this.uploadStatsDetailed)).transformWith(ChannelSerializer.create(this.serializer))).streamTo(withAcknowledgement);
                }).withLateBinding();
            });
        });
    }

    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        return connect().then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(new CrdtMessaging.Download(j)).then(r3 -> {
                return messagingWithBinaryStreaming.receive();
            }).then(crdtResponse -> {
                return crdtResponse == null ? Promise.ofException(new IllegalStateException("Unexpected end of stream")) : crdtResponse.getClass() == CrdtMessaging.DownloadStarted.class ? Promise.complete() : crdtResponse instanceof CrdtMessaging.ServerError ? Promise.ofException(new StacklessException(CrdtStorageClient.class, ((CrdtMessaging.ServerError) crdtResponse).getMsg())) : Promise.ofException(new IllegalStateException("Received message " + crdtResponse + " instead of " + CrdtMessaging.DownloadStarted.class.getSimpleName()));
            }).map(r5 -> {
                return ((StreamSupplier) ((StreamSupplier) messagingWithBinaryStreaming.receiveBinaryStream().transformWith(ChannelDeserializer.create(this.serializer))).transformWith(this.detailedStats ? this.downloadStats : this.downloadStatsDetailed)).withEndOfStream(promise -> {
                    return promise.then(r32 -> {
                        return messagingWithBinaryStreaming.sendEndOfStream();
                    }).whenResult(r33 -> {
                        messagingWithBinaryStreaming.close();
                    });
                }).withLateBinding();
            });
        });
    }

    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<StreamConsumer<K>> remove() {
        return connect().then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(CrdtMessaging.CrdtMessages.REMOVE).map(r6 -> {
                ChannelConsumer withAcknowledgement = messagingWithBinaryStreaming.sendBinaryStream().withAcknowledgement(promise -> {
                    return promise.then(r3 -> {
                        return messagingWithBinaryStreaming.receive();
                    }).then(simpleHandler(CrdtMessaging.CrdtResponses.REMOVE_FINISHED));
                });
                return StreamConsumer.ofSupplier(streamSupplier -> {
                    return ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.removeStats : this.removeStatsDetailed)).transformWith(ChannelSerializer.create(this.keySerializer))).streamTo(withAcknowledgement);
                }).withLateBinding();
            });
        });
    }

    @Override // io.datakernel.crdt.CrdtStorage
    public Promise<Void> ping() {
        return connect().then(messagingWithBinaryStreaming -> {
            return messagingWithBinaryStreaming.send(CrdtMessaging.CrdtMessages.PING).then(r3 -> {
                return messagingWithBinaryStreaming.receive();
            }).then(simpleHandler(CrdtMessaging.CrdtResponses.PONG));
        });
    }

    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    private Function<CrdtMessaging.CrdtResponse, Promise<Void>> simpleHandler(CrdtMessaging.CrdtResponse crdtResponse) {
        return crdtResponse2 -> {
            return crdtResponse2 == null ? Promise.ofException(new IllegalStateException("Unexpected end of stream")) : crdtResponse2 == crdtResponse ? Promise.complete() : crdtResponse2 instanceof CrdtMessaging.ServerError ? Promise.ofException(new StacklessException(CrdtStorageClient.class, ((CrdtMessaging.ServerError) crdtResponse2).getMsg())) : Promise.ofException(new IllegalStateException("Received message " + crdtResponse2 + " instead of " + crdtResponse));
        };
    }

    private Promise<MessagingWithBinaryStreaming<CrdtMessaging.CrdtResponse, CrdtMessaging.CrdtMessage>> connect() {
        return Promise.ofCallback(settablePromise -> {
            this.eventloop.connect(this.address, new ConnectCallback() { // from class: io.datakernel.crdt.CrdtStorageClient.1
                public void onConnect(@NotNull SocketChannel socketChannel) {
                    settablePromise.set(MessagingWithBinaryStreaming.create(AsyncTcpSocketImpl.wrapChannel(CrdtStorageClient.this.eventloop, socketChannel, CrdtStorageClient.this.socketSettings), ByteBufSerializer.ofJsonCodec(CrdtMessaging.RESPONSE_CODEC, CrdtMessaging.MESSAGE_CODEC)));
                }

                public void onException(@NotNull Throwable th) {
                    settablePromise.setException(th);
                }
            });
        });
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public StreamStatsBasic getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }
}
