/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.http;

import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.dataflow.DataflowException;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandGetTasks;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.command.DataflowResponsePartitionData;
import io.activej.dataflow.command.DataflowResponseResult;
import io.activej.dataflow.command.DataflowResponseTaskData;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.TaskStatus;
import io.activej.dataflow.http.LocalTaskData;
import io.activej.dataflow.http.ReducedTaskData;
import io.activej.dataflow.json.JsonCodec;
import io.activej.dataflow.json.JsonUtils;
import io.activej.dataflow.stats.NodeStat;
import io.activej.dataflow.stats.StatReducer;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpError;
import io.activej.http.HttpMethod;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.http.StaticServlet;
import io.activej.inject.Key;
import io.activej.inject.ResourceLocator;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promisable;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.types.Types;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * HTTP servlet that exposes debugging information about dataflow partitions.
 *
 * <p>Routes registered by the constructor:
 * <ul>
 *   <li>{@code /*} - delegated to a {@link StaticServlet} built with
 *       {@code ofClassPath(executor, "debug").withIndexHtml()};</li>
 *   <li>{@code GET /partitions} (under {@code /api/*}) - JSON array of {@code "host:port"} strings,
 *       one per configured partition;</li>
 *   <li>{@code GET /tasks} (under {@code /api/*}) - map of task id to per-partition statuses;</li>
 *   <li>{@code GET /tasks/:taskID} (under {@code /api/*}) - task data merged across all partitions;</li>
 *   <li>{@code GET /tasks/:taskID/:index} (under {@code /api/*}) - task data of a single partition,
 *       addressed by its position in the partition list.</li>
 * </ul>
 *
 * <p>Every data request opens a new TCP connection to the partition, sends a
 * {@link DataflowCommandGetTasks} command, reads exactly one response and closes the messaging channel.
 */
public final class DataflowDebugServlet
implements AsyncServlet {
    private final AsyncServlet servlet;
    private final ByteBufsCodec<DataflowResponse, DataflowCommand> codec;

    /**
     * Builds the routing table described in the class documentation.
     *
     * @param partitions    partitions to query; list positions are used as partition indexes in responses
     *                      and in the {@code :index} path parameter
     * @param executor      executor handed to the static-resource servlet
     * @param codec         codec used to talk to the partitions
     * @param env           locator used to look up the JSON codecs for {@link ReducedTaskData} and
     *                      {@link LocalTaskData}, and the optional {@link StatReducer} instances
     * @param taskListCodec JSON codec for the {@code /tasks} response
     */
    public DataflowDebugServlet(List<Partition> partitions, Executor executor, ByteBufsCodec<DataflowResponse, DataflowCommand> codec, ResourceLocator env, JsonCodec<Map<Long, List<@Nullable TaskStatus>>> taskListCodec) {
        this.codec = codec;
        JsonCodec<ReducedTaskData> reducedTaskDataCodec = env.getInstance(JsonUtils.codec(ReducedTaskData.class));
        JsonCodec<LocalTaskData> localTaskDataCodec = env.getInstance(JsonUtils.codec(LocalTaskData.class));

        this.servlet = RoutingServlet.create()
                .map("/*", StaticServlet.ofClassPath(executor, "debug").withIndexHtml())
                .map("/api/*", RoutingServlet.create()
                        .map(HttpMethod.GET, "/partitions", request -> HttpResponse.ok200()
                                .withJson(partitions.stream()
                                        .map(p -> "\"" + p.getAddress().getAddress().getHostAddress() + ":" + p.getAddress().getPort() + "\"")
                                        .collect(Collectors.joining(",", "[", "]"))))
                        .map(HttpMethod.GET, "/tasks", request ->
                                Promises.toList(partitions.stream().map(p -> this.getPartitionData(p.getAddress())))
                                        .map(partitionStats -> HttpResponse.ok200()
                                                .withJson(JsonUtils.toJson(taskListCodec, collectTaskStatuses(partitionStats)))))
                        .map(HttpMethod.GET, "/tasks/:taskID", request -> {
                            long id = getTaskId(request);
                            return Promises.toList(partitions.stream()
                                            .map(p -> this.getTask(p.getAddress(), id))
                                            .collect(Collectors.toList()))
                                    .map(localStats -> HttpResponse.ok200()
                                            .withJson(JsonUtils.toJson(reducedTaskDataCodec, reduceTaskData(id, localStats, env))));
                        })
                        .map(HttpMethod.GET, "/tasks/:taskID/:index", request -> {
                            long id = getTaskId(request);
                            String indexParam = request.getPathParameter("index");
                            Partition partition;
                            try {
                                partition = partitions.get(Integer.parseInt(indexParam));
                            }
                            catch (IndexOutOfBoundsException | NumberFormatException e) {
                                // Both a non-numeric and an out-of-range index are client errors.
                                throw HttpError.ofCode(400, "Bad index");
                            }
                            return this.getTask(partition.getAddress(), id)
                                    .map(task -> HttpResponse.ok200()
                                            .withJson(JsonUtils.toJson(localTaskDataCodec, new LocalTaskData(
                                                    task.getStatus(),
                                                    task.getGraphViz(),
                                                    task.getNodes(),
                                                    task.getStartTime(),
                                                    task.getFinishTime(),
                                                    task.getErrorString()))));
                        }));
    }

    /**
     * Groups task statuses by task id.
     *
     * @param partitionStats one entry per partition, in partition-list order
     * @return map from task id to a fixed-size list with one slot per partition; a slot stays
     *         {@code null} when that partition did not report the task
     */
    private static Map<Long, List<@Nullable TaskStatus>> collectTaskStatuses(List<DataflowResponsePartitionData> partitionStats) {
        int partitionCount = partitionStats.size();
        Map<Long, List<@Nullable TaskStatus>> tasks = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            for (DataflowResponsePartitionData.TaskDesc taskDesc : partitionStats.get(i).getLast()) {
                // Arrays.asList over a fresh array gives a fixed-size, null-filled list that supports set().
                tasks.computeIfAbsent(taskDesc.getId(), $ -> Arrays.asList(new TaskStatus[partitionCount]))
                        .set(i, taskDesc.getStatus());
            }
        }
        return tasks;
    }

    /**
     * Merges the per-partition data of one task into a single {@link ReducedTaskData}.
     *
     * <p>Statuses are kept per partition. Node statistics are grouped by node index and reduced with
     * {@link #reduce(List, ResourceLocator)}; nodes whose statistics cannot be reduced are omitted.
     * The graph description is taken from the first partition that returned data.
     *
     * @param taskId     id of the requested task, used only in the error message
     * @param localStats one entry per partition, in partition-list order; entries may be {@code null}
     * @param env        locator used to find stat reducers
     * @return the merged task data
     * @throws HttpError with code 404 when no partition returned data (including an empty partition list)
     */
    private static ReducedTaskData reduceTaskData(long taskId, List<DataflowResponseTaskData> localStats, ResourceLocator env) throws HttpError {
        int partitionCount = localStats.size();
        List<@Nullable TaskStatus> statuses = Arrays.asList(new TaskStatus[partitionCount]);
        Map<Integer, List<@Nullable NodeStat>> nodeStats = new HashMap<>();
        DataflowResponseTaskData firstPresent = null;

        for (int i = 0; i < partitionCount; i++) {
            DataflowResponseTaskData localTaskData = localStats.get(i);
            if (localTaskData == null) {
                continue;
            }
            if (firstPresent == null) {
                firstPresent = localTaskData;
            }
            statuses.set(i, localTaskData.getStatus());
            int partitionIndex = i;
            localTaskData.getNodes().forEach((nodeIndex, nodeStat) ->
                    nodeStats.computeIfAbsent(nodeIndex, $ -> Arrays.asList(new NodeStat[partitionCount]))
                            .set(partitionIndex, nodeStat));
        }

        if (firstPresent == null) {
            // Previously this case surfaced as an IndexOutOfBoundsException / NullPointerException.
            throw HttpError.ofCode(404, "No data for task " + taskId);
        }

        Map<Integer, NodeStat> reduced = new HashMap<>();
        nodeStats.forEach((nodeIndex, perPartition) -> {
            NodeStat reducedStat = reduce(perPartition, env);
            if (reducedStat != null) {
                reduced.put(nodeIndex, reducedStat);
            }
        });

        return new ReducedTaskData(statuses, firstPresent.getGraphViz(), reduced);
    }

    /**
     * Reduces per-partition statistics of one node to a single value.
     *
     * <p>The reducer is looked up in {@code env} under the key {@code StatReducer<X>}, where {@code X}
     * is the runtime class of the first non-null element of {@code stats}.
     *
     * @param stats per-partition statistics; elements may be {@code null}
     * @param env   locator that may provide a matching {@link StatReducer}
     * @return the reduced statistic, or {@code null} when {@code stats} has no non-null element or
     *         no reducer is registered for its class
     */
    @Nullable
    private static NodeStat reduce(List<NodeStat> stats, ResourceLocator env) {
        NodeStat firstNonNull = stats.stream().filter(Objects::nonNull).findFirst().orElse(null);
        if (firstNonNull == null) {
            return null;
        }
        Type reducerType = Types.parameterizedType(StatReducer.class, new Type[]{firstNonNull.getClass()});
        StatReducer<NodeStat> reducer = env.getInstanceOrNull(Key.ofType(reducerType));
        if (reducer == null) {
            return null;
        }
        return reducer.reduce(stats);
    }

    /**
     * Parses the {@code taskID} path parameter.
     *
     * @param request request whose path contains a {@code taskID} parameter
     * @return the task id
     * @throws HttpError with code 400 when the parameter is not a valid {@code long}
     */
    private static long getTaskId(HttpRequest request) throws HttpError {
        String param = request.getPathParameter("taskID");
        try {
            return Long.parseLong(param);
        }
        catch (NumberFormatException e) {
            throw HttpError.ofCode(400, "Bad number " + param);
        }
    }

    /**
     * Requests the task overview of one partition by sending {@code DataflowCommandGetTasks(null)}.
     *
     * @param address address of the partition
     * @return promise of the partition's response
     */
    private Promise<DataflowResponsePartitionData> getPartitionData(InetSocketAddress address) {
        return this.request(address, new DataflowCommandGetTasks(null), DataflowResponsePartitionData.class);
    }

    /**
     * Requests the data of one task from one partition by sending {@code DataflowCommandGetTasks(taskId)}.
     *
     * @param address address of the partition
     * @param taskId  id of the task
     * @return promise of the partition's response
     */
    private Promise<DataflowResponseTaskData> getTask(InetSocketAddress address, long taskId) {
        return this.request(address, new DataflowCommandGetTasks(taskId), DataflowResponseTaskData.class);
    }

    /**
     * Connects to {@code address}, sends {@code command}, receives one response and closes the
     * messaging channel before inspecting that response.
     *
     * @param address  address of the partition
     * @param command  command to send
     * @param expected response class the caller expects
     * @return promise of the response cast to {@code expected}; it fails with a
     *         {@link DataflowException} when the partition answers with a {@link DataflowResponseResult}
     *         (its error text is included in the message) or with any other unexpected response
     */
    private <R extends DataflowResponse> Promise<R> request(InetSocketAddress address, DataflowCommand command, Class<R> expected) {
        return AsyncTcpSocketNio.connect(address).then(socket -> {
            Messaging<DataflowResponse, DataflowCommand> messaging = MessagingWithBinaryStreaming.create(socket, this.codec);
            return messaging.send(command)
                    .then($ -> messaging.receive())
                    .map(response -> {
                        messaging.close();
                        if (expected.isInstance(response)) {
                            return expected.cast(response);
                        }
                        if (response instanceof DataflowResponseResult) {
                            throw new DataflowException("Error on remote server " + address + ": " + ((DataflowResponseResult) response).getError());
                        }
                        throw new DataflowException("Bad response from server");
                    });
        });
    }

    /**
     * Delegates the request to the routing servlet built in the constructor.
     */
    @NotNull
    @Override
    public Promisable<HttpResponse> serve(@NotNull HttpRequest request) throws Exception {
        return this.servlet.serve(request);
    }
}

