/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.json;

import com.dslplatform.json.BoolConverter;
import com.dslplatform.json.JsonReader;
import com.dslplatform.json.JsonWriter;
import com.dslplatform.json.NumberConverter;
import com.dslplatform.json.ParsingException;
import com.dslplatform.json.StringConverter;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandDownload;
import io.activej.dataflow.command.DataflowCommandExecute;
import io.activej.dataflow.command.DataflowCommandGetTasks;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.command.DataflowResponsePartitionData;
import io.activej.dataflow.command.DataflowResponseResult;
import io.activej.dataflow.command.DataflowResponseTaskData;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.TaskStatus;
import io.activej.dataflow.http.LocalTaskData;
import io.activej.dataflow.http.ReducedTaskData;
import io.activej.dataflow.json.JsonCodec;
import io.activej.dataflow.json.JsonCodecSubtype;
import io.activej.dataflow.json.JsonUtils;
import io.activej.dataflow.node.Node;
import io.activej.dataflow.node.NodeConsumerOfId;
import io.activej.dataflow.node.NodeDownload;
import io.activej.dataflow.node.NodeFilter;
import io.activej.dataflow.node.NodeJoin;
import io.activej.dataflow.node.NodeMap;
import io.activej.dataflow.node.NodeMerge;
import io.activej.dataflow.node.NodeReduce;
import io.activej.dataflow.node.NodeReduceSimple;
import io.activej.dataflow.node.NodeShard;
import io.activej.dataflow.node.NodeSort;
import io.activej.dataflow.node.NodeSupplierOfId;
import io.activej.dataflow.node.NodeUnion;
import io.activej.dataflow.node.NodeUpload;
import io.activej.dataflow.stats.BinaryNodeStat;
import io.activej.dataflow.stats.NodeStat;
import io.activej.dataflow.stats.TestNodeStat;
import io.activej.datastream.processor.StreamJoin;
import io.activej.datastream.processor.StreamReducers;
import io.activej.inject.Injector;
import io.activej.inject.Key;
import io.activej.inject.annotation.Provides;
import io.activej.inject.annotation.QualifierAnnotation;
import io.activej.inject.binding.Binding;
import io.activej.inject.binding.OptionalDependency;
import io.activej.inject.module.AbstractModule;
import io.activej.inject.module.Module;
import io.activej.types.Types;
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;

public final class JsonModule
extends AbstractModule {
    // Shared natural-order comparator instance and its runtime class; configure() binds a
    // singleton codec keyed by that class which always yields this instance.
    private static final Comparator<?> NATURAL_ORDER = Comparator.naturalOrder();
    private static final Class<?> NATURAL_ORDER_CLASS = NATURAL_ORDER.getClass();

    // JSON property names passed to readValue/writeValue by the codecs in this class.
    // Changing any literal changes the serialized form.
    // NOTE(review): some names (e.g. STATUS, START, FINISH, GRAPHVIZ, STARTED, FINISHED, GRAPH,
    // STATUSES) are not referenced in the part of the file reviewed here — presumably used by
    // codecs declared further down; confirm before removing.
    private static final String TASK_ID = "taskId";
    private static final String NODE_STATS = "nodeStats";
    private static final String ERROR = "error";
    private static final String INDEX = "index";
    private static final String TYPE = "type";
    private static final String STREAM_ID = "streamId";
    private static final String ADDRESS = "address";
    private static final String INPUT = "input";
    private static final String OUTPUT = "output";
    private static final String FUNCTION = "function";
    private static final String REDUCER = "reducer";
    private static final String REDUCER_TO_RESULT = "reducerToResult";
    private static final String PREDICATE = "predicate";
    private static final String OUTPUTS = "outputs";
    private static final String NONCE = "nonce";
    private static final String INPUTS = "inputs";
    private static final String KEY_FUNCTION = "keyFunction";
    private static final String KEY_COMPARATOR = "keyComparator";
    private static final String DEDUPLICATE = "deduplicate";
    private static final String ITEMS_IN_MEMORY_SIZE = "itemsInMemorySize";
    private static final String LEFT = "left";
    private static final String RIGHT = "right";
    private static final String LEFT_KEY_FUNCTION = "leftKeyFunction";
    private static final String RIGHT_KEY_FUNCTION = "rightKeyFunction";
    private static final String JOINER = "joiner";
    private static final String STATUS = "status";
    private static final String START = "start";
    private static final String FINISH = "finish";
    private static final String GRAPHVIZ = "graphviz";
    private static final String STARTED = "started";
    private static final String FINISHED = "finished";
    private static final String GRAPH = "graph";
    private static final String STATUSES = "statuses";

    /** Not publicly instantiable; instances are handed out by {@link #create()}. */
    private JsonModule() {
        super();
    }

    /**
     * Static factory for this module.
     *
     * @return a freshly constructed {@code JsonModule}, exposed as a {@link Module}
     */
    public static Module create() {
        JsonModule module = new JsonModule();
        return module;
    }

    /**
     * Registers the codec bindings contributed by this module.
     *
     * <p>A fixed set of command, response, node and stat classes is bound through
     * {@code JsonUtils.codec(...)}, and a singleton codec is bound for the natural-order
     * comparator class. A generator then supplies {@link JsonCodec} bindings on demand:
     * <ul>
     *   <li>enum types are written as their constant name and read back by name
     *       (JSON {@code null} maps to {@code null});</li>
     *   <li>{@link Map} and {@link List} are declined (the generator returns {@code null});</li>
     *   <li>types that are neither requested with the {@code Subtypes} qualifier nor annotated
     *       with it are resolved through {@code JsonUtils.DSL_JSON}; the generated binding throws
     *       {@link IllegalStateException} when either the writer or the reader is missing;</li>
     *   <li>every other key gets a {@link JsonCodecSubtype} that combines the codecs of all
     *       strict subtypes with a {@code JsonCodec} binding in the injector or its parents.</li>
     * </ul>
     */
    @Override
    protected void configure() {
        this.bind(JsonUtils.codec(DataflowCommand.class));
        this.bind(JsonUtils.codec(DataflowResponse.class));
        this.bind(JsonUtils.codec(NodeConsumerOfId.class));
        this.bind(JsonUtils.codec(NodeSupplierOfId.class));
        this.bind(JsonUtils.codec(NodeUnion.class));
        this.bind(JsonUtils.codec(DataflowCommandDownload.class));
        this.bind(JsonUtils.codec(DataflowResponsePartitionData.class));
        this.bind(JsonUtils.codec(TestNodeStat.class));
        this.bind(JsonUtils.codec(BinaryNodeStat.class));
        // The natural-order comparator carries no state, so its codec always yields the shared instance.
        this.bind(Key.ofType((Type)Types.parameterizedType(JsonCodec.class, (Type[])new Type[]{NATURAL_ORDER_CLASS}))).toInstance(JsonUtils.ofObject(() -> NATURAL_ORDER));
        this.generate(JsonCodec.class, (bindings, scope, key) -> {
            Class type = key.getTypeParameter(0).getRawType();
            if (type.isEnum()) {
                return Binding.to(() -> JsonCodec.of(reader -> {
                    if (reader.wasNull()) {
                        return null;
                    }
                    // Enum.valueOf is the method the decompiled "ElementType.valueOf(type, ...)" resolved to.
                    return Enum.valueOf(type, reader.readString());
                }, (writer, value) -> {
                    if (value == null) {
                        writer.writeNull();
                    } else {
                        writer.writeString(value.name());
                    }
                }));
            }
            if (type == Map.class || type == List.class) {
                // Declined here: no binding is generated for these container types.
                return null;
            }
            if (key.getQualifier() != Subtypes.class && !type.isAnnotationPresent(Subtypes.class)) {
                return Binding.to(() -> {
                    JsonWriter.WriteObject writer = JsonUtils.DSL_JSON.tryFindWriter(type);
                    if (writer == null) {
                        throw new IllegalStateException("Cannot find serializer for " + type);
                    }
                    JsonReader.ReadObject reader = JsonUtils.DSL_JSON.tryFindReader(type);
                    // Fixed: this check previously re-tested the writer, so a missing reader went unnoticed.
                    if (reader == null) {
                        throw new IllegalStateException("Cannot find deserializer for " + type);
                    }
                    return JsonCodec.of(reader, writer);
                });
            }
            return Binding.to(args -> {
                Injector injector = (Injector)args[0];
                // Fall back to a factory that names nothing when no SubtypeNameFactory is bound.
                SubtypeNameFactory names = (SubtypeNameFactory)((OptionalDependency)args[1]).orElse($ -> null);
                HashSet<Class> subtypes = new HashSet<Class>();
                // Walk this injector and all of its parents, collecting strict subtypes that have a codec binding.
                for (Injector i = injector; i != null; i = i.getParent()) {
                    for (Key k : i.getBindings().keySet()) {
                        Class subtype;
                        if (k.getRawType() != JsonCodec.class || type == (subtype = k.getTypeParameter(0).getRawType()) || !type.isAssignableFrom(subtype)) continue;
                        subtypes.add(subtype);
                    }
                }
                JsonCodecSubtype combined = JsonCodecSubtype.create();
                for (Class subtype : subtypes) {
                    JsonCodec codec = (JsonCodec)injector.getInstance(Key.ofType((Type)Types.parameterizedType(JsonCodec.class, (Type[])new Type[]{subtype})));
                    String name = names.getName(subtype);
                    if (name != null) {
                        combined.setSubtypeCodec(subtype, name, codec);
                        continue;
                    }
                    combined.setSubtypeCodec(subtype, codec);
                }
                return combined;
            }, (Key[])new Key[]{Key.of(Injector.class), new Key<OptionalDependency<SubtypeNameFactory>>(){}});
        });
    }

    /**
     * Provides the codec for {@link Class} tokens, which are exchanged as class names.
     *
     * <p>JSON {@code null} is read as a {@code null} class and a {@code null} class is written
     * as JSON {@code null}. A name that cannot be resolved is reported as a
     * {@link ParsingException} wrapping the {@link ClassNotFoundException}.
     */
    @Provides
    JsonCodec<Class<?>> classCodec() {
        return JsonCodec.of(in -> {
            if (in.wasNull()) {
                return null;
            }
            // NOTE(review): Class.forName loads and initializes whatever class the payload names —
            // confirm this codec is only fed trusted input.
            String className = in.readString();
            try {
                return Class.forName(className);
            }
            catch (ClassNotFoundException notFound) {
                throw ParsingException.create("No such class", notFound, true);
            }
        }, (out, type) -> {
            if (type != null) {
                out.writeString(type.getName());
            } else {
                out.writeNull();
            }
        });
    }

    /**
     * Provides the codec for {@link StreamId}, serialized as its numeric id.
     *
     * <p>A {@code null} stream id is written as JSON {@code null}; the reader accepts JSON
     * {@code null} symmetrically and returns {@code null} for it, so whatever the writer emits
     * can be read back.
     */
    @Provides
    JsonCodec<StreamId> streamId() {
        return JsonCodec.of(reader -> {
            // Mirror the writer: it may emit null, so the reader must accept it.
            if (reader.wasNull()) {
                return null;
            }
            return new StreamId(NumberConverter.deserializeLong(reader));
        }, (writer, value) -> {
            if (value == null) {
                writer.writeNull();
            } else {
                NumberConverter.serialize((long)value.getId(), writer);
            }
        });
    }

    /**
     * Provides the codec for {@link InetSocketAddress}, serialized as the string
     * {@code "<host address>:<port>"}.
     *
     * <p>On read the string must contain exactly one {@code ':'}. Every failure to build the
     * address — an unknown host, a non-numeric port, or a port outside the range accepted by
     * {@link InetSocketAddress} — is reported as a {@link ParsingException} rather than leaking
     * an unchecked exception out of the reader.
     */
    @Provides
    JsonCodec<InetSocketAddress> address() {
        return JsonCodec.of(reader -> {
            String str = reader.readString();
            // NOTE(review): the writer emits getHostAddress(), which contains ':' for IPv6
            // addresses, so such values cannot pass this check — confirm whether IPv6 must be supported.
            String[] split = str.split(":");
            if (split.length != 2) {
                throw ParsingException.create("Address should be split with a single ':'", true);
            }
            try {
                return new InetSocketAddress(InetAddress.getByName(split[0]), Integer.parseInt(split[1]));
            }
            catch (UnknownHostException | IllegalArgumentException e) {
                // IllegalArgumentException covers NumberFormatException (bad port text)
                // as well as an out-of-range port rejected by the constructor.
                throw ParsingException.create("Failed to create InetSocketAddress", e, true);
            }
        }, (writer, addr) -> writer.writeString(addr.getAddress().getHostAddress() + ':' + addr.getPort()));
    }

    /**
     * Provides the codec for {@link DataflowCommandExecute}.
     *
     * <p>The command is a JSON object whose properties are handled in a fixed order: the task
     * id under {@code TASK_ID}, then the node list under {@code NODE_STATS}.
     *
     * @param nodesCodec codec used for the list of nodes
     */
    @Provides
    JsonCodec<DataflowCommandExecute> dataflowCommandExecute(JsonCodec<List<Node>> nodesCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            long id = (Long) readValue(in, TASK_ID, NumberConverter.LONG_READER);
            in.comma();
            // NOTE(review): the node list is keyed by NODE_STATS ("nodeStats") on both the read
            // and the write side — confirm that this property name is the intended one.
            List nodeList = (List) readValue(in, NODE_STATS, nodesCodec);
            in.endObject();
            return new DataflowCommandExecute(id, nodeList);
        }, (out, command) -> {
            out.writeByte((byte) '{');
            writeValue(out, TASK_ID, NumberConverter.LONG_WRITER, command.getTaskId());
            out.writeByte((byte) ',');
            writeValue(out, NODE_STATS, nodesCodec, command.getNodes());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link DataflowCommandGetTasks}: a JSON object with the single
     * property {@code TASK_ID}.
     */
    @Provides
    JsonCodec<DataflowCommandGetTasks> dataflowCommandGetTasks() {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            // NOTE(review): the id is kept as a boxed Long; if a null task id is legal for this
            // command, confirm that LONG_READER accepts a JSON null.
            Long id = (Long) readValue(in, TASK_ID, NumberConverter.LONG_READER);
            in.endObject();
            return new DataflowCommandGetTasks(id);
        }, (out, command) -> {
            out.writeByte((byte) '{');
            writeValue(out, TASK_ID, NumberConverter.LONG_WRITER, command.getTaskId());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link DataflowResponseResult}: a JSON object with the single
     * property {@code ERROR} holding the error string.
     */
    @Provides
    JsonCodec<DataflowResponseResult> dataflowResponseResult() {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            String message = (String) readValue(in, ERROR, StringConverter.READER);
            in.endObject();
            return new DataflowResponseResult(message);
        }, (out, response) -> {
            out.writeByte((byte) '{');
            writeValue(out, ERROR, StringConverter.WRITER, response.getError());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeUpload}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code TYPE},
     * {@code STREAM_ID}.
     *
     * @param classCodec    codec for the node's item type
     * @param streamIdCodec codec for the stream id
     */
    @Provides
    JsonCodec<NodeUpload> nodeUpload(JsonCodec<Class<?>> classCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter.INT_READER);
            in.comma();
            Class itemType = (Class) readValue(in, TYPE, classCodec);
            in.comma();
            StreamId stream = (StreamId) readValue(in, STREAM_ID, streamIdCodec);
            in.endObject();
            return new NodeUpload(nodeIndex, itemType, stream);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, TYPE, classCodec, node.getType());
            out.writeByte((byte) ',');
            writeValue(out, STREAM_ID, streamIdCodec, node.getStreamId());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeDownload}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code TYPE},
     * {@code ADDRESS}, {@code STREAM_ID}, {@code OUTPUT}.
     *
     * @param classCodec    codec for the node's item type
     * @param addressCodec  codec for the remote socket address
     * @param streamIdCodec codec shared by both stream ids
     */
    @Provides
    JsonCodec<NodeDownload> nodeDownload(JsonCodec<Class<?>> classCodec, JsonCodec<InetSocketAddress> addressCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter.INT_READER);
            in.comma();
            Class itemType = (Class) readValue(in, TYPE, classCodec);
            in.comma();
            InetSocketAddress remote = (InetSocketAddress) readValue(in, ADDRESS, addressCodec);
            in.comma();
            StreamId stream = (StreamId) readValue(in, STREAM_ID, streamIdCodec);
            in.comma();
            StreamId target = (StreamId) readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeDownload(nodeIndex, itemType, remote, stream, target);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, TYPE, classCodec, node.getType());
            out.writeByte((byte) ',');
            writeValue(out, ADDRESS, addressCodec, node.getAddress());
            out.writeByte((byte) ',');
            writeValue(out, STREAM_ID, streamIdCodec, node.getStreamId());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeMap}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code FUNCTION},
     * {@code INPUT}, {@code OUTPUT}.
     *
     * @param functionCodec subtype-aware codec for the mapping function
     * @param streamIdCodec codec shared by the input and output stream ids
     */
    @Provides
    JsonCodec<NodeMap> nodeMap(@Subtypes JsonCodec<Function> functionCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Function mapper = (Function) readValue(in, FUNCTION, functionCodec);
            in.comma();
            StreamId source = (StreamId) readValue(in, INPUT, streamIdCodec);
            in.comma();
            StreamId target = (StreamId) readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeMap(nodeIndex, mapper, source, target);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, FUNCTION, functionCodec, node.getFunction());
            out.writeByte((byte) ',');
            writeValue(out, INPUT, streamIdCodec, node.getInput());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeReduce.Input}.
     *
     * <p>Properties are read and written in the order {@code REDUCER}, {@code KEY_FUNCTION}.
     *
     * @param reducerCodec  subtype-aware codec for the reducer
     * @param functionCodec subtype-aware codec for the key function
     */
    @Provides
    JsonCodec<NodeReduce.Input> nodeReduceInput(@Subtypes JsonCodec<StreamReducers.Reducer> reducerCodec, @Subtypes JsonCodec<Function> functionCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            StreamReducers.Reducer inputReducer = (StreamReducers.Reducer) readValue(in, REDUCER, reducerCodec);
            in.comma();
            Function keyFn = (Function) readValue(in, KEY_FUNCTION, functionCodec);
            in.endObject();
            return new NodeReduce.Input(inputReducer, keyFn);
        }, (out, input) -> {
            out.writeByte((byte) '{');
            writeValue(out, REDUCER, reducerCodec, input.getReducer());
            out.writeByte((byte) ',');
            writeValue(out, KEY_FUNCTION, functionCodec, input.getKeyFunction());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link StreamReducers.ReducerToResult.InputToAccumulator}: a JSON
     * object whose only property, {@code REDUCER_TO_RESULT}, holds the wrapped reducer.
     *
     * @param reducerToResultCodec subtype-aware codec for the wrapped reducer
     */
    @Provides
    JsonCodec<StreamReducers.ReducerToResult.InputToAccumulator> inputToAccumulator(@Subtypes JsonCodec<StreamReducers.ReducerToResult> reducerToResultCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            StreamReducers.ReducerToResult wrapped = (StreamReducers.ReducerToResult) readValue(in, REDUCER_TO_RESULT, reducerToResultCodec);
            in.endObject();
            return new StreamReducers.ReducerToResult.InputToAccumulator(wrapped);
        }, (out, reducer) -> {
            out.writeByte((byte) '{');
            writeValue(out, REDUCER_TO_RESULT, reducerToResultCodec, reducer.getReducerToResult());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link StreamReducers.ReducerToResult.InputToOutput}: a JSON
     * object whose only property, {@code REDUCER_TO_RESULT}, holds the wrapped reducer.
     *
     * @param reducerToResultCodec subtype-aware codec for the wrapped reducer
     */
    @Provides
    JsonCodec<StreamReducers.ReducerToResult.InputToOutput> inputToOutput(@Subtypes JsonCodec<StreamReducers.ReducerToResult> reducerToResultCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            StreamReducers.ReducerToResult wrapped = (StreamReducers.ReducerToResult) readValue(in, REDUCER_TO_RESULT, reducerToResultCodec);
            in.endObject();
            return new StreamReducers.ReducerToResult.InputToOutput(wrapped);
        }, (out, reducer) -> {
            out.writeByte((byte) '{');
            writeValue(out, REDUCER_TO_RESULT, reducerToResultCodec, reducer.getReducerToResult());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link StreamReducers.ReducerToResult.AccumulatorToAccumulator}:
     * a JSON object whose only property, {@code REDUCER_TO_RESULT}, holds the wrapped reducer.
     *
     * @param reducerToResultCodec subtype-aware codec for the wrapped reducer
     */
    @Provides
    JsonCodec<StreamReducers.ReducerToResult.AccumulatorToAccumulator> accumulatorToAccumulator(@Subtypes JsonCodec<StreamReducers.ReducerToResult> reducerToResultCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            StreamReducers.ReducerToResult wrapped = (StreamReducers.ReducerToResult) readValue(in, REDUCER_TO_RESULT, reducerToResultCodec);
            in.endObject();
            return new StreamReducers.ReducerToResult.AccumulatorToAccumulator(wrapped);
        }, (out, reducer) -> {
            out.writeByte((byte) '{');
            writeValue(out, REDUCER_TO_RESULT, reducerToResultCodec, reducer.getReducerToResult());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link StreamReducers.ReducerToResult.AccumulatorToOutput}: a JSON
     * object whose only property, {@code REDUCER_TO_RESULT}, holds the wrapped reducer.
     *
     * @param reducerToResultCodec subtype-aware codec for the wrapped reducer
     */
    @Provides
    JsonCodec<StreamReducers.ReducerToResult.AccumulatorToOutput> accumulatorToOutput(@Subtypes JsonCodec<StreamReducers.ReducerToResult> reducerToResultCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            StreamReducers.ReducerToResult wrapped = (StreamReducers.ReducerToResult) readValue(in, REDUCER_TO_RESULT, reducerToResultCodec);
            in.endObject();
            return new StreamReducers.ReducerToResult.AccumulatorToOutput(wrapped);
        }, (out, reducer) -> {
            out.writeByte((byte) '{');
            writeValue(out, REDUCER_TO_RESULT, reducerToResultCodec, reducer.getReducerToResult());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link StreamReducers.DeduplicateReducer}, built by
     * {@code JsonUtils.ofObject} from a supplier that creates a new reducer via its no-argument constructor.
     */
    @Provides
    JsonCodec<StreamReducers.DeduplicateReducer> mergeDistinctReducer() {
        return JsonUtils.ofObject(() -> new StreamReducers.DeduplicateReducer());
    }

    /**
     * Provides the codec for {@link StreamReducers.MergeReducer}, built by
     * {@code JsonUtils.ofObject} from a supplier that creates a new reducer via its no-argument constructor.
     */
    @Provides
    JsonCodec<StreamReducers.MergeReducer> mergeSortReducer() {
        return JsonUtils.ofObject(() -> new StreamReducers.MergeReducer());
    }

    /**
     * Provides the codec for {@link NodeFilter}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code PREDICATE},
     * {@code INPUT}, {@code OUTPUT}.
     *
     * @param predicateCodec subtype-aware codec for the filter predicate
     * @param streamIdCodec  codec shared by the input and output stream ids
     */
    @Provides
    JsonCodec<NodeFilter> nodeFilter(@Subtypes JsonCodec<Predicate> predicateCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Predicate condition = (Predicate) readValue(in, PREDICATE, predicateCodec);
            in.comma();
            StreamId source = (StreamId) readValue(in, INPUT, streamIdCodec);
            in.comma();
            StreamId target = (StreamId) readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeFilter(nodeIndex, condition, source, target);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, PREDICATE, predicateCodec, node.getPredicate());
            out.writeByte((byte) ',');
            writeValue(out, INPUT, streamIdCodec, node.getInput());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeShard}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code KEY_FUNCTION},
     * {@code INPUT}, {@code OUTPUTS}, {@code NONCE}. The output stream ids are handled as a
     * collection whose elements go through {@code streamIdCodec}.
     *
     * @param functionCodec subtype-aware codec for the key function
     * @param streamIdCodec codec for the input stream id and for each output stream id
     */
    @Provides
    JsonCodec<NodeShard> nodeShard(@Subtypes JsonCodec<Function> functionCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Function keyFn = (Function) readValue(in, KEY_FUNCTION, functionCodec);
            in.comma();
            StreamId source = (StreamId) readValue(in, INPUT, streamIdCodec);
            in.comma();
            // The element reader ignores its own argument and pulls the collection from the captured reader.
            List targets = (List) readValue(in, OUTPUTS, $ -> in.readCollection((JsonReader.ReadObject) streamIdCodec));
            in.comma();
            int nonceValue = (Integer) readValue(in, NONCE, NumberConverter.INT_READER);
            in.endObject();
            return new NodeShard(nodeIndex, keyFn, source, targets, nonceValue);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, KEY_FUNCTION, functionCodec, node.getKeyFunction());
            out.writeByte((byte) ',');
            writeValue(out, INPUT, streamIdCodec, node.getInput());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUTS, ($, streams) -> out.serialize(streams, (JsonWriter.WriteObject) streamIdCodec), node.getOutputs());
            out.writeByte((byte) ',');
            writeValue(out, NONCE, NumberConverter.INT_WRITER, node.getNonce());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeMerge}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code KEY_FUNCTION},
     * {@code KEY_COMPARATOR}, {@code DEDUPLICATE}, {@code INPUTS}, {@code OUTPUT}. The input
     * stream ids are handled as a collection whose elements go through {@code streamIdCodec}.
     *
     * @param functionCodec   subtype-aware codec for the key function
     * @param comparatorCodec subtype-aware codec for the key comparator
     * @param streamIdCodec   codec for each input stream id and for the output stream id
     */
    @Provides
    JsonCodec<NodeMerge> nodeMerge(@Subtypes JsonCodec<Function> functionCodec, @Subtypes JsonCodec<Comparator> comparatorCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Function keyFn = (Function) readValue(in, KEY_FUNCTION, functionCodec);
            in.comma();
            Comparator keyOrder = (Comparator) readValue(in, KEY_COMPARATOR, comparatorCodec);
            in.comma();
            boolean dedup = (Boolean) readValue(in, DEDUPLICATE, BoolConverter.READER);
            in.comma();
            // The element reader ignores its own argument and pulls the collection from the captured reader.
            List sources = (List) readValue(in, INPUTS, $ -> in.readCollection((JsonReader.ReadObject) streamIdCodec));
            in.comma();
            StreamId target = (StreamId) readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeMerge(nodeIndex, keyFn, keyOrder, dedup, sources, target);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, KEY_FUNCTION, functionCodec, node.getKeyFunction());
            out.writeByte((byte) ',');
            writeValue(out, KEY_COMPARATOR, comparatorCodec, node.getKeyComparator());
            out.writeByte((byte) ',');
            writeValue(out, DEDUPLICATE, BoolConverter.WRITER, node.isDeduplicate());
            out.writeByte((byte) ',');
            writeValue(out, INPUTS, ($, streams) -> out.serialize(streams, (JsonWriter.WriteObject) streamIdCodec), node.getInputs());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeReduce}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code KEY_COMPARATOR},
     * {@code INPUTS}, {@code OUTPUT}.
     *
     * @param comparatorCodec subtype-aware codec for the key comparator
     * @param streamIdCodec   codec for the output stream id
     * @param inputsCodec     codec for the map from input stream id to reduce input
     */
    @Provides
    JsonCodec<NodeReduce> nodeReduce(@Subtypes JsonCodec<Comparator> comparatorCodec, JsonCodec<StreamId> streamIdCodec, JsonCodec<Map<StreamId, NodeReduce.Input>> inputsCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Comparator keyOrder = (Comparator) readValue(in, KEY_COMPARATOR, comparatorCodec);
            in.comma();
            Map inputMap = (Map) readValue(in, INPUTS, inputsCodec);
            in.comma();
            StreamId target = (StreamId) readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeReduce(nodeIndex, keyOrder, inputMap, target);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, KEY_COMPARATOR, comparatorCodec, node.getKeyComparator());
            out.writeByte((byte) ',');
            writeValue(out, INPUTS, inputsCodec, node.getInputMap());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the codec for {@link NodeReduceSimple}.
     *
     * <p>Properties are read and written in the order {@code INDEX}, {@code KEY_FUNCTION},
     * {@code KEY_COMPARATOR}, {@code REDUCER}, {@code INPUTS}, {@code OUTPUT}. The input stream
     * ids are handled as a collection whose elements go through {@code streamIdCodec}.
     *
     * @param functionCodec   subtype-aware codec for the key function
     * @param comparatorCodec subtype-aware codec for the key comparator
     * @param reducerCodec    subtype-aware codec for the reducer
     * @param streamIdCodec   codec for each input stream id and for the output stream id
     */
    @Provides
    JsonCodec<NodeReduceSimple> nodeReduceSimple(@Subtypes JsonCodec<Function> functionCodec, @Subtypes JsonCodec<Comparator> comparatorCodec, @Subtypes JsonCodec<StreamReducers.Reducer> reducerCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Function keyFn = (Function) readValue(in, KEY_FUNCTION, functionCodec);
            in.comma();
            Comparator keyOrder = (Comparator) readValue(in, KEY_COMPARATOR, comparatorCodec);
            in.comma();
            StreamReducers.Reducer simpleReducer = (StreamReducers.Reducer) readValue(in, REDUCER, reducerCodec);
            in.comma();
            // The element reader ignores its own argument and pulls the collection from the captured reader.
            List sources = (List) readValue(in, INPUTS, $ -> in.readCollection((JsonReader.ReadObject) streamIdCodec));
            in.comma();
            StreamId target = (StreamId) readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeReduceSimple(nodeIndex, keyFn, keyOrder, simpleReducer, sources, target);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            writeValue(out, KEY_FUNCTION, functionCodec, node.getKeyFunction());
            out.writeByte((byte) ',');
            writeValue(out, KEY_COMPARATOR, comparatorCodec, node.getKeyComparator());
            out.writeByte((byte) ',');
            writeValue(out, REDUCER, reducerCodec, node.getReducer());
            out.writeByte((byte) ',');
            writeValue(out, INPUTS, ($, streams) -> out.serialize(streams, (JsonWriter.WriteObject) streamIdCodec), node.getInputs());
            out.writeByte((byte) ',');
            writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the JSON codec for {@link NodeSort}.
     *
     * <p>A node is written as a single JSON object whose members appear in a fixed order:
     * {@code INDEX}, {@code TYPE}, {@code KEY_FUNCTION}, {@code KEY_COMPARATOR}, {@code DEDUPLICATE},
     * {@code ITEMS_IN_MEMORY_SIZE}, {@code INPUT}, {@code OUTPUT}. The reader expects exactly the
     * same order, because every member is fetched through {@code readValue}, which rejects an
     * unexpected key with a parse error.
     *
     * @param typeCodec       codec for the {@code TYPE} member
     * @param comparatorCodec codec for the {@code KEY_COMPARATOR} member
     * @param functionCodec   codec for the {@code KEY_FUNCTION} member
     * @param streamIdCodec   codec for the {@code INPUT} and {@code OUTPUT} members
     * @return codec that reads and writes {@link NodeSort} instances
     */
    @Provides
    JsonCodec<NodeSort> nodeSort(JsonCodec<Class<?>> typeCodec, @Subtypes JsonCodec<Comparator> comparatorCodec, @Subtypes JsonCodec<Function> functionCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            // The reader must already be positioned on the opening brace of the object.
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) JsonModule.readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            Class itemType = (Class) JsonModule.readValue(in, TYPE, typeCodec);
            in.comma();
            Function keyFunction = (Function) JsonModule.readValue(in, KEY_FUNCTION, functionCodec);
            in.comma();
            Comparator keyComparator = (Comparator) JsonModule.readValue(in, KEY_COMPARATOR, comparatorCodec);
            in.comma();
            boolean dedup = (Boolean) JsonModule.readValue(in, DEDUPLICATE, BoolConverter.READER);
            in.comma();
            int memorySize = (Integer) JsonModule.readValue(in, ITEMS_IN_MEMORY_SIZE, NumberConverter.INT_READER);
            in.comma();
            StreamId inputId = (StreamId) JsonModule.readValue(in, INPUT, streamIdCodec);
            in.comma();
            StreamId outputId = (StreamId) JsonModule.readValue(in, OUTPUT, streamIdCodec);
            in.endObject();
            return new NodeSort(nodeIndex, itemType, keyFunction, keyComparator, dedup, memorySize, inputId, outputId);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            JsonModule.writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, TYPE, typeCodec, node.getType());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, KEY_FUNCTION, functionCodec, node.getKeyFunction());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, KEY_COMPARATOR, comparatorCodec, node.getKeyComparator());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, DEDUPLICATE, BoolConverter.WRITER, node.isDeduplicate());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, ITEMS_IN_MEMORY_SIZE, NumberConverter.INT_WRITER, node.getItemsInMemorySize());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, INPUT, streamIdCodec, node.getInput());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the JSON codec for {@link NodeJoin}.
     *
     * <p>A node is written as a single JSON object whose members appear in a fixed order:
     * {@code INDEX}, {@code LEFT}, {@code RIGHT}, {@code OUTPUT}, {@code KEY_COMPARATOR},
     * {@code LEFT_KEY_FUNCTION}, {@code RIGHT_KEY_FUNCTION}, {@code JOINER}. The reader expects
     * exactly the same order, because every member is fetched through {@code readValue}, which
     * rejects an unexpected key with a parse error.
     *
     * @param joinerCodec     codec for the {@code JOINER} member
     * @param comparatorCodec codec for the {@code KEY_COMPARATOR} member
     * @param functionCodec   codec for both key-function members
     * @param streamIdCodec   codec for the {@code LEFT}, {@code RIGHT} and {@code OUTPUT} members
     * @return codec that reads and writes {@link NodeJoin} instances
     */
    @Provides
    JsonCodec<NodeJoin> nodeJoin(@Subtypes JsonCodec<StreamJoin.Joiner> joinerCodec, @Subtypes JsonCodec<Comparator> comparatorCodec, @Subtypes JsonCodec<Function> functionCodec, JsonCodec<StreamId> streamIdCodec) {
        return JsonCodec.of(in -> {
            // The reader must already be positioned on the opening brace of the object.
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            int nodeIndex = (Integer) JsonModule.readValue(in, INDEX, NumberConverter::deserializeInt);
            in.comma();
            StreamId leftId = (StreamId) JsonModule.readValue(in, LEFT, streamIdCodec);
            in.comma();
            StreamId rightId = (StreamId) JsonModule.readValue(in, RIGHT, streamIdCodec);
            in.comma();
            StreamId outputId = (StreamId) JsonModule.readValue(in, OUTPUT, streamIdCodec);
            in.comma();
            Comparator keyComparator = (Comparator) JsonModule.readValue(in, KEY_COMPARATOR, comparatorCodec);
            in.comma();
            Function leftKeyFn = (Function) JsonModule.readValue(in, LEFT_KEY_FUNCTION, functionCodec);
            in.comma();
            Function rightKeyFn = (Function) JsonModule.readValue(in, RIGHT_KEY_FUNCTION, functionCodec);
            in.comma();
            StreamJoin.Joiner joinFn = (StreamJoin.Joiner) JsonModule.readValue(in, JOINER, joinerCodec);
            in.endObject();
            return new NodeJoin(nodeIndex, leftId, rightId, outputId, keyComparator, leftKeyFn, rightKeyFn, joinFn);
        }, (out, node) -> {
            out.writeByte((byte) '{');
            JsonModule.writeValue(out, INDEX, NumberConverter.INT_WRITER, node.getIndex());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, LEFT, streamIdCodec, node.getLeft());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, RIGHT, streamIdCodec, node.getRight());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, OUTPUT, streamIdCodec, node.getOutput());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, KEY_COMPARATOR, comparatorCodec, node.getKeyComparator());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, LEFT_KEY_FUNCTION, functionCodec, node.getLeftKeyFunction());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, RIGHT_KEY_FUNCTION, functionCodec, node.getRightKeyFunction());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, JOINER, joinerCodec, node.getJoiner());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the JSON codec for {@link DataflowResponseTaskData}.
     *
     * <p>The response is written as a single JSON object whose members appear in a fixed order:
     * {@code STATUS}, {@code START}, {@code FINISH}, {@code ERROR}, {@code NODE_STATS},
     * {@code GRAPHVIZ}. The reader expects exactly the same order, because every member is
     * fetched through {@code readValue}, which rejects an unexpected key with a parse error.
     *
     * @param statusCodec  codec for the {@code STATUS} member
     * @param nodesCodec   codec for the {@code NODE_STATS} member
     * @param instantCodec codec for the {@code START} and {@code FINISH} members
     * @return codec that reads and writes {@link DataflowResponseTaskData} instances
     */
    @Provides
    JsonCodec<DataflowResponseTaskData> localTaskStat(JsonCodec<TaskStatus> statusCodec, JsonCodec<Map<Integer, NodeStat>> nodesCodec, JsonCodec<Instant> instantCodec) {
        return JsonCodec.of(in -> {
            // The reader must already be positioned on the opening brace of the object.
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            TaskStatus status = (TaskStatus) JsonModule.readValue(in, STATUS, statusCodec);
            in.comma();
            Instant startTime = (Instant) JsonModule.readValue(in, START, instantCodec);
            in.comma();
            Instant finishTime = (Instant) JsonModule.readValue(in, FINISH, instantCodec);
            in.comma();
            String errorString = (String) JsonModule.readValue(in, ERROR, StringConverter.READER);
            in.comma();
            Map nodeStats = (Map) JsonModule.readValue(in, NODE_STATS, nodesCodec);
            in.comma();
            String graphViz = (String) JsonModule.readValue(in, GRAPHVIZ, StringConverter.READER);
            in.endObject();
            return new DataflowResponseTaskData(status, startTime, finishTime, errorString, nodeStats, graphViz);
        }, (out, response) -> {
            out.writeByte((byte) '{');
            JsonModule.writeValue(out, STATUS, statusCodec, response.getStatus());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, START, instantCodec, response.getStartTime());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, FINISH, instantCodec, response.getFinishTime());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, ERROR, StringConverter.WRITER, response.getErrorString());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, NODE_STATS, nodesCodec, response.getNodes());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, GRAPHVIZ, StringConverter.WRITER, response.getGraphViz());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the JSON codec for {@link LocalTaskData}.
     *
     * <p>The data is written as a single JSON object whose members appear in a fixed order:
     * {@code STATUS}, {@code GRAPH}, {@code NODE_STATS}, {@code STARTED}, {@code FINISHED},
     * {@code ERROR}. The reader expects exactly the same order, because every member is fetched
     * through {@code readValue}, which rejects an unexpected key with a parse error.
     *
     * @param statusCodec  codec for the {@code STATUS} member
     * @param nodesCodec   codec for the {@code NODE_STATS} member
     * @param instantCodec codec for the {@code STARTED} and {@code FINISHED} members
     * @return codec that reads and writes {@link LocalTaskData} instances
     */
    @Provides
    JsonCodec<LocalTaskData> localTaskData(JsonCodec<TaskStatus> statusCodec, JsonCodec<Map<Integer, NodeStat>> nodesCodec, JsonCodec<Instant> instantCodec) {
        return JsonCodec.of(in -> {
            // The reader must already be positioned on the opening brace of the object.
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            TaskStatus status = (TaskStatus) JsonModule.readValue(in, STATUS, statusCodec);
            in.comma();
            String graphText = (String) JsonModule.readValue(in, GRAPH, StringConverter.READER);
            in.comma();
            Map nodeStats = (Map) JsonModule.readValue(in, NODE_STATS, nodesCodec);
            in.comma();
            Instant startedAt = (Instant) JsonModule.readValue(in, STARTED, instantCodec);
            in.comma();
            Instant finishedAt = (Instant) JsonModule.readValue(in, FINISHED, instantCodec);
            in.comma();
            String errorText = (String) JsonModule.readValue(in, ERROR, StringConverter.READER);
            in.endObject();
            return new LocalTaskData(status, graphText, nodeStats, startedAt, finishedAt, errorText);
        }, (out, data) -> {
            out.writeByte((byte) '{');
            JsonModule.writeValue(out, STATUS, statusCodec, data.getStatus());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, GRAPH, StringConverter.WRITER, data.getGraph());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, NODE_STATS, nodesCodec, data.getNodeStats());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, STARTED, instantCodec, data.getStarted());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, FINISHED, instantCodec, data.getFinished());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, ERROR, StringConverter.WRITER, data.getError());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the JSON codec for {@link ReducedTaskData}.
     *
     * <p>The data is written as a single JSON object whose members appear in a fixed order:
     * {@code STATUSES}, {@code GRAPH}, {@code NODE_STATS}. The reader expects exactly the same
     * order, because every member is fetched through {@code readValue}, which rejects an
     * unexpected key with a parse error.
     *
     * @param statusesCodec  codec for the {@code STATUSES} member
     * @param nodeStatsCodec codec for the {@code NODE_STATS} member
     * @return codec that reads and writes {@link ReducedTaskData} instances
     */
    @Provides
    JsonCodec<ReducedTaskData> reducedTaskData(JsonCodec<List<TaskStatus>> statusesCodec, JsonCodec<Map<Integer, NodeStat>> nodeStatsCodec) {
        return JsonCodec.of(in -> {
            // The reader must already be positioned on the opening brace of the object.
            if (in.last() != '{') {
                throw in.newParseError("Expected '{'");
            }
            List statuses = (List) JsonModule.readValue(in, STATUSES, statusesCodec);
            in.comma();
            String graphText = (String) JsonModule.readValue(in, GRAPH, StringConverter.READER);
            in.comma();
            Map reducedStats = (Map) JsonModule.readValue(in, NODE_STATS, nodeStatsCodec);
            in.endObject();
            return new ReducedTaskData(statuses, graphText, reducedStats);
        }, (out, data) -> {
            out.writeByte((byte) '{');
            JsonModule.writeValue(out, STATUSES, statusesCodec, data.getStatuses());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, GRAPH, StringConverter.WRITER, data.getGraph());
            out.writeByte((byte) ',');
            JsonModule.writeValue(out, NODE_STATS, nodeStatsCodec, data.getReducedNodeStats());
            out.writeByte((byte) '}');
        });
    }

    /**
     * Provides the default {@link SubtypeNameFactory}.
     *
     * <p>The factory returns {@code "Comparator.naturalOrder"} for {@code NATURAL_ORDER_CLASS}
     * and {@code null} for every other class.
     *
     * @return the default subtype-name factory
     */
    @Provides
    SubtypeNameFactory subtypeNames() {
        return subtype -> {
            // Identity comparison: only the exact NATURAL_ORDER_CLASS object gets a name.
            if (subtype == NATURAL_ORDER_CLASS) {
                return "Comparator.naturalOrder";
            }
            return null;
        };
    }

    /**
     * Provides a JSON codec for maps, built from the codecs of the key and value types.
     *
     * <p>The writer serializes the map's entry set as a collection in which each entry is
     * emitted as {@code [key,value]}. The reader reads such a collection back and gathers the
     * entries with {@link Collectors#toMap(java.util.function.Function, java.util.function.Function)}.
     *
     * @param keyCodec   codec for map keys
     * @param valueCodec codec for map values
     * @param <K>        key type
     * @param <V>        value type
     * @return codec that reads and writes {@code Map<K, V>} instances
     */
    @Provides
    <K, V> JsonCodec<Map<K, V>> map(JsonCodec<K> keyCodec, JsonCodec<V> valueCodec) {
        return JsonCodec.of(
                in -> in.readCollection(ignored -> {
                    // Every entry is itself an array and must start with '['.
                    if (in.last() != '[') {
                        throw in.newParseError("Expecting '[' as collection start");
                    }
                    in.getNextToken();
                    Object entryKey = keyCodec.read(in);
                    in.comma();
                    in.getNextToken();
                    Object entryValue = valueCodec.read(in);
                    in.endArray();
                    return new AbstractMap.SimpleEntry<Object, Object>(entryKey, entryValue);
                }).stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                (out, mapping) -> out.serialize(mapping.entrySet(), (ignored, pair) -> {
                    out.writeByte((byte) '[');
                    keyCodec.write(out, pair.getKey());
                    out.writeByte((byte) ',');
                    valueCodec.write(out, pair.getValue());
                    out.writeByte((byte) ']');
                }));
    }

    /**
     * Provides a JSON codec for lists, built from the codec of the element type.
     *
     * <p>Reading delegates to {@code readCollection} and writing to {@code serialize}, both
     * using {@code elementCodec} for the individual elements.
     *
     * @param elementCodec codec for list elements
     * @param <T>          element type
     * @return codec that reads and writes {@code List<T>} instances
     */
    @Provides
    <T> JsonCodec<List<T>> list(JsonCodec<T> elementCodec) {
        return JsonCodec.of(
                in -> in.readCollection((JsonReader.ReadObject) elementCodec),
                (out, elements) -> out.serialize(elements, (JsonWriter.WriteObject) elementCodec));
    }

    /**
     * Reads the next object member, checking that its key is the expected one.
     *
     * <p>Advances the reader to the next token, reads the member key and, if it equals
     * {@code key}, delegates reading of the value to {@code readObject}.
     *
     * @param reader     reader to consume the member from
     * @param key        key the next member is required to have
     * @param readObject reader for the member's value
     * @param <T>        type of the value
     * @return the value produced by {@code readObject}
     * @throws IOException if reading fails; a parse error created by the reader is thrown when
     *                     the key that was read differs from {@code key}
     */
    private static <T> T readValue(JsonReader<?> reader, String key, JsonReader.ReadObject<? extends T> readObject) throws IOException {
        reader.getNextToken();
        String actualKey = reader.readKey();
        if (actualKey.equals(key)) {
            return (T) readObject.read(reader);
        }
        throw reader.newParseError("Expected key '" + key + '\'');
    }

    /**
     * Writes a single object member: the key, a {@code ':'} separator, then the value.
     *
     * <p>Surrounding braces and separating commas are not written here; callers emit them.
     *
     * @param writer      writer to append to
     * @param key         member key, written with {@code writeString}
     * @param writeObject writer for the member's value
     * @param value       value to write
     * @param <T>         type of the value
     */
    private static <T> void writeValue(JsonWriter writer, String key, JsonWriter.WriteObject<T> writeObject, T value) {
        writer.writeString(key);
        writer.writeByte((byte) ':');
        writeObject.write(writer, value);
    }

    @FunctionalInterface
    public static interface SubtypeNameFactory {
        @Nullable
        public String getName(Class<?> var1);
    }

    @QualifierAnnotation
    @Target(value={ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD, ElementType.TYPE})
    @Retention(value=RetentionPolicy.RUNTIME)
    public static @interface Subtypes {
    }
}

