package io.datakernel.datagraph.server;

import io.datakernel.async.Stage;
import io.datakernel.datagraph.graph.StreamId;
import io.datakernel.datagraph.node.Node;
import io.datakernel.datagraph.server.command.DatagraphCommand;
import io.datakernel.datagraph.server.command.DatagraphCommandDownload;
import io.datakernel.datagraph.server.command.DatagraphCommandExecute;
import io.datakernel.datagraph.server.command.DatagraphResponse;
import io.datakernel.eventloop.AsyncTcpSocketImpl;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.net.SocketSettings;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.net.MessagingSerializer;
import io.datakernel.stream.net.MessagingSerializers;
import io.datakernel.stream.net.MessagingWithBinaryStreaming;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/datagraph/server/DatagraphClient.class */
public final class DatagraphClient {
    private final Eventloop eventloop;
    private final DatagraphSerialization serialization;
    private final MessagingSerializer<DatagraphResponse, DatagraphCommand> serializer;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final SocketSettings socketSettings = SocketSettings.create();

    public DatagraphClient(Eventloop eventloop, DatagraphSerialization datagraphSerialization) {
        this.eventloop = eventloop;
        this.serialization = datagraphSerialization;
        this.serializer = MessagingSerializers.ofJson(datagraphSerialization.responseAdapter, datagraphSerialization.commandAdapter);
    }

    public <T> Stage<StreamProducer<T>> download(InetSocketAddress inetSocketAddress, StreamId streamId, Class<T> cls) {
        return this.eventloop.connect(inetSocketAddress).thenCompose(socketChannel -> {
            AsyncTcpSocketImpl wrapChannel = AsyncTcpSocketImpl.wrapChannel(this.eventloop, socketChannel, this.socketSettings);
            MessagingWithBinaryStreaming create = MessagingWithBinaryStreaming.create(wrapChannel, this.serializer);
            DatagraphCommandDownload datagraphCommandDownload = new DatagraphCommandDownload(streamId);
            wrapChannel.setEventHandler(create);
            wrapChannel.register();
            return create.send(datagraphCommandDownload).thenApply(r7 -> {
                StreamProducerWithResult with = create.receiveBinaryStream().with(StreamBinaryDeserializer.create(this.serialization.getSerializer(cls)));
                create.getClass();
                return with.thenRunEx(create::close).withLateBinding();
            });
        });
    }

    public Stage<Void> execute(InetSocketAddress inetSocketAddress, Collection<Node> collection) {
        return this.eventloop.connect(inetSocketAddress).thenCompose(socketChannel -> {
            AsyncTcpSocketImpl wrapChannel = AsyncTcpSocketImpl.wrapChannel(this.eventloop, socketChannel, this.socketSettings);
            MessagingWithBinaryStreaming create = MessagingWithBinaryStreaming.create(wrapChannel, this.serializer);
            wrapChannel.setEventHandler(create);
            wrapChannel.register();
            Stage send = create.send(new DatagraphCommandExecute(new ArrayList(collection)));
            create.getClass();
            return send.thenRun(create::sendEndOfStream);
        });
    }
}
