/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.core;

import cz.o2.proxima.beam.core.AttributeFamilyProxyDataAccessor;
import cz.o2.proxima.beam.core.DataAccessor;
import cz.o2.proxima.beam.core.DataAccessorFactory;
import cz.o2.proxima.beam.core.PCollectionTools;
import cz.o2.proxima.beam.core.io.AttributeDescriptorCoder;
import cz.o2.proxima.beam.core.io.EntityDescriptorCoder;
import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.AttributeFamilyProxyDescriptor;
import cz.o2.proxima.repository.DataOperator;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.storage.internal.DataAccessorLoader;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BeamDataOperator
implements DataOperator {
    /** Repository this operator was created for. */
    private final Repository repo;
    /** Direct operator of the repository, {@code null} when the repository has no "direct" operator. */
    @Nullable
    private final DirectDataOperator direct;
    /** Loader used to look up a {@link DataAccessorFactory} by storage URI. */
    private final DataAccessorLoader<BeamDataOperator, DataAccessor, DataAccessorFactory> loader;
    /** Cache of accessors already created for attribute families. */
    private final Map<AttributeFamilyDescriptor, DataAccessor> accessorMap;
    /** Cache of PCollections already created, keyed by the descriptor that produced them. */
    private final Map<PCollectionDescriptor, PCollection<StreamElement>> createdStreamsMap = Collections.synchronizedMap(new HashMap<>());
    /**
     * Pipelines whose coder registry has already been populated by this operator.
     * Synchronized like the other two caches, because it is read and written outside of any lock.
     */
    private final Set<Pipeline> typesRegistered = Collections.synchronizedSet(new HashSet<>());

    /**
     * Creates the operator on top of the given repository.
     *
     * <p>The direct operator is obtained only when the repository reports an operator
     * named {@code "direct"}; otherwise {@code direct} stays {@code null}.
     *
     * @param repo repository to read attribute families from
     */
    BeamDataOperator(Repository repo) {
        this.repo = repo;
        this.accessorMap = Collections.synchronizedMap(new HashMap<>());
        this.loader = DataAccessorLoader.of(repo, DataAccessorFactory.class);
        if (repo.hasOperator("direct")) {
            this.direct = (DirectDataOperator) repo.getOrCreateOperator(DirectDataOperator.class);
        } else {
            this.direct = null;
        }
    }

    /** Closes the direct operator (when there is one) and then drops all cached state via {@link #reload()}. */
    public void close() {
        final DirectDataOperator directOperator = direct;
        if (directOperator != null) {
            directOperator.close();
        }
        reload();
    }

    /** Forgets all cached accessors, all cached PCollections and all pipelines marked as having coders registered. */
    public void reload() {
        accessorMap.clear();
        createdStreamsMap.clear();
        typesRegistered.clear();
    }

    /**
     * Creates a commit-log stream without a name.
     *
     * <p>Delegates to the named variant with {@code name == null}.
     *
     * @param pipeline pipeline to create the PCollection in
     * @param position position to start reading from
     * @param stopAtCurrent forwarded unchanged to the accessor
     * @param useEventTime forwarded unchanged to the accessor
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     */
    @SafeVarargs
    public final PCollection<StreamElement> getStream(Pipeline pipeline, Position position, boolean stopAtCurrent, boolean useEventTime, AttributeDescriptor<?> ... attrs) {
        final String unnamed = null;
        return getStream(unnamed, pipeline, position, stopAtCurrent, useEventTime, attrs);
    }

    /**
     * Creates a commit-log stream with an optional name and no element limit.
     *
     * <p>Delegates to the limited variant with {@code limit == Long.MAX_VALUE}.
     *
     * @param name name forwarded to the accessor, may be {@code null}
     * @param pipeline pipeline to create the PCollection in
     * @param position position to start reading from
     * @param stopAtCurrent forwarded unchanged to the accessor
     * @param useEventTime forwarded unchanged to the accessor
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     */
    @SafeVarargs
    public final PCollection<StreamElement> getStream(@Nullable String name, Pipeline pipeline, Position position, boolean stopAtCurrent, boolean useEventTime, AttributeDescriptor<?> ... attrs) {
        final long unlimited = Long.MAX_VALUE;
        return getStream(name, pipeline, position, stopAtCurrent, useEventTime, unlimited, attrs);
    }

    /**
     * Creates a commit-log stream, optionally limited in the number of elements.
     *
     * <p>One PCollection is created (or taken from the cache) per distinct accessor that can
     * read the commit log of the requested attributes; the partial results are unioned and
     * finally filtered to the requested attributes.
     *
     * @param name name forwarded to the accessor, may be {@code null}
     * @param pipeline pipeline to create the PCollection in
     * @param position position to start reading from
     * @param stopAtCurrent forwarded unchanged to the accessor
     * @param useEventTime forwarded unchanged to the accessor
     * @param limit element limit forwarded to the accessor
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     * @throws IllegalArgumentException when an attribute has no commit-log family, or when no
     *     accessor was found at all
     */
    @SafeVarargs
    @VisibleForTesting
    final PCollection<StreamElement> getStream(@Nullable String name, Pipeline pipeline, Position position, boolean stopAtCurrent, boolean useEventTime, long limit, AttributeDescriptor<?> ... attrs) {
        // the limit is not part of the cache key, so only unlimited reads may be shared
        final boolean cacheable = limit < 0L || limit == Long.MAX_VALUE;
        Optional<PCollection<StreamElement>> unioned = findSuitableAccessors(af -> af.getAccess().canReadCommitLog(), "commit-log", attrs)
            .map(accessor -> {
                StreamDescriptor desc = new StreamDescriptor(pipeline, accessor, name, position, stopAtCurrent, useEventTime);
                return getOrCreatePCollection(desc, cacheable, d -> d.createStream(limit));
            })
            .reduce((left, right) -> Union.of(left, right).output());
        return unioned.orElseThrow(failNotFound(attrs, "commit-log")).apply(filterAttrs(attrs));
    }

    /**
     * Reads batch updates of the given attributes over the whole time range.
     *
     * <p>Delegates to the ranged variant with {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE}.
     *
     * @param pipeline pipeline to create the PCollection in
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     */
    @SafeVarargs
    public final PCollection<StreamElement> getBatchUpdates(Pipeline pipeline, AttributeDescriptor<?> ... attrs) {
        final long earliest = Long.MIN_VALUE;
        final long latest = Long.MAX_VALUE;
        return getBatchUpdates(pipeline, earliest, latest, attrs);
    }

    /**
     * Reads batch updates of the given attributes within the given stamps, as a bounded batch.
     *
     * <p>Delegates to the full variant with {@code asStream == false}.
     *
     * @param pipeline pipeline to create the PCollection in
     * @param startStamp lower stamp forwarded to the accessor
     * @param endStamp upper stamp forwarded to the accessor
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     */
    @SafeVarargs
    public final PCollection<StreamElement> getBatchUpdates(Pipeline pipeline, long startStamp, long endStamp, AttributeDescriptor<?> ... attrs) {
        final boolean asStream = false;
        return getBatchUpdates(pipeline, startStamp, endStamp, asStream, attrs);
    }

    /**
     * Reads batch updates of the given attributes within the given stamps.
     *
     * <p>The PCollections are created for the <em>closure</em> of the request, i.e. for all
     * attributes of every family selected for the requested attributes. The cache key
     * ({@code BatchUpdatesDescriptor}) does not contain attributes, so the cached collection
     * has to contain everything a later call for the same family might ask for. The result is
     * narrowed back to {@code attrs} by the final filter.
     *
     * <p>NOTE(review): requested attributes without any batch-updates family are silently
     * left out of the closure; the call fails only when no accessor is found at all.
     *
     * @param pipeline pipeline to create the PCollection in
     * @param startStamp lower stamp forwarded to the accessor
     * @param endStamp upper stamp forwarded to the accessor
     * @param asStream when {@code true} the accessor's {@code createStreamFromUpdates} is used
     *     instead of {@code createBatch}
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     * @throws IllegalArgumentException when no suitable accessor was found
     */
    @SafeVarargs
    public final PCollection<StreamElement> getBatchUpdates(Pipeline pipeline, long startStamp, long endStamp, boolean asStream, AttributeDescriptor<?> ... attrs) {
        final Predicate<AttributeFamilyDescriptor> readsBatchUpdates = af -> af.getAccess().canReadBatchUpdates();
        final List<AttributeDescriptor<?>> attrClosure = findSuitableFamilies(readsBatchUpdates, attrs)
            .filter(p -> p.getSecond().isPresent())
            .map(p -> p.getSecond().get())
            .<AttributeDescriptor<?>>flatMap(family -> family.getAttributes().stream())
            .distinct()
            .collect(Collectors.toList());
        final AttributeDescriptor<?>[] closureAsArray = attrClosure.toArray(new AttributeDescriptor<?>[0]);
        return findSuitableAccessors(readsBatchUpdates, "batch-updates", closureAsArray)
            .map(accessor -> {
                BatchUpdatesDescriptor desc = new BatchUpdatesDescriptor(pipeline, accessor, startStamp, endStamp, asStream);
                return getOrCreatePCollection(desc, true, d -> d.createBatchUpdates(attrClosure));
            })
            .reduce((left, right) -> Union.of(left, right).output())
            .orElseThrow(failNotFound(attrs, "batch-updates"))
            .apply(filterAttrs(attrs));
    }

    /**
     * Reads a batch snapshot of the given attributes over the whole time range.
     *
     * <p>Delegates to the ranged variant with {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE}.
     *
     * @param pipeline pipeline to create the PCollection in
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     */
    public final PCollection<StreamElement> getBatchSnapshot(Pipeline pipeline, AttributeDescriptor<?> ... attrs) {
        final long earliest = Long.MIN_VALUE;
        final long latest = Long.MAX_VALUE;
        return getBatchSnapshot(pipeline, earliest, latest, attrs);
    }

    /**
     * Reads a batch snapshot of the given attributes within the given stamps.
     *
     * <p>When every requested attribute has a family that can read batch snapshots, one
     * PCollection is created (or taken from the cache) per distinct accessor and the results
     * are unioned. Otherwise the snapshot is computed from batch updates via
     * {@code PCollectionTools.reduceAsSnapshot}.
     *
     * <p>The per-accessor PCollection is cached under a key that contains only the pipeline,
     * the accessor and the two stamps. It is therefore created for all attributes of the
     * resolved families, not only for the requested ones, so that a later call asking for
     * different attributes of the same family does not receive a cached collection that lacks
     * them. The result is narrowed back to {@code attrs} by the final filter.
     *
     * @param pipeline pipeline to create the PCollection in
     * @param fromStamp lower stamp forwarded to the accessor
     * @param untilStamp upper stamp forwarded to the accessor
     * @param attrs attributes to read
     * @return PCollection restricted to the given attributes
     * @throws IllegalArgumentException when no suitable accessor was found
     */
    public final PCollection<StreamElement> getBatchSnapshot(Pipeline pipeline, long fromStamp, long untilStamp, AttributeDescriptor<?> ... attrs) {
        final List<Pair<AttributeDescriptor<?>, Optional<AttributeFamilyDescriptor>>> resolved =
            findSuitableFamilies(af -> af.getAccess().canReadBatchSnapshot(), attrs).collect(Collectors.toList());
        final boolean allResolved = resolved.stream().allMatch(p -> p.getSecond().isPresent());
        if (!allResolved) {
            // at least one attribute has no snapshot-capable family: derive the snapshot from updates
            return PCollectionTools.reduceAsSnapshot("getBatchSnapshot:" + Arrays.toString(attrs), getBatchUpdates(pipeline, fromStamp, untilStamp, attrs));
        }
        final List<AttributeFamilyDescriptor> families = resolved.stream()
            .map(p -> p.getSecond().get())
            .distinct()
            .collect(Collectors.toList());
        final List<AttributeDescriptor<?>> attrClosure = families.stream()
            .<AttributeDescriptor<?>>flatMap(family -> family.getAttributes().stream())
            .distinct()
            .collect(Collectors.toList());
        return families.stream()
            .map(this::accessorFor)
            .distinct()
            .map(accessor -> {
                BatchSnapshotDescriptor desc = new BatchSnapshotDescriptor(pipeline, accessor, fromStamp, untilStamp);
                return getOrCreatePCollection(desc, true, d -> d.createBatchUpdates(attrClosure));
            })
            .reduce((left, right) -> Union.of(left, right).output())
            .orElseThrow(failNotFound(attrs, "batch-snapshot"))
            .apply(filterAttrs(attrs));
    }

    /**
     * Returns the (cached) accessor of the given attribute family, creating it when needed.
     *
     * @param family family to get the accessor for
     * @return accessor of the family
     */
    public DataAccessor getAccessorFor(AttributeFamilyDescriptor family) {
        final DataAccessor accessor = accessorFor(family);
        return accessor;
    }

    /**
     * Resolves accessors for the families selected for the given attributes.
     *
     * <p>The returned stream is lazy; the exception below is raised when the stream is consumed.
     *
     * @param predicate condition a family has to satisfy
     * @param accessorType name of the access type, used in the error message only
     * @param attrs attributes to resolve
     * @return accessors of the distinct selected families
     * @throws IllegalArgumentException when some attribute has no family satisfying the predicate
     */
    private Stream<DataAccessor> findSuitableAccessors(Predicate<AttributeFamilyDescriptor> predicate, String accessorType, AttributeDescriptor<?>[] attrs) {
        return findSuitableFamilies(predicate, attrs)
            .map(resolved -> {
                Optional<AttributeFamilyDescriptor> family = resolved.getSecond();
                if (family.isPresent()) {
                    return family.get();
                }
                throw new IllegalArgumentException("Missing " + accessorType + " for " + resolved.getFirst());
            })
            .distinct()
            .map(this::accessorFor);
    }

    /**
     * Pairs every given attribute with the family selected for it.
     *
     * <p>Among the families of an attribute that satisfy the predicate, the one whose
     * {@code getType()} has the lowest ordinal is selected. When no family satisfies the
     * predicate, the second element of the pair is empty.
     *
     * @param predicate condition a family has to satisfy
     * @param attrs attributes to resolve
     * @return lazy stream with one pair per attribute, in the order of {@code attrs}
     */
    private Stream<Pair<AttributeDescriptor<?>, Optional<AttributeFamilyDescriptor>>> findSuitableFamilies(Predicate<AttributeFamilyDescriptor> predicate, AttributeDescriptor<?>[] attrs) {
        final Comparator<AttributeFamilyDescriptor> byTypeOrdinal = Comparator.comparingInt(family -> family.getType().ordinal());
        return Arrays.stream(attrs).map(desc -> {
            Optional<AttributeFamilyDescriptor> selected = repo.getFamiliesForAttribute(desc).stream().filter(predicate).min(byTypeOrdinal);
            // no cast of desc to Object here: it would erase the first type parameter of the pair
            return Pair.of(desc, selected);
        });
    }

    /**
     * Returns the accessor of the given family from {@code accessorMap}, creating and caching
     * it on first use. The whole lookup runs while holding the monitor of {@code accessorMap}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param family family to get the accessor for
     * @return cached or newly created accessor
     */
    private DataAccessor accessorFor(AttributeFamilyDescriptor family) {
        Map<AttributeFamilyDescriptor, DataAccessor> map = this.accessorMap;
        synchronized (map) {
            if (family.isProxy()) {
                // Resolve (and thereby cache) both targets of a proxy family first, so that the
                // nested accessorFor calls made by createAccessorFor below find them already cached.
                this.accessorFor(family.toProxy().getTargetFamilyRead());
                this.accessorFor(family.toProxy().getTargetFamilyWrite());
            }
            return this.accessorMap.computeIfAbsent(family, this::createAccessorFor);
        }
    }

    /**
     * Creates a new accessor for the given family.
     *
     * <p>For a proxy family the accessor wraps the accessors of its read and write targets.
     * For any other family a factory is looked up by the family's storage URI.
     *
     * @param family family to create the accessor for
     * @return newly created accessor
     * @throws IllegalStateException when no factory is found for the storage URI
     */
    private DataAccessor createAccessorFor(AttributeFamilyDescriptor family) {
        if (!family.isProxy()) {
            final URI storageUri = family.getStorageUri();
            return loader.findForUri(storageUri)
                .map(factory -> (DataAccessor) factory.createAccessor(this, family.getEntity(), storageUri, family.getCfg()))
                .orElseThrow(() -> new IllegalStateException("No accessor for URI " + family.getStorageUri()));
        }
        final AttributeFamilyProxyDescriptor proxy = family.toProxy();
        final DataAccessor reader = accessorFor(proxy.getTargetFamilyRead());
        final DataAccessor writer = accessorFor(proxy.getTargetFamilyWrite());
        return AttributeFamilyProxyDataAccessor.of(proxy, reader, writer);
    }

    /**
     * Returns the repository this operator was created for.
     *
     * @return the repository passed to the constructor
     */
    public Repository getRepository() {
        return repo;
    }

    /**
     * Returns the direct operator of the repository.
     *
     * @return the direct operator, never {@code null}
     * @throws NullPointerException when the repository has no direct operator; use
     *     {@link #hasDirect()} to check beforehand
     */
    public DirectDataOperator getDirect() {
        return Objects.requireNonNull(this.direct, "DirectDataOperator is not available in this repository, check hasDirect() before calling getDirect()");
    }

    /**
     * Tells whether a direct operator is available.
     *
     * @return {@code true} when {@link #getDirect()} will return an operator
     */
    public boolean hasDirect() {
        return Objects.nonNull(direct);
    }

    /**
     * Creates a transform that keeps only elements of the given attributes.
     *
     * <p>An element passes when its {@code getAttributeDescriptor()} is contained in the set
     * built from {@code attrs}. The output has its type descriptor set to {@link StreamElement}.
     *
     * @param attrs attributes to keep
     * @return filtering transform
     */
    private PTransform<PCollection<StreamElement>, PCollection<StreamElement>> filterAttrs(AttributeDescriptor<?>[] attrs) {
        final Set<AttributeDescriptor<?>> attrSet = Arrays.stream(attrs).collect(Collectors.toSet());
        // typed explicitly as SerializableFunction so that the lambda parameter is a StreamElement
        // and the intended Filter.by overload is selected
        final SerializableFunction<StreamElement, Boolean> isRequested = el -> attrSet.contains(el.getAttributeDescriptor());
        return new PTransform<PCollection<StreamElement>, PCollection<StreamElement>>() {

            @Override
            public PCollection<StreamElement> expand(PCollection<StreamElement> input) {
                return input.apply(Filter.by(isRequested)).setTypeDescriptor(TypeDescriptor.of(StreamElement.class));
            }
        };
    }

    /**
     * Prepares the exception raised when no suitable accessor was found.
     *
     * <p>The families of the attributes are collected immediately; the exception itself and its
     * message are created only when the supplier is invoked.
     *
     * @param attrs attributes that were requested
     * @param accessorType name of the access type, used in the message
     * @return supplier of the exception
     */
    private Supplier<IllegalArgumentException> failNotFound(AttributeDescriptor<?>[] attrs, String accessorType) {
        final Set<AttributeFamilyDescriptor> candidates = Arrays.stream(attrs)
            .flatMap(attr -> repo.getFamiliesForAttribute(attr).stream())
            .collect(Collectors.toSet());
        return () -> {
            String message = String.format("Failed to find suitable family type [%s] in [%s]", accessorType, candidates);
            return new IllegalArgumentException(message);
        };
    }

    /**
     * Returns the PCollection for the given descriptor, creating it through the factory.
     *
     * <p>When {@code cacheable}, the PCollection is looked up in and stored to
     * {@code createdStreamsMap} under the monitor of that map, so the factory runs at most once
     * per equal descriptor. Otherwise the factory is always invoked and nothing is stored.
     * In both cases coders are registered for the pipeline of the result when that has not
     * happened yet.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param desc descriptor identifying the PCollection
     * @param cacheable whether the result may be shared between calls
     * @param factory creates the PCollection from the descriptor
     * @return cached or newly created PCollection
     */
    private <T extends PCollectionDescriptor> PCollection<StreamElement> getOrCreatePCollection(T desc, boolean cacheable, PCollectionFactoryFromDescriptor<T> factory) {
        final PCollection<StreamElement> result;
        if (!cacheable) {
            result = factory.apply(desc);
        } else {
            synchronized (createdStreamsMap) {
                PCollection<StreamElement> cached = createdStreamsMap.get(desc);
                if (cached != null) {
                    result = cached;
                } else {
                    result = factory.apply(desc);
                    createdStreamsMap.put(desc, result);
                }
            }
        }
        if (!typesRegisteredFor(result.getPipeline())) {
            registerTypesFor(result.getPipeline());
        }
        return result;
    }

    /**
     * Registers coders in the coder registry of the given pipeline and remembers the pipeline
     * in {@code typesRegistered}.
     *
     * @param pipeline pipeline whose coder registry is populated
     */
    private void registerTypesFor(Pipeline pipeline) {
        final CoderRegistry coders = pipeline.getCoderRegistry();
        // window types
        coders.registerCoderForClass(GlobalWindow.class, GlobalWindow.Coder.INSTANCE);
        coders.registerCoderForClass(IntervalWindow.class, IntervalWindow.IntervalWindowCoder.of());
        // model types, their coders are created from the repository
        coders.registerCoderForClass(StreamElement.class, StreamElementCoder.of(repo));
        coders.registerCoderForClass(EntityDescriptor.class, EntityDescriptorCoder.of(repo));
        coders.registerCoderForClass(AttributeDescriptor.class, AttributeDescriptorCoder.of(repo));
        typesRegistered.add(pipeline);
    }

    /**
     * Tells whether coders were already registered for the given pipeline.
     *
     * @param pipeline pipeline to check
     * @return {@code true} when the pipeline is contained in {@code typesRegistered}
     */
    private boolean typesRegisteredFor(Pipeline pipeline) {
        final boolean alreadyRegistered = typesRegistered.contains(pipeline);
        return alreadyRegistered;
    }

    /**
     * Cache key of a batch snapshot PCollection.
     *
     * <p>Two descriptors are equal when their pipeline, accessor and both stamps are equal.
     */
    private final class BatchSnapshotDescriptor
    implements PCollectionDescriptor {
        private final Pipeline pipeline;
        private final DataAccessor dataAccessor;
        private final long fromStamp;
        private final long untilStamp;

        public BatchSnapshotDescriptor(Pipeline pipeline, DataAccessor dataAccessor, long fromStamp, long untilStamp) {
            this.pipeline = pipeline;
            this.dataAccessor = dataAccessor;
            this.fromStamp = fromStamp;
            this.untilStamp = untilStamp;
        }

        /**
         * Creates the PCollection through the accessor's {@code createBatch}.
         *
         * @param attrList attributes forwarded to the accessor
         * @return the created PCollection
         */
        PCollection<StreamElement> createBatchUpdates(List<AttributeDescriptor<?>> attrList) {
            return this.dataAccessor.createBatch(this.pipeline, attrList, this.fromStamp, this.untilStamp);
        }

        public Pipeline getPipeline() {
            return this.pipeline;
        }

        public DataAccessor getDataAccessor() {
            return this.dataAccessor;
        }

        public long getFromStamp() {
            return this.fromStamp;
        }

        public long getUntilStamp() {
            return this.untilStamp;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof BatchSnapshotDescriptor)) {
                return false;
            }
            BatchSnapshotDescriptor other = (BatchSnapshotDescriptor) o;
            return Objects.equals(this.pipeline, other.pipeline)
                && Objects.equals(this.dataAccessor, other.dataAccessor)
                && this.fromStamp == other.fromStamp
                && this.untilStamp == other.untilStamp;
        }

        @Override
        public int hashCode() {
            // same formula (and therefore the same values) as the generated original
            int result = 1;
            result = result * 59 + (this.pipeline == null ? 43 : this.pipeline.hashCode());
            result = result * 59 + (this.dataAccessor == null ? 43 : this.dataAccessor.hashCode());
            result = result * 59 + Long.hashCode(this.fromStamp);
            result = result * 59 + Long.hashCode(this.untilStamp);
            return result;
        }

        @Override
        public String toString() {
            return "BeamDataOperator.BatchSnapshotDescriptor(pipeline=" + this.pipeline + ", dataAccessor=" + this.dataAccessor + ", fromStamp=" + this.fromStamp + ", untilStamp=" + this.untilStamp + ")";
        }
    }

    /**
     * Cache key of a batch updates PCollection.
     *
     * <p>Two descriptors are equal when their pipeline, accessor, both stamps and the
     * {@code asStream} flag are equal.
     */
    private final class BatchUpdatesDescriptor
    implements PCollectionDescriptor {
        private final Pipeline pipeline;
        private final DataAccessor dataAccessor;
        private final long startStamp;
        private final long endStamp;
        private final boolean asStream;

        public BatchUpdatesDescriptor(Pipeline pipeline, DataAccessor dataAccessor, long startStamp, long endStamp, boolean asStream) {
            this.pipeline = pipeline;
            this.dataAccessor = dataAccessor;
            this.startStamp = startStamp;
            this.endStamp = endStamp;
            this.asStream = asStream;
        }

        /**
         * Creates the PCollection through the accessor: {@code createStreamFromUpdates} (with
         * limit {@code -1}) when {@code asStream} is set, {@code createBatch} otherwise.
         *
         * @param attrList attributes forwarded to the accessor
         * @return the created PCollection
         */
        PCollection<StreamElement> createBatchUpdates(List<AttributeDescriptor<?>> attrList) {
            if (asStream) {
                return dataAccessor.createStreamFromUpdates(pipeline, attrList, startStamp, endStamp, -1L);
            }
            return dataAccessor.createBatch(pipeline, attrList, startStamp, endStamp);
        }

        public Pipeline getPipeline() {
            return pipeline;
        }

        public DataAccessor getDataAccessor() {
            return dataAccessor;
        }

        public long getStartStamp() {
            return startStamp;
        }

        public long getEndStamp() {
            return endStamp;
        }

        public boolean isAsStream() {
            return asStream;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof BatchUpdatesDescriptor)) {
                return false;
            }
            final BatchUpdatesDescriptor that = (BatchUpdatesDescriptor) o;
            return Objects.equals(pipeline, that.pipeline)
                && Objects.equals(dataAccessor, that.dataAccessor)
                && startStamp == that.startStamp
                && endStamp == that.endStamp
                && asStream == that.asStream;
        }

        @Override
        public int hashCode() {
            int hash = 59 + (pipeline == null ? 43 : pipeline.hashCode());
            hash = 59 * hash + (dataAccessor == null ? 43 : dataAccessor.hashCode());
            hash = 59 * hash + Long.hashCode(startStamp);
            hash = 59 * hash + Long.hashCode(endStamp);
            hash = 59 * hash + (asStream ? 79 : 97);
            return hash;
        }

        @Override
        public String toString() {
            return "BeamDataOperator.BatchUpdatesDescriptor(pipeline=" + pipeline
                + ", dataAccessor=" + dataAccessor
                + ", startStamp=" + startStamp
                + ", endStamp=" + endStamp
                + ", asStream=" + asStream + ")";
        }
    }

    /**
     * Cache key of a commit-log stream PCollection.
     *
     * <p>Two descriptors are equal when their pipeline, accessor, name, position and both
     * flags are equal. The element limit is not part of the key.
     */
    private final class StreamDescriptor
    implements PCollectionDescriptor {
        private final Pipeline pipeline;
        private final DataAccessor dataAccessor;
        @Nullable
        private final String name;
        private final Position position;
        private final boolean stopAtCurrent;
        private final boolean useEventTime;

        // @Nullable belongs to name (its field and getter are nullable); dataAccessor is
        // dereferenced unconditionally in createStream
        public StreamDescriptor(Pipeline pipeline, DataAccessor dataAccessor, @Nullable String name, Position position, boolean stopAtCurrent, boolean useEventTime) {
            this.pipeline = pipeline;
            this.dataAccessor = dataAccessor;
            this.name = name;
            this.position = position;
            this.stopAtCurrent = stopAtCurrent;
            this.useEventTime = useEventTime;
        }

        /**
         * Creates the PCollection through the accessor's {@code createStream} and sets its type
         * descriptor to {@link StreamElement}.
         *
         * @param limit element limit forwarded to the accessor
         * @return the created PCollection
         */
        PCollection<StreamElement> createStream(long limit) {
            return this.dataAccessor.createStream(this.name, this.pipeline, this.position, this.stopAtCurrent, this.useEventTime, limit).setTypeDescriptor(TypeDescriptor.of(StreamElement.class));
        }

        public Pipeline getPipeline() {
            return this.pipeline;
        }

        public DataAccessor getDataAccessor() {
            return this.dataAccessor;
        }

        @Nullable
        public String getName() {
            return this.name;
        }

        public Position getPosition() {
            return this.position;
        }

        public boolean isStopAtCurrent() {
            return this.stopAtCurrent;
        }

        public boolean isUseEventTime() {
            return this.useEventTime;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof StreamDescriptor)) {
                return false;
            }
            StreamDescriptor other = (StreamDescriptor) o;
            return Objects.equals(this.pipeline, other.pipeline)
                && Objects.equals(this.dataAccessor, other.dataAccessor)
                && Objects.equals(this.name, other.name)
                && Objects.equals(this.position, other.position)
                && this.stopAtCurrent == other.stopAtCurrent
                && this.useEventTime == other.useEventTime;
        }

        @Override
        public int hashCode() {
            // same formula (and therefore the same values) as the generated original
            int result = 1;
            result = result * 59 + (this.pipeline == null ? 43 : this.pipeline.hashCode());
            result = result * 59 + (this.dataAccessor == null ? 43 : this.dataAccessor.hashCode());
            result = result * 59 + (this.name == null ? 43 : this.name.hashCode());
            result = result * 59 + (this.position == null ? 43 : this.position.hashCode());
            result = result * 59 + (this.stopAtCurrent ? 79 : 97);
            result = result * 59 + (this.useEventTime ? 79 : 97);
            return result;
        }

        @Override
        public String toString() {
            return "BeamDataOperator.StreamDescriptor(pipeline=" + this.pipeline + ", dataAccessor=" + this.dataAccessor + ", name=" + this.name + ", position=" + this.position + ", stopAtCurrent=" + this.stopAtCurrent + ", useEventTime=" + this.useEventTime + ")";
        }
    }

    /**
     * Marker interface of the descriptor types used as keys of {@code createdStreamsMap}.
     * It declares no methods; implementations supply {@code equals} and {@code hashCode}.
     */
    private static interface PCollectionDescriptor {
    }

    @FunctionalInterface
    private static interface PCollectionFactoryFromDescriptor<T extends PCollectionDescriptor> {
        public PCollection<StreamElement> apply(T var1);
    }
}

