/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.storage;

import com.typesafe.config.Config;
import cz.o2.proxima.beam.core.BeamDataOperator;
import cz.o2.proxima.beam.core.DataAccessor;
import cz.o2.proxima.beam.core.DataAccessorFactory;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.functional.UnaryPredicate;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.config.ConfigUtils;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.storage.internal.AbstractDataAccessorFactory;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;

/**
 * A {@link DataAccessorFactory} that serves pre-registered {@link TestStream}s for URIs using
 * the {@code test-stream} scheme.
 *
 * <p>Streams are registered statically, keyed first by {@link Repository} and then by the URI
 * {@code test-stream://<family name>}, via
 * {@link #putStream(Repository, AttributeFamilyDescriptor, TestStream)}. Accessors created by
 * this factory look the stream up lazily, i.e. when one of their {@code create*} methods is
 * invoked, so a stream may be registered after the accessor has been created.
 */
public class TestStreamStorage
implements DataAccessorFactory {
    /** URI scheme accepted by this factory. */
    private static final String SCHEME = "test-stream";

    /** Format of the storage URI of a family; the single argument is the family name. */
    private static final String URI_FORMAT = SCHEME + "://%s";

    /** Registered streams, per repository. Shared by all instances of this factory. */
    private static final Map<Repository, StreamProviders> storages = new ConcurrentHashMap<Repository, StreamProviders>();

    /** Repository passed to {@link #setup(Repository)}; {@code null} until then. */
    private Repository repo;

    /**
     * Replaces storages of all attribute families with {@code test-stream} URIs.
     *
     * <p>Equivalent to {@link #replaceStorages(Config, UnaryPredicate)} with a filter that
     * accepts every family name.
     *
     * @param config the configuration to transform
     * @return the configuration returned by {@code ConfigUtils.withStorageReplacement}
     */
    public static Config replaceStorages(Config config) {
        return TestStreamStorage.replaceStorages(config, (UnaryPredicate<String> & Serializable)name -> true);
    }

    /**
     * Delegates to {@code ConfigUtils.withStorageReplacement}, supplying a function that maps
     * a name to the URI {@code test-stream://<name>}.
     *
     * @param config the configuration to transform
     * @param familyFilter predicate handed to {@code ConfigUtils.withStorageReplacement};
     *     presumably selects the families whose storage is replaced (confirm against
     *     {@code ConfigUtils})
     * @return the configuration returned by {@code ConfigUtils.withStorageReplacement}
     */
    public static Config replaceStorages(Config config, UnaryPredicate<String> familyFilter) {
        // NOTE(review): the raw intersection cast is retained from the original because the
        // type parameters of UnaryFunction are not visible here; it keeps the lambda
        // serializable.
        return ConfigUtils.withStorageReplacement(config, familyFilter, (UnaryFunction & Serializable)name -> URI.create(String.format(URI_FORMAT, name)));
    }

    /**
     * Registers a stream to be served for the given family within the given repository.
     *
     * <p>The stream is stored under the URI {@code test-stream://<family name>}. Registering a
     * second stream for the same repository and family replaces the previous one.
     *
     * @param repo the repository the stream belongs to
     * @param family the attribute family whose name identifies the stream
     * @param stream the stream to serve
     */
    public static void putStream(Repository repo, AttributeFamilyDescriptor family, TestStream<StreamElement> stream) {
        StreamProviders providers = storages.computeIfAbsent(repo, k -> new StreamProviders());
        providers.put(URI.create(String.format(URI_FORMAT, family.getName())), stream);
    }

    /**
     * Remembers the repository used to look up registered streams.
     *
     * @param repo the repository, must not be {@code null}
     * @throws NullPointerException if {@code repo} is {@code null}
     */
    public void setup(Repository repo) {
        this.repo = Objects.requireNonNull(repo, "repo must not be null");
    }

    /**
     * Accepts URIs whose scheme is exactly {@code test-stream}.
     *
     * @param uri the URI to test
     * @return {@code ACCEPT} for the {@code test-stream} scheme, {@code REJECT} otherwise,
     *     including URIs that have no scheme at all
     */
    public AbstractDataAccessorFactory.Accept accepts(URI uri) {
        // Constant-first comparison: URI#getScheme() returns null for scheme-less URIs and
        // such URIs must be rejected rather than fail with a NullPointerException.
        return SCHEME.equals(uri.getScheme()) ? AbstractDataAccessorFactory.Accept.ACCEPT : AbstractDataAccessorFactory.Accept.REJECT;
    }

    /**
     * Creates an accessor whose every read method applies the stream registered for
     * {@code uri} to the supplied pipeline.
     *
     * <p>The {@code operator}, {@code entity} and {@code cfg} arguments are not used. All
     * remaining arguments of the accessor methods (position, limits, timestamps, attributes)
     * are ignored as well; the registered stream is returned as is.
     *
     * @param operator unused
     * @param entity unused
     * @param uri the URI identifying the registered stream
     * @param cfg unused
     * @return the accessor
     */
    public DataAccessor createAccessor(BeamDataOperator operator, EntityDescriptor entity, final URI uri, Map<String, Object> cfg) {
        return new DataAccessor(){

            public PCollection<StreamElement> createStream(String name, Pipeline pipeline, Position position, boolean stopAtCurrent, boolean eventTime, long limit) {
                return TestStreamStorage.this.streamFor(pipeline, uri);
            }

            public PCollection<StreamElement> createStreamFromUpdates(Pipeline pipeline, List<AttributeDescriptor<?>> attrs, long startStamp, long endStamp, long limit) {
                return TestStreamStorage.this.streamFor(pipeline, uri);
            }

            public PCollection<StreamElement> createBatch(Pipeline pipeline, List<AttributeDescriptor<?>> attrs, long startStamp, long endStamp) {
                return TestStreamStorage.this.streamFor(pipeline, uri);
            }
        };
    }

    /**
     * Applies the stream registered for {@code uri} to {@code pipeline}.
     *
     * @throws NullPointerException if {@link #setup(Repository)} has not been called, or no
     *     stream is registered for the repository or for the URI
     */
    private PCollection<StreamElement> streamFor(Pipeline pipeline, URI uri) {
        // Pipeline#apply returns the output type of the transform, so no cast is required.
        return pipeline.apply(this.providers().get(uri));
    }

    /**
     * Returns the streams registered for the repository given to {@link #setup(Repository)}.
     *
     * @throws NullPointerException if {@code setup} has not been called or nothing has been
     *     registered for the repository
     */
    private StreamProviders providers() {
        Repository current = Objects.requireNonNull(this.repo, "setup(Repository) must be called before streams are created");
        return Objects.requireNonNull(storages.get(current), "No test streams registered for the repository; call putStream first");
    }

    /** Streams of a single repository keyed by storage URI. Backed by a plain {@link HashMap}. */
    private static class StreamProviders {
        private final Map<URI, TestStream<StreamElement>> streams = new HashMap<URI, TestStream<StreamElement>>();

        private StreamProviders() {
        }

        /**
         * Returns the stream registered under {@code uri}.
         *
         * @throws NullPointerException if no stream is registered under {@code uri}
         */
        TestStream<StreamElement> get(URI uri) {
            return Objects.requireNonNull(this.streams.get(uri), () -> "No test stream registered for URI " + uri);
        }

        /** Registers {@code stream} under {@code uri}, replacing any previous registration. */
        void put(URI uri, TestStream<StreamElement> stream) {
            this.streams.put(uri, stream);
        }
    }
}

