/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.datagraph.server;

import io.datakernel.async.Stage;
import io.datakernel.datagraph.graph.StreamId;
import io.datakernel.datagraph.node.Node;
import io.datakernel.datagraph.server.DatagraphSerialization;
import io.datakernel.datagraph.server.command.DatagraphCommand;
import io.datakernel.datagraph.server.command.DatagraphCommandDownload;
import io.datakernel.datagraph.server.command.DatagraphCommandExecute;
import io.datakernel.datagraph.server.command.DatagraphResponse;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.eventloop.AsyncTcpSocketImpl;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.net.SocketSettings;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerModifier;
import io.datakernel.stream.net.MessagingSerializer;
import io.datakernel.stream.net.MessagingSerializers;
import io.datakernel.stream.net.MessagingWithBinaryStreaming;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for talking to a datagraph server over TCP.
 *
 * <p>Each call opens a fresh connection through the {@link Eventloop} given to the constructor,
 * wraps it in a {@link MessagingWithBinaryStreaming} and sends a single {@link DatagraphCommand}.
 * Commands are encoded with a JSON {@link MessagingSerializer} built from the response and command
 * adapters of the supplied {@link DatagraphSerialization}.
 */
public final class DatagraphClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Eventloop eventloop;
    private final DatagraphSerialization serialization;
    private final MessagingSerializer<DatagraphResponse, DatagraphCommand> serializer;
    private final SocketSettings socketSettings = SocketSettings.create();

    /**
     * Creates a client bound to the given event loop.
     *
     * @param eventloop     event loop used to open connections and drive the sockets
     * @param serialization source of the JSON adapters for commands/responses and of the
     *                      per-type serializers used when downloading streams
     */
    public DatagraphClient(Eventloop eventloop, DatagraphSerialization serialization) {
        this.eventloop = eventloop;
        this.serialization = serialization;
        this.serializer = MessagingSerializers.ofJson(serialization.responseAdapter, serialization.commandAdapter);
    }

    /**
     * Connects to {@code address}, sends a {@link DatagraphCommandDownload} for {@code streamId} and
     * exposes the binary stream received in reply as a producer of {@code T}.
     *
     * <p>The incoming bytes are passed through a {@link StreamBinaryDeserializer} created from
     * {@code serialization.getSerializer(type)}. The messaging channel is closed from the
     * {@code thenRunEx} callback attached to the resulting producer.
     * NOTE(review): presumably that callback fires on both normal and exceptional completion of the
     * stream; confirm against the contract of {@code thenRunEx}.
     * NOTE(review): nothing here closes the channel when {@code send} itself fails; verify whether
     * the messaging layer closes the socket on its own in that case.
     *
     * @param address  server address to connect to
     * @param streamId identifier of the stream to download
     * @param type     element class used to look up the serializer
     * @param <T>      element type of the downloaded stream
     * @return a stage yielding the deserializing stream producer once the command has been sent
     */
    public <T> Stage<StreamProducer<T>> download(InetSocketAddress address, StreamId streamId, Class<T> type) {
        return this.eventloop
                .connect((SocketAddress) address)
                .thenCompose(socketChannel -> {
                    DatagraphCommandDownload request = new DatagraphCommandDownload(streamId);
                    MessagingWithBinaryStreaming channel = this.openMessaging((SocketChannel) socketChannel);
                    return channel
                            .send((Object) request)
                            .thenApply(ignored -> channel.receiveBinaryStream()
                                    .with((StreamProducerModifier) StreamBinaryDeserializer.create(this.serialization.getSerializer(type)))
                                    .thenRunEx(() -> channel.close())
                                    .withLateBinding());
                });
    }

    /**
     * Connects to {@code address}, sends a {@link DatagraphCommandExecute} carrying a copy of
     * {@code nodes}, and then calls {@code sendEndOfStream()} on the messaging channel.
     *
     * <p>NOTE(review): whatever {@code sendEndOfStream()} returns is not chained into the returned
     * stage, and the channel is not closed explicitly here; confirm this is intended.
     *
     * @param address server address to connect to
     * @param nodes   nodes to execute remotely; copied into a new list before sending
     * @return the stage produced by {@code send(...).thenRun(...)}
     */
    public Stage<Void> execute(InetSocketAddress address, Collection<Node> nodes) {
        return this.eventloop
                .connect((SocketAddress) address)
                .thenCompose(socketChannel -> {
                    MessagingWithBinaryStreaming channel = this.openMessaging((SocketChannel) socketChannel);
                    DatagraphCommandExecute request = new DatagraphCommandExecute(new ArrayList<Node>(nodes));
                    return channel
                            .send((Object) request)
                            .thenRun(() -> channel.sendEndOfStream());
                });
    }

    /**
     * Wraps a freshly connected channel into a registered socket with a messaging layer on top.
     *
     * <p>Steps, in order: wrap the channel with this client's socket settings, create the messaging
     * object with this client's serializer, install it as the socket's event handler, register the
     * socket.
     *
     * @param socketChannel connected channel obtained from the event loop
     * @return the messaging object attached to the new socket
     */
    private MessagingWithBinaryStreaming openMessaging(SocketChannel socketChannel) {
        AsyncTcpSocketImpl socket = AsyncTcpSocketImpl.wrapChannel(this.eventloop, socketChannel, this.socketSettings);
        MessagingWithBinaryStreaming messaging = MessagingWithBinaryStreaming.create((AsyncTcpSocket) socket, this.serializer);
        socket.setEventHandler((AsyncTcpSocket.EventHandler) messaging);
        socket.register();
        return messaging;
    }
}

