/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.graph;

import io.activej.async.process.AsyncCloseable;
import io.activej.common.Checks;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.TaskStatus;
import io.activej.dataflow.inject.DatasetIdModule;
import io.activej.dataflow.node.Node;
import io.activej.dataflow.node.NodeDownload;
import io.activej.dataflow.node.NodeUpload;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.inject.Key;
import io.activej.inject.ResourceLocator;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;

public final class Task {
    private final Map<StreamId, StreamSupplier<?>> suppliers = new LinkedHashMap();
    private final Map<StreamId, StreamConsumer<?>> consumers = new LinkedHashMap();
    private final SettablePromise<Void> executionPromise = new SettablePromise();
    private final long taskId;
    private final ResourceLocator environment;
    private final DatasetIdModule.DatasetIds datasetIds;
    private final List<Node> nodes;
    private final AtomicBoolean bound = new AtomicBoolean();
    private TaskStatus status = TaskStatus.RUNNING;
    @Nullable
    private Instant started;
    @Nullable
    private Instant finished;
    @Nullable
    private Throwable error;
    @Nullable
    private List<Promise<Void>> currentNodeAcks;
    private static final Function<Node, String> NODE_ID_FUNCTION = n -> "n" + n.getIndex();

    public Task(long taskId, ResourceLocator environment, List<Node> nodes) {
        this.taskId = taskId;
        this.environment = environment;
        this.nodes = nodes;
        this.datasetIds = (DatasetIdModule.DatasetIds)environment.getInstance(DatasetIdModule.DatasetIds.class);
    }

    public void bind() {
        if (!this.bound.compareAndSet(false, true)) {
            throw new IllegalStateException("Task was already bound!");
        }
        for (Node node : this.nodes) {
            this.currentNodeAcks = new ArrayList<Promise<Void>>();
            node.createAndBind(this);
            Promises.all(this.currentNodeAcks).whenComplete(($, e) -> node.finish(e));
        }
        this.currentNodeAcks = null;
    }

    public Object get(String key) {
        return this.environment.getInstance(this.datasetIds.getKeyForId(key));
    }

    public <T> T get(Class<T> cls) {
        return (T)this.environment.getInstance(cls);
    }

    public <T> T get(Key<T> key) {
        return (T)this.environment.getInstance(key);
    }

    public <T> void bindChannel(StreamId streamId, StreamConsumer<T> consumer) {
        Checks.checkState((!this.consumers.containsKey(streamId) ? 1 : 0) != 0, (Object)"Already bound");
        Checks.checkState((this.currentNodeAcks != null ? 1 : 0) != 0, (Object)"Must bind streams only from createAndBind");
        this.consumers.put(streamId, consumer);
        this.currentNodeAcks.add((Promise<Void>)consumer.getAcknowledgement());
    }

    public <T> void export(StreamId streamId, StreamSupplier<T> supplier) {
        Checks.checkState((!this.suppliers.containsKey(streamId) ? 1 : 0) != 0, (Object)"Already exported");
        Checks.checkState((this.currentNodeAcks != null ? 1 : 0) != 0, (Object)"Must bind streams only from createAndBind");
        this.suppliers.put(streamId, supplier);
        this.currentNodeAcks.add((Promise<Void>)supplier.getAcknowledgement());
    }

    public Promise<Void> execute() {
        this.started = Instant.now();
        return Promises.all(this.suppliers.entrySet().stream().map(supplierEntry -> {
            StreamId streamId = (StreamId)supplierEntry.getKey();
            try {
                StreamSupplier supplier = (StreamSupplier)supplierEntry.getValue();
                StreamConsumer<?> consumer = this.consumers.get(streamId);
                Checks.checkNotNull((Object)supplier, (String)"Supplier not found for %s, consumer %s", (Object[])new Object[]{streamId, consumer});
                Checks.checkNotNull(consumer, (String)"Consumer not found for %s, supplier %s", (Object[])new Object[]{streamId, supplier});
                return supplier.streamTo(consumer);
            }
            catch (Exception e) {
                return Promise.ofException((Throwable)e);
            }
        }).collect(Collectors.toList())).whenComplete(($, e) -> {
            this.finished = Instant.now();
            this.error = e;
            this.status = e == null ? TaskStatus.COMPLETED : (e == AsyncCloseable.CLOSE_EXCEPTION ? TaskStatus.CANCELED : TaskStatus.FAILED);
            this.executionPromise.accept($, e);
        });
    }

    public void cancel() {
        this.suppliers.values().forEach(AsyncCloseable::close);
        this.consumers.values().forEach(AsyncCloseable::close);
    }

    public Promise<Void> getExecutionPromise() {
        return this.executionPromise;
    }

    public boolean isExecuted() {
        return this.executionPromise.isComplete();
    }

    public TaskStatus getStatus() {
        return this.status;
    }

    @Nullable
    public Instant getStartTime() {
        return this.started;
    }

    @Nullable
    public Instant getFinishTime() {
        return this.finished;
    }

    @Nullable
    public Throwable getError() {
        return this.error;
    }

    public List<Node> getNodes() {
        return this.nodes;
    }

    public long getTaskId() {
        return this.taskId;
    }

    @JmxAttribute
    public String getGraphViz() {
        HashMap nodesByInput = new HashMap();
        HashMap nodesByOutput = new HashMap();
        HashMap<StreamId, Node> uploads = new HashMap<StreamId, Node>();
        HashMap<StreamId, StreamId> network = new HashMap<StreamId, StreamId>();
        LinkedHashMap<Node, String> ids = new LinkedHashMap<Node, String>();
        for (Node node2 : this.nodes) {
            if (node2 instanceof NodeDownload) {
                NodeDownload download = (NodeDownload)node2;
                network.put(download.getStreamId(), download.getOutput());
                continue;
            }
            if (node2 instanceof NodeUpload) {
                uploads.put(((NodeUpload)node2).getStreamId(), node2);
                continue;
            }
            node2.getInputs().forEach(input -> nodesByInput.put(input, node2));
            node2.getOutputs().forEach(input -> nodesByOutput.put(input, node2));
        }
        StringBuilder sb = new StringBuilder("digraph {\n");
        for (Node node3 : this.nodes) {
            if (node3 instanceof NodeDownload) {
                Node target;
                StreamId input2 = ((NodeDownload)node3).getStreamId();
                if (nodesByOutput.containsKey(input2) || (target = (Node)nodesByInput.get(((NodeDownload)node3).getOutput())) == null) continue;
                String nodeId = "n" + node3.getIndex();
                sb.append("  ").append(nodeId).append(" [id=\"").append(nodeId).append("\", shape=point, xlabel=\"").append(input2.getId()).append("\"]\n");
                sb.append("  ").append(nodeId).append(" -> ").append(ids.computeIfAbsent(target, NODE_ID_FUNCTION)).append(" [style=dashed]\n");
                continue;
            }
            if (node3 instanceof NodeUpload) continue;
            String nodeId = ids.computeIfAbsent(node3, NODE_ID_FUNCTION);
            for (StreamId output : node3.getOutputs()) {
                Node target = (Node)nodesByInput.get(output);
                if (target != null) {
                    sb.append("  ").append(nodeId).append(" -> ").append(ids.computeIfAbsent(target, NODE_ID_FUNCTION)).append('\n');
                    continue;
                }
                Node netTarget = (Node)nodesByInput.get(network.get(output));
                if (netTarget != null) {
                    sb.append("  ").append(nodeId).append(" -> ").append(ids.computeIfAbsent(netTarget, NODE_ID_FUNCTION));
                } else {
                    Node test = (Node)uploads.get(output);
                    String outputId = test != null ? "n" + test.getIndex() : "s" + output.getId();
                    sb.append("  ").append(outputId).append(" [id=\"").append(outputId).append("\", shape=point, xlabel=\"").append(output.getId()).append("\"]\n");
                    sb.append("  ").append(nodeId).append(" -> ").append(outputId);
                }
                sb.append(" [style=dashed]\n");
            }
        }
        sb.append('\n');
        ids.forEach((node, id) -> {
            String name = node.getClass().getSimpleName();
            sb.append("  ").append((String)id).append(" [label=\"").append(name.startsWith("Node") ? name.substring(4) : name).append("\" id=").append((String)id);
            Throwable error = node.getError();
            if (error != null) {
                StringWriter str = new StringWriter();
                error.printStackTrace(new PrintWriter(str));
                sb.append(" color=red tooltip=\"").append(str.toString().replace("\"", "\\\"")).append("\"");
            } else if (node.getFinished() != null) {
                sb.append(" color=blue");
            }
            sb.append("]\n");
        });
        return sb.append('}').toString();
    }
}

