/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.datagraph.server;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.datagraph.graph.StreamId;
import io.datakernel.datagraph.graph.TaskContext;
import io.datakernel.datagraph.node.Node;
import io.datakernel.datagraph.server.DatagraphEnvironment;
import io.datakernel.datagraph.server.DatagraphSerialization;
import io.datakernel.datagraph.server.command.DatagraphCommand;
import io.datakernel.datagraph.server.command.DatagraphCommandDownload;
import io.datakernel.datagraph.server.command.DatagraphCommandExecute;
import io.datakernel.datagraph.server.command.DatagraphResponse;
import io.datakernel.eventloop.AbstractServer;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.DataStreams;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.net.MessagingSerializer;
import io.datakernel.stream.net.MessagingSerializers;
import io.datakernel.stream.net.MessagingWithBinaryStreaming;
import io.datakernel.stream.processor.StreamBinarySerializer;
import io.datakernel.stream.processor.StreamLateBinder;
import io.datakernel.util.MemSize;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public final class DatagraphServer
extends AbstractServer<DatagraphServer> {
    private final DatagraphEnvironment environment;
    private final Map<StreamId, StreamLateBinder<ByteBuf>> pendingStreams = new HashMap<StreamId, StreamLateBinder<ByteBuf>>();
    private final MessagingSerializer<DatagraphCommand, DatagraphResponse> serializer;
    private final Map<Class, CommandHandler> handlers = new HashMap<Class, CommandHandler>();

    public DatagraphServer(Eventloop eventloop, DatagraphEnvironment environment) {
        super(eventloop);
        this.handlers.put(DatagraphCommandDownload.class, new DownloadCommandHandler());
        this.handlers.put(DatagraphCommandExecute.class, new ExecuteCommandHandler());
        this.environment = DatagraphEnvironment.extend(environment).set(DatagraphServer.class, (Object)this);
        DatagraphSerialization serialization = environment.getInstance(DatagraphSerialization.class);
        this.serializer = MessagingSerializers.ofJson(serialization.commandAdapter, serialization.responseAdapter);
    }

    public <T> StreamConsumer<T> upload(StreamId streamId, Class<T> type) {
        BufferSerializer<T> serializer = this.environment.getInstance(DatagraphSerialization.class).getSerializer(type);
        StreamBinarySerializer streamSerializer = StreamBinarySerializer.create(serializer).withInitialBufferSize(MemSize.kilobytes((long)256L)).withAutoFlushInterval(Duration.ofSeconds(1L));
        StreamLateBinder forwarder = this.pendingStreams.remove(streamId);
        if (forwarder == null) {
            this.logger.info("onUpload: waiting {}, pending downloads: {}", (Object)streamId, (Object)this.pendingStreams.size());
            forwarder = StreamLateBinder.create();
            this.pendingStreams.put(streamId, (StreamLateBinder<ByteBuf>)forwarder);
        } else {
            this.logger.info("onUpload: transferring {}, pending downloads: {}", (Object)streamId, (Object)this.pendingStreams.size());
        }
        DataStreams.bind((StreamProducer)streamSerializer.getOutput(), (StreamConsumer)forwarder.getInput());
        return streamSerializer.getInput();
    }

    protected final AsyncTcpSocket.EventHandler createSocketHandler(AsyncTcpSocket asyncTcpSocket) {
        MessagingWithBinaryStreaming messaging = MessagingWithBinaryStreaming.create((AsyncTcpSocket)asyncTcpSocket, this.serializer);
        messaging.receive().whenResult(msg -> {
            if (msg != null) {
                this.doRead((MessagingWithBinaryStreaming<DatagraphCommand, DatagraphResponse>)messaging, (DatagraphCommand)msg);
            } else {
                this.logger.warn("unexpected end of stream");
                messaging.close();
            }
        }).whenException(e -> {
            this.logger.error("received error while trying to read", e);
            messaging.close();
        });
        return messaging;
    }

    private void doRead(MessagingWithBinaryStreaming<DatagraphCommand, DatagraphResponse> messaging, DatagraphCommand command) {
        CommandHandler handler = this.handlers.get(command.getClass());
        if (handler == null) {
            messaging.close();
            this.logger.error("missing handler for " + command);
        } else {
            handler.onCommand(messaging, command);
        }
    }

    private class ExecuteCommandHandler
    implements CommandHandler<DatagraphCommandExecute, DatagraphResponse> {
        private ExecuteCommandHandler() {
        }

        @Override
        public void onCommand(MessagingWithBinaryStreaming<DatagraphCommandExecute, DatagraphResponse> messaging, DatagraphCommandExecute command) {
            messaging.close();
            TaskContext taskContext = new TaskContext(DatagraphServer.this.eventloop, DatagraphEnvironment.extend(DatagraphServer.this.environment));
            for (Node node : command.getNodes()) {
                node.createAndBind(taskContext);
            }
            taskContext.wireAll();
        }
    }

    private class DownloadCommandHandler
    implements CommandHandler<DatagraphCommandDownload, DatagraphResponse> {
        private DownloadCommandHandler() {
        }

        @Override
        public void onCommand(MessagingWithBinaryStreaming<DatagraphCommandDownload, DatagraphResponse> messaging, DatagraphCommandDownload command) {
            StreamId streamId = command.getStreamId();
            StreamLateBinder forwarder = (StreamLateBinder)DatagraphServer.this.pendingStreams.remove(streamId);
            if (forwarder != null) {
                DatagraphServer.this.logger.info("onDownload: transferring {}, pending downloads: {}", (Object)streamId, (Object)DatagraphServer.this.pendingStreams.size());
            } else {
                DatagraphServer.this.logger.info("onDownload: waiting {}, pending downloads: {}", (Object)streamId, (Object)DatagraphServer.this.pendingStreams.size());
                forwarder = StreamLateBinder.create();
                DatagraphServer.this.pendingStreams.put(streamId, forwarder);
            }
            StreamConsumerWithResult consumer = messaging.sendBinaryStream();
            DataStreams.bind((StreamProducer)forwarder.getOutput(), (StreamConsumer)consumer);
            consumer.getResult().whenComplete(($, throwable) -> {
                if (throwable != null) {
                    DatagraphServer.this.logger.warn("Exception occurred while trying to send data");
                }
                messaging.close();
            });
        }
    }

    protected static interface CommandHandler<I, O> {
        public void onCommand(MessagingWithBinaryStreaming<I, O> var1, I var2);
    }
}

