/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.client.flow;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Datasets;
import cz.seznam.euphoria.core.client.flow.Util;
import cz.seznam.euphoria.core.client.functional.ExtractEventTime;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.AssignEventTime;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.util.Settings;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A named collection of operators together with the datasets they read and
 * produce.
 *
 * <p>Input datasets are registered through {@link #createInput(DataSource)};
 * operators are registered through {@link #add(Operator)}. Every input of an
 * added operator must already be known to the flow, either as a registered
 * source or as the output of a previously added operator.
 *
 * <p>NOTE(review): no synchronization is performed on the internal
 * collections, so instances are presumably meant to be built from a single
 * thread; confirm against callers.
 */
public class Flow
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(Flow.class);
    /** Settings of this flow; created through the {@code Settings} copy constructor. */
    private final Settings settings;
    /** Name of this flow; never {@code null}, empty when no name was supplied. */
    private final String name;
    /** Generated name of every registered operator. */
    private final Map<Operator<?, ?>, String> operatorNames = new HashMap<>();
    /** Registered operators in the order they were added. */
    private final List<Operator<?, ?>> operators = new ArrayList<>();
    /** Outputs of all registered operators. */
    private final Set<Dataset<?>> outputs = new HashSet<>();
    /** Input datasets created through {@code createInput}. */
    private final Set<Dataset<?>> sources = new HashSet<>();
    /** For each dataset, the registered operators that list it as an input. */
    private final Map<Dataset<?>, Set<Operator<?, ?>>> datasetConsumers = new HashMap<>();

    /**
     * Creates a flow.
     *
     * @param name the flow name; {@code null} is replaced by an empty string
     * @param settings the settings handed to the {@code Settings} copy
     *     constructor (presumably producing an independent copy; confirm
     *     against {@code Settings})
     */
    protected Flow(@Nullable String name, Settings settings) {
        this.name = name == null ? "" : name;
        this.settings = this.cloneSettings(settings);
    }

    /**
     * Creates an unnamed flow with freshly constructed settings.
     *
     * @return the new flow
     */
    public static Flow create() {
        return Flow.create(null);
    }

    /**
     * Creates a flow with the given name and freshly constructed settings.
     *
     * @param flowName the flow name, may be {@code null}
     * @return the new flow
     */
    public static Flow create(@Nullable String flowName) {
        return new Flow(flowName, new Settings());
    }

    /**
     * Creates a flow with the given name and settings.
     *
     * @param flowName the flow name; {@code null} is replaced by an empty string
     * @param settings the settings the flow's own settings are constructed from
     * @return the new flow
     */
    public static Flow create(String flowName, Settings settings) {
        return new Flow(flowName, settings);
    }

    /**
     * Registers an operator without a logical name.
     *
     * @param operator the operator to register
     * @param <IN> the operator's input element type
     * @param <OUT> the operator's output element type
     * @param <T> the concrete operator type
     * @return the given operator
     * @throws IllegalStateException if the operator cannot be serialized
     * @throws IllegalArgumentException if any input of the operator is neither
     *     a source of this flow nor the output of a registered operator
     */
    public <IN, OUT, T extends Operator<IN, OUT>> T add(T operator) {
        return this.add(operator, null);
    }

    /**
     * Callback hook taking a dataset; this implementation does nothing.
     *
     * <p>NOTE(review): presumably invoked once the dataset has been persisted
     * so that subclasses can react; confirm against callers.
     *
     * @param dataset the dataset concerned
     * @param <T> the dataset's element type
     */
    public <T> void onPersisted(Dataset<T> dataset) {
    }

    /**
     * Registers an operator under a generated name.
     *
     * <p>All validation happens before any state of the flow is touched, so a
     * rejected operator leaves the flow exactly as it was.
     *
     * @param operator the operator to register
     * @param logicalName optional user-supplied name appended to the generated
     *     operator name
     * @param <IN> the operator's input element type
     * @param <OUT> the operator's output element type
     * @param <T> the concrete operator type
     * @return the given operator
     * @throws IllegalStateException if the operator cannot be serialized
     * @throws IllegalArgumentException if any input of the operator is neither
     *     a source of this flow nor the output of a registered operator
     */
    <IN, OUT, T extends Operator<IN, OUT>> T add(T operator, @Nullable String logicalName) {
        Dataset<?> output = operator.output();

        // Validate first, in the established order: serializability, then dependencies.
        this.validateSerializable(operator);
        List<Dataset<IN>> inputs = new ArrayList<>();
        for (Dataset<IN> d : operator.listInputs()) {
            // The operator's own output counts as present: it used to be
            // registered ahead of this check, and that acceptance is kept.
            boolean known = this.sources.contains(d)
                    || this.outputs.contains(d)
                    || Objects.equals(d, output);
            if (!known) {
                throw new IllegalArgumentException("Invalid input: All dependencies must already be present in the flow!");
            }
            inputs.add(d);
        }

        // Everything checked out; commit the operator to the flow.
        this.operatorNames.put(operator, this.buildOperatorName(operator, logicalName));
        this.operators.add(operator);
        this.outputs.add(output);
        for (Dataset<IN> d : inputs) {
            Set<Operator<?, ?>> consumers = this.datasetConsumers.get(d);
            if (consumers == null) {
                consumers = new HashSet<>();
                this.datasetConsumers.put(d, consumers);
            }
            consumers.add(operator);
        }
        return operator;
    }

    /**
     * Verifies that the operator can be written with Java serialization. The
     * bytes are only counted, never stored.
     *
     * @param o the operator to check
     * @throws IllegalStateException if serialization fails with an {@link IOException}
     */
    private void validateSerializable(Operator<?, ?> o) {
        try {
            CountingOutputStream outCounter = new CountingOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(outCounter)) {
                out.writeObject(o);
            }
            // Pass the operator itself so toString() runs only when debug logging is enabled.
            LOG.debug("Serialized operator {} ({}) into {} bytes", o, o.getClass(), outCounter.count);
        }
        catch (IOException e) {
            throw new IllegalStateException("Operator " + o + " not serializable!", e);
        }
    }

    /**
     * Builds the name {@code <operator name>@<n>[#<logical name>]}, where
     * {@code n} is one more than the number of operator names recorded so far.
     *
     * @param op the operator being named
     * @param logicalName optional user-supplied name; left out when
     *     {@code Util.trimToNull} yields {@code null}
     * @return the generated name
     */
    private String buildOperatorName(Operator<?, ?> op, @Nullable String logicalName) {
        StringBuilder sb = new StringBuilder(64);
        sb.append(op.getName()).append('@').append(this.operatorNames.size() + 1);
        String trimmed = Util.trimToNull(logicalName);
        if (trimmed != null) {
            sb.append('#').append(trimmed);
        }
        return sb.toString();
    }

    /**
     * Looks up the name generated for an operator.
     *
     * @param what the operator to look up
     * @return the generated name, or {@code null} if the operator is not
     *     registered in this flow
     */
    String getOperatorName(Operator<?, ?> what) {
        return this.operatorNames.get(what);
    }

    /**
     * Lists the registered operators in insertion order.
     *
     * @return an unmodifiable view of the registered operators
     */
    public Collection<Operator<?, ?>> operators() {
        return Collections.unmodifiableList(this.operators);
    }

    /**
     * Lists the input datasets created through {@code createInput}.
     *
     * @return an unmodifiable view of the source datasets
     */
    public Collection<Dataset<?>> sources() {
        return Collections.unmodifiableSet(this.sources);
    }

    /**
     * Counts the registered operators.
     *
     * @return the number of registered operators
     */
    public int size() {
        return this.operators.size();
    }

    /**
     * Lists the registered operators that read the given dataset.
     *
     * @param dataset the dataset of interest
     * @return an unmodifiable view of the consumers; empty if no registered
     *     operator reads the dataset
     */
    public Collection<Operator<?, ?>> getConsumersOf(Dataset<?> dataset) {
        Set<Operator<?, ?>> consumers = this.datasetConsumers.get(dataset);
        if (consumers == null) {
            return Collections.<Operator<?, ?>>emptyList();
        }
        return Collections.unmodifiableSet(consumers);
    }

    /**
     * Returns the flow name.
     *
     * @return the name; never {@code null}, empty if none was supplied
     */
    public String getName() {
        return this.name;
    }

    @Override
    public String toString() {
        return "Flow{name='" + this.name + '\'' + ", size=" + this.size() + '}';
    }

    /**
     * Returns the settings of this flow.
     *
     * @return the settings instance held by this flow
     */
    public Settings getSettings() {
        return this.settings;
    }

    /**
     * Creates a dataset backed by the given source and registers it as a
     * source of this flow.
     *
     * @param source the data source to read from
     * @param <T> the element type
     * @return the new input dataset
     */
    public <T> Dataset<T> createInput(DataSource<T> source) {
        Dataset<T> ret = Datasets.createInputFromSource(this, source);
        this.sources.add(ret);
        return ret;
    }

    /**
     * Creates an input dataset from the given source and passes it through an
     * {@code AssignEventTime} operator that uses the supplied extractor.
     *
     * @param source the data source to read from
     * @param evtTimeFn the event-time extractor; must not be {@code null}
     * @param <T> the element type
     * @return the output of the {@code AssignEventTime} operator
     * @throws NullPointerException if {@code evtTimeFn} is {@code null}
     */
    public <T> Dataset<T> createInput(DataSource<T> source, ExtractEventTime<T> evtTimeFn) {
        // Check the extractor up front so a failed call registers no source.
        Objects.requireNonNull(evtTimeFn);
        Dataset<T> input = this.createInput(source);
        return AssignEventTime.of(input).using(evtTimeFn).output();
    }

    /**
     * Builds this flow's settings via the {@code Settings} copy constructor.
     *
     * @param settings the settings to construct from
     * @return the newly constructed settings
     */
    private Settings cloneSettings(Settings settings) {
        return new Settings(settings);
    }

    /**
     * Output stream that discards everything written to it and only keeps
     * count of the number of bytes.
     */
    static class CountingOutputStream
    extends OutputStream {
        /** Number of bytes written so far. */
        long count;

        CountingOutputStream() {
        }

        @Override
        public void write(int b) throws IOException {
            ++this.count;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.count += (long)len;
        }
    }
}

