/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow;

import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.ApplicationSettings;
import io.activej.common.MemSize;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.dsl.ChannelTransformer;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.csp.queue.ChannelQueue;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandDownload;
import io.activej.dataflow.command.DataflowCommandExecute;
import io.activej.dataflow.command.DataflowCommandGetTasks;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.command.DataflowResponsePartitionData;
import io.activej.dataflow.command.DataflowResponseResult;
import io.activej.dataflow.command.DataflowResponseTaskData;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.Task;
import io.activej.dataflow.inject.BinarySerializerModule;
import io.activej.dataflow.node.Node;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.eventloop.Eventloop;
import io.activej.inject.ResourceLocator;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.net.AbstractServer;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.SettablePromise;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;

public final class DataflowServer
extends AbstractServer<DataflowServer> {
    /** Upper bound on the number of entries kept in {@link #lastTasks}; read from application settings, default 1000. */
    private static final int MAX_LAST_RAN_TASKS = ApplicationSettings.getInt(DataflowServer.class, (String)"maxLastRanTasks", (int)1000);
    /**
     * Queues for streams where only one side (upload or download) has shown up so far.
     * Whichever side arrives first registers a queue here; the other side removes it and uses it.
     */
    private final Map<StreamId, ChannelQueue<ByteBuf>> pendingStreams = new HashMap<StreamId, ChannelQueue<ByteBuf>>();
    /** Command class to handler mapping; filled by {@code handleCommand} and consulted by {@code doRead}. */
    private final Map<Class, BiConsumer<Messaging<DataflowCommand, DataflowResponse>, ?>> handlers = new HashMap();
    /** Codec handed to the messaging layer for every accepted connection. */
    private final ByteBufsCodec<DataflowCommand, DataflowResponse> codec;
    /** Source of per-type binary serializers used by {@code upload}. */
    private final BinarySerializerModule.BinarySerializerLocator serializers;
    /** Tasks that have been started and whose execution promise has not completed yet, keyed by task id. */
    private final Map<Long, Task> runningTasks = new HashMap<Long, Task>();
    /** Most recently registered tasks (running or finished), keyed by task id and bounded by {@link #MAX_LAST_RAN_TASKS}. */
    private final Map<Long, Task> lastTasks = new LinkedHashMap<Long, Task>(){

        // Drop the eldest entry as soon as the map grows past the configured limit.
        @Override
        protected boolean removeEldestEntry(Map.Entry eldest) {
            return this.size() > MAX_LAST_RAN_TASKS;
        }
    };
    /** Number of tasks whose execution completed without an exception. */
    private int succeededTasks = 0;
    /** Number of tasks whose execution completed with {@code AsyncCloseable.CLOSE_EXCEPTION}. */
    private int canceledTasks = 0;
    /** Number of tasks whose execution completed with any other exception. */
    private int failedTasks = 0;
    /**
     * Registers {@code handler} as the handler for commands whose runtime class is exactly {@code cls}.
     * A later registration for the same class replaces the earlier one.
     *
     * @param cls     command class used as the lookup key
     * @param handler callback receiving the connection's messaging and the command
     */
    private <T> void handleCommand(Class<T> cls, BiConsumer<Messaging<DataflowCommand, DataflowResponse>, T> handler) {
        handlers.put(cls, handler);
    }

    public DataflowServer(Eventloop eventloop, ByteBufsCodec<DataflowCommand, DataflowResponse> codec, BinarySerializerModule.BinarySerializerLocator serializers, ResourceLocator environment) {
        super(eventloop);
        this.codec = codec;
        this.serializers = serializers;
        this.handleCommand(DataflowCommandDownload.class, (messaging, command) -> {
            StreamId streamId;
            ChannelZeroBuffer forwarder;
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Processing onDownload: {}, {}", command, messaging);
            }
            if ((forwarder = this.pendingStreams.remove(streamId = command.getStreamId())) != null) {
                this.logger.info("onDownload: transferring {}, pending downloads: {}", (Object)streamId, (Object)this.pendingStreams.size());
            } else {
                forwarder = new ChannelZeroBuffer();
                this.pendingStreams.put(streamId, (ChannelQueue<ByteBuf>)forwarder);
                this.logger.info("onDownload: waiting {}, pending downloads: {}", (Object)streamId, (Object)this.pendingStreams.size());
                messaging.receive().whenException(() -> {
                    ChannelQueue<ByteBuf> removed = this.pendingStreams.remove(streamId);
                    if (removed != null) {
                        this.logger.info("onDownload: removing {}, pending downloads: {}", (Object)streamId, (Object)this.pendingStreams.size());
                    }
                });
            }
            ChannelConsumer consumer = messaging.sendBinaryStream();
            forwarder.getSupplier().streamTo(consumer);
            consumer.withAcknowledgement(ack -> ack.whenComplete(($, e) -> {
                if (e != null) {
                    this.logger.warn("Exception occurred while trying to send data", e);
                }
                messaging.close();
            }));
        });
        this.handleCommand(DataflowCommandExecute.class, (messaging, command) -> {
            long taskId = command.getTaskId();
            Task task = new Task(taskId, environment, command.getNodes());
            try {
                task.bind();
            }
            catch (Exception e) {
                this.logger.error("Failed to construct task: {}", command, (Object)e);
                this.sendResponse((Messaging<DataflowCommand, DataflowResponse>)messaging, e);
                return;
            }
            this.lastTasks.put(taskId, task);
            this.runningTasks.put(taskId, task);
            task.execute().whenComplete(($, throwable) -> {
                this.runningTasks.remove(taskId);
                if (throwable == null) {
                    ++this.succeededTasks;
                    this.logger.info("Task executed successfully: {}", command);
                } else if (throwable == AsyncCloseable.CLOSE_EXCEPTION) {
                    ++this.canceledTasks;
                    this.logger.error("Canceled task: {}", command, (Object)throwable);
                } else {
                    ++this.failedTasks;
                    this.logger.error("Failed to execute task: {}", command, (Object)throwable);
                }
                this.sendResponse((Messaging<DataflowCommand, DataflowResponse>)messaging, throwable);
            });
            messaging.receive().whenException(() -> {
                if (!task.isExecuted()) {
                    this.logger.error("Client disconnected. Canceling task: {}", command);
                    task.cancel();
                }
            });
        });
        this.handleCommand(DataflowCommandGetTasks.class, (messaging, command) -> {
            String err;
            Long taskId = command.getTaskId();
            if (taskId == null) {
                messaging.send((Object)new DataflowResponsePartitionData(this.runningTasks.size(), this.succeededTasks, this.failedTasks, this.canceledTasks, this.lastTasks.entrySet().stream().map(e -> new DataflowResponsePartitionData.TaskDesc((Long)e.getKey(), ((Task)e.getValue()).getStatus())).collect(Collectors.toList()))).whenException(e -> this.logger.error("Failed to send answer for the partition data request", e));
                return;
            }
            Task task = this.lastTasks.get(taskId);
            if (task == null) {
                messaging.send((Object)new DataflowResponseResult("No task found with id " + taskId));
                return;
            }
            if (task.getError() != null) {
                StringWriter writer = new StringWriter();
                task.getError().printStackTrace(new PrintWriter(writer));
                err = writer.toString();
            } else {
                err = null;
            }
            messaging.send((Object)new DataflowResponseTaskData(task.getStatus(), task.getStartTime(), task.getFinishTime(), err, task.getNodes().stream().filter(n -> n.getStats() != null).collect(Collectors.toMap(Node::getIndex, Node::getStats)), task.getGraphViz())).whenException(e -> this.logger.error("Failed to send answer for the task (" + taskId + ") data request", e));
        });
    }

    /**
     * Sends a {@link DataflowResponseResult} and closes the connection once the send has completed,
     * whether it succeeded or failed.
     *
     * @param messaging connection to reply on
     * @param throwable failure to report, or {@code null} for success; reported as
     *                  {@code "<SimpleClassName>: <message>"}
     */
    private void sendResponse(Messaging<DataflowCommand, DataflowResponse> messaging, @Nullable Throwable throwable) {
        String error = throwable == null
                ? null
                : throwable.getClass().getSimpleName() + ": " + throwable.getMessage();
        // No (Object) cast: send() is typed to DataflowResponse.
        messaging.send(new DataflowResponseResult(error)).whenComplete(() -> messaging.close());
    }

    /**
     * Creates a stream consumer whose items are serialized and forwarded to the client downloading
     * {@code streamId}. If that download request is already pending, its queue is reused; otherwise
     * a new queue is parked in {@code pendingStreams} until the download arrives.
     *
     * @param streamId    id of the stream being produced
     * @param type        item class, used to look up the binary serializer
     * @param transformer transformation applied to the serialized byte buffers before they reach the queue
     * @return consumer accepting the items of the stream
     */
    public <T> StreamConsumer<T> upload(StreamId streamId, Class<T> type, ChannelTransformer<ByteBuf, ByteBuf> transformer) {
        ChannelSerializer<T> streamSerializer = ChannelSerializer.create(this.serializers.get(type))
                .withInitialBufferSize(MemSize.kilobytes(256L))
                .withAutoFlushInterval(Duration.ZERO)
                .withExplicitEndOfStream();
        // Declared as ChannelQueue because that is what the map holds; ChannelZeroBuffer is only the
        // concrete type created when nothing is pending yet.
        ChannelQueue<ByteBuf> forwarder = this.pendingStreams.remove(streamId);
        if (forwarder == null) {
            forwarder = new ChannelZeroBuffer<>();
            this.pendingStreams.put(streamId, forwarder);
            this.logger.info("onUpload: waiting {}, pending downloads: {}", streamId, this.pendingStreams.size());
        } else {
            this.logger.info("onUpload: transferring {}, pending downloads: {}", streamId, this.pendingStreams.size());
        }
        streamSerializer.getOutput().set(forwarder.getConsumer().transformWith(transformer));
        // If the upload fails while its queue is still parked, remove the queue and close it.
        streamSerializer.getAcknowledgement().whenException(() -> {
            ChannelQueue<ByteBuf> removed = this.pendingStreams.remove(streamId);
            if (removed != null) {
                this.logger.info("onUpload: removing {}, pending downloads: {}", streamId, this.pendingStreams.size());
                removed.close();
            }
        });
        return streamSerializer;
    }

    /**
     * Same as {@link #upload(StreamId, Class, ChannelTransformer)} with an identity transformer,
     * i.e. the serialized bytes are forwarded unchanged.
     *
     * @param streamId id of the stream being produced
     * @param type     item class, used to look up the binary serializer
     * @return consumer accepting the items of the stream
     */
    public <T> StreamConsumer<T> upload(StreamId streamId, Class<T> type) {
        // Assign instead of casting, so the type arguments are inferred from the declared type.
        ChannelTransformer<ByteBuf, ByteBuf> passThrough = ChannelTransformer.identity();
        return this.upload(streamId, type, passThrough);
    }

    /**
     * Handles a newly accepted connection: wraps the socket in a messaging channel, reads one
     * command and dispatches it; a read failure is logged and closes the connection.
     *
     * @param socket        accepted client socket
     * @param remoteAddress address of the client (not used here)
     */
    protected void serve(AsyncTcpSocket socket, InetAddress remoteAddress) {
        // Typed local: with the raw type, receive() is erased and the callbacks would get Object.
        Messaging<DataflowCommand, DataflowResponse> messaging = MessagingWithBinaryStreaming.create(socket, this.codec);
        messaging.receive()
                .whenResult(command -> this.lambda$serve$13(messaging, command))
                .whenException(error -> this.lambda$serve$14(messaging, error));
    }

    /**
     * Dispatches {@code command} to the handler registered for its exact runtime class.
     * If there is no such handler, the problem is logged and the connection is closed.
     *
     * @param messaging connection the command arrived on
     * @param command   command to dispatch
     */
    @SuppressWarnings("unchecked")
    private void doRead(Messaging<DataflowCommand, DataflowResponse> messaging, DataflowCommand command) {
        // The map stores handlers as BiConsumer<..., ?>, which cannot accept any argument.
        // The cast is safe because handleCommand() only stores a handler under the class it is typed for.
        BiConsumer<Messaging<DataflowCommand, DataflowResponse>, DataflowCommand> handler =
                (BiConsumer<Messaging<DataflowCommand, DataflowResponse>, DataflowCommand>) this.handlers.get(command.getClass());
        if (handler == null) {
            this.logger.error("missing handler for {}", (Object)command);
            messaging.close();
            return;
        }
        handler.accept(messaging, command);
    }

    /**
     * Closes every queue still parked in {@code pendingStreams} and then completes {@code cb}.
     * The queues are copied out and the map is cleared before any queue is closed.
     *
     * @param cb promise completed with {@code null} once all pending queues have been closed
     */
    protected void onClose(SettablePromise<Void> cb) {
        ArrayList<ChannelQueue<ByteBuf>> snapshot = new ArrayList<ChannelQueue<ByteBuf>>(pendingStreams.values());
        pendingStreams.clear();
        for (ChannelQueue<ByteBuf> queue : snapshot) {
            queue.close();
        }
        cb.set(null);
    }

    /**
     * Returns the bounded map of most recently registered tasks, keyed by task id.
     * This is the server's own map, not a copy.
     */
    public Map<Long, Task> getLastTasks() {
        return lastTasks;
    }

    /** Number of tasks currently registered as running. */
    @JmxAttribute
    public int getRunningTasks() {
        return runningTasks.size();
    }

    /** Number of tasks that have completed without an exception so far. */
    @JmxAttribute
    public int getSucceededTasks() {
        return succeededTasks;
    }

    /** Number of tasks that have completed with an exception other than the close exception so far. */
    @JmxAttribute
    public int getFailedTasks() {
        return failedTasks;
    }

    /** Number of tasks that have completed with {@code AsyncCloseable.CLOSE_EXCEPTION} so far. */
    @JmxAttribute
    public int getCanceledTasks() {
        return canceledTasks;
    }

    /**
     * Cancels every task currently registered as running.
     * <p>
     * Iterates over a snapshot rather than the live map: each task's completion callback removes
     * the task from {@code runningTasks}, so if cancelling completes a task synchronously
     * (NOTE(review): depends on {@code Task.cancel()} — confirm), iterating the live view would
     * throw {@link java.util.ConcurrentModificationException}. {@code onClose} uses the same
     * copy-then-iterate approach for pending streams.
     */
    @JmxOperation
    public void cancelAll() {
        new ArrayList<Task>(this.runningTasks.values()).forEach(Task::cancel);
    }

    /**
     * Cancels the running task with the given id.
     *
     * @param taskID id of the task to cancel
     * @return {@code true} if a running task with that id was found and cancelled, {@code false} otherwise
     */
    @JmxOperation
    public boolean cancel(long taskID) {
        Task running = runningTasks.get(taskID);
        if (running == null) {
            return false;
        }
        running.cancel();
        return true;
    }

    /**
     * Cancels the running task with the given id; does nothing if no such task is running.
     *
     * @param id id of the task to cancel
     */
    @JmxOperation
    public void cancelTask(long id) {
        Task running = runningTasks.get(id);
        if (running == null) {
            return;
        }
        running.cancel();
    }

    /**
     * Failure callback for the first read in {@code serve}: logs the error and closes the connection.
     *
     * @param messaging connection on which the read failed
     * @param e         the read failure
     */
    private /* synthetic */ void lambda$serve$14(Messaging messaging, Throwable e) {
        logger.error("received error while trying to read", e);
        messaging.close();
    }

    /**
     * Result callback for the first read in {@code serve}: dispatches the received command, or
     * treats a {@code null} message as an unexpected end of stream and closes the connection.
     *
     * @param messaging connection the message was read from
     * @param msg       received command, or {@code null} if the stream ended
     */
    private /* synthetic */ void lambda$serve$13(Messaging messaging, DataflowCommand msg) {
        if (msg == null) {
            logger.warn("unexpected end of stream");
            messaging.close();
            return;
        }
        doRead((Messaging<DataflowCommand, DataflowResponse>)messaging, msg);
    }
}

