/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.client.io;

import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.Writer;
import cz.seznam.euphoria.core.util.IOUtils;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiDataSink<KEY, IN>
implements DataSink<IN> {
    private final UnaryFunction<IN, KEY> selectFunction;
    private final Map<KEY, DataSinkWrapper<KEY, IN, ?>> sinks;

    private MultiDataSink(UnaryFunction<IN, KEY> selectFunction, Map<KEY, DataSinkWrapper<KEY, IN, ?>> sinks) {
        this.selectFunction = selectFunction;
        this.sinks = sinks;
    }

    public static <KEY, IN> Builder<KEY, IN> selectBy(UnaryFunction<IN, KEY> selectFunction) {
        return new Builder(selectFunction);
    }

    @Override
    public Writer<IN> openWriter(int partitionId) {
        final HashMap writerMap = new HashMap();
        this.sinks.values().forEach(sink -> writerMap.put(sink.getKey(), sink.getDataSink().openWriter(partitionId)));
        return new Writer<IN>(){

            @Override
            public void write(IN elem) throws IOException {
                Object key = MultiDataSink.this.selectFunction.apply(elem);
                UnaryFunction mapper = ((DataSinkWrapper)MultiDataSink.this.sinks.get(key)).getMapper();
                ((Writer)writerMap.get(key)).write(mapper.apply(elem));
            }

            @Override
            public void commit() throws IOException {
                IOUtils.forEach(writerMap.values(), Writer::commit);
            }

            @Override
            public void close() throws IOException {
                IOUtils.forEach(writerMap.values(), Writer::close);
            }
        };
    }

    @Override
    public void commit() throws IOException {
        IOUtils.forEach(this.sinks.values().stream().map(DataSinkWrapper::getDataSink), DataSink::commit);
    }

    @Override
    public void rollback() throws IOException {
        IOUtils.forEach(this.sinks.values().stream().map(DataSinkWrapper::getDataSink), DataSink::rollback);
    }

    private static class DataSinkWrapper<KEY, IN, OUT>
    implements Serializable {
        private final KEY key;
        private final UnaryFunction<IN, OUT> mapper;
        private final DataSink<OUT> dataSink;

        DataSinkWrapper(KEY key, UnaryFunction<IN, OUT> mapper, DataSink<OUT> dataSink) {
            this.key = key;
            this.mapper = mapper;
            this.dataSink = dataSink;
        }

        KEY getKey() {
            return this.key;
        }

        UnaryFunction<IN, OUT> getMapper() {
            return this.mapper;
        }

        DataSink<OUT> getDataSink() {
            return this.dataSink;
        }
    }

    public static class Builder<KEY, IN> {
        private final UnaryFunction<IN, KEY> selectFunction;
        private final List<DataSinkWrapper<KEY, IN, ?>> dataSinkWrappers = new ArrayList();

        private Builder(UnaryFunction<IN, KEY> selectFunction) {
            this.selectFunction = selectFunction;
        }

        public <OUT> Builder<KEY, IN> addSink(KEY key, UnaryFunction<IN, OUT> mapper, DataSink<OUT> sink) {
            this.dataSinkWrappers.add(new DataSinkWrapper<KEY, IN, OUT>(key, mapper, sink));
            return this;
        }

        public DataSink<IN> build() {
            HashMap sinksMap = new HashMap();
            this.dataSinkWrappers.forEach(el -> sinksMap.put(el.getKey(), el));
            return new MultiDataSink(this.selectFunction, sinksMap);
        }
    }
}

