/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.core;

import cz.o2.proxima.beam.core.BeamProxyTransform;
import cz.o2.proxima.beam.core.DataAccessor;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyProxyDescriptor;
import cz.o2.proxima.repository.AttributeProxyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.transform.ProxyTransform;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DataAccessor} for a proxy attribute family.
 *
 * <p>Read requests are either handed to a contextual {@link BeamProxyTransform} (when all
 * requested proxy attributes use one), or delegated to the underlying read accessor, in which
 * case requested proxy attributes are first replaced by their read targets and every element
 * read is mapped back to the corresponding proxy attribute.
 *
 * <p>The write accessor is retained but is not used by any method of this class.
 */
public class AttributeFamilyProxyDataAccessor implements DataAccessor {
    private static final Logger log = LoggerFactory.getLogger(AttributeFamilyProxyDataAccessor.class);
    private static final long serialVersionUID = 1L;

    /** Descriptor of the proxied attribute family. */
    private final AttributeFamilyProxyDescriptor proxy;

    /** Accessor that all non-contextual reads are delegated to. */
    private final DataAccessor readAccessor;

    /** Accessor supplied for writes; kept for completeness, not referenced in this class. */
    private final DataAccessor writeAccessor;

    /** Read target attribute -> proxy attribute reading from it. */
    private final Map<AttributeDescriptor<?>, AttributeProxyDescriptor<?>> lookupTarget;

    /** Proxy attribute -> its read target attribute. */
    private final Map<AttributeProxyDescriptor<?>, AttributeDescriptor<?>> lookupProxy;

    /**
     * Create the accessor.
     *
     * @param proxy descriptor of the proxy family
     * @param readAccessor accessor used for reading
     * @param writeAccessor accessor associated with writing
     * @return new accessor instance
     */
    public static AttributeFamilyProxyDataAccessor of(
            AttributeFamilyProxyDescriptor proxy,
            DataAccessor readAccessor,
            DataAccessor writeAccessor) {
        return new AttributeFamilyProxyDataAccessor(proxy, readAccessor, writeAccessor);
    }

    private AttributeFamilyProxyDataAccessor(
            AttributeFamilyProxyDescriptor proxy,
            DataAccessor readAccessor,
            DataAccessor writeAccessor) {
        this.proxy = proxy;
        this.readAccessor = readAccessor;
        this.writeAccessor = writeAccessor;
        // Collectors.toMap rejects duplicate keys, so two proxy attributes sharing
        // one read target make construction fail with IllegalStateException.
        this.lookupTarget = proxy.getAttributes().stream()
                .map(AttributeDescriptor::asProxy)
                .collect(Collectors.toMap(AttributeProxyDescriptor::getReadTarget, Function.identity()));
        this.lookupProxy = proxy.getAttributes().stream()
                .map(AttributeDescriptor::asProxy)
                .collect(Collectors.toMap(Function.identity(), AttributeProxyDescriptor::getReadTarget));
    }

    /**
     * Create a stream for all proxy attributes of this family.
     *
     * <p>All arguments are passed unchanged either to the contextual transform or to the read
     * accessor; in the latter case the resulting elements are mapped to proxy attributes.
     */
    @Override
    public PCollection<StreamElement> createStream(
            String name,
            Pipeline pipeline,
            Position position,
            boolean stopAtCurrent,
            boolean eventTime,
            long limit) {
        return asBeamTransform(lookupTarget.values())
                .map(transform ->
                        transform.createStream(name, pipeline, position, stopAtCurrent, eventTime, limit))
                .orElseGet(() ->
                        applyTransform(
                                readAccessor.createStream(
                                        name, pipeline, position, stopAtCurrent, eventTime, limit)));
    }

    /**
     * Create a batch read of the given proxy attributes.
     *
     * <p>When no contextual transform applies, {@code attrs} are replaced by their read targets
     * before delegating to the read accessor and the result is mapped back to proxy attributes.
     */
    @Override
    public PCollection<StreamElement> createBatch(
            Pipeline pipeline, List<AttributeDescriptor<?>> attrs, long startStamp, long endStamp) {
        return asBeamTransform(attrs)
                .map(transform -> transform.createBatch(pipeline, startStamp, endStamp))
                .orElseGet(() ->
                        applyTransform(
                                readAccessor.createBatch(
                                        pipeline, transformAttrs(attrs), startStamp, endStamp)));
    }

    /**
     * Create a stream from updates of the given proxy attributes.
     *
     * <p>When no contextual transform applies, {@code attrs} are replaced by their read targets
     * before delegating to the read accessor and the result is mapped back to proxy attributes.
     */
    @Override
    public PCollection<StreamElement> createStreamFromUpdates(
            Pipeline pipeline,
            List<AttributeDescriptor<?>> attrs,
            long startStamp,
            long endStamp,
            long limit) {
        return asBeamTransform(attrs)
                .map(transform -> transform.createStreamFromUpdates(pipeline, startStamp, endStamp, limit))
                .orElseGet(() ->
                        applyTransform(
                                readAccessor.createStreamFromUpdates(
                                        pipeline, transformAttrs(attrs), startStamp, endStamp, limit)));
    }

    /**
     * Replace each proxy attribute by its read target.
     *
     * @param attrs proxy attributes of this family
     * @return read targets, in the same order as {@code attrs}
     * @throws NullPointerException if an attribute is not registered in this proxy family
     */
    List<AttributeDescriptor<?>> transformAttrs(List<AttributeDescriptor<?>> attrs) {
        return attrs.stream()
                .map(attr ->
                        Objects.requireNonNull(
                                lookupProxy.get(attr.asProxy()),
                                () -> "Attribute " + attr.getName() + " is not present in proxy " + proxy))
                .collect(Collectors.toList());
    }

    /** Map every element of {@code in} through {@link #transformSingleRead}. */
    private PCollection<StreamElement> applyTransform(PCollection<StreamElement> in) {
        return MapElements.of(in)
                .using(this::transformSingleRead, in.getTypeDescriptor())
                .output();
    }

    /**
     * Convert an element read from a target attribute into an element of the proxy attribute.
     *
     * <p>Elements whose attribute has no registered proxy are logged and returned unchanged.
     */
    private StreamElement transformSingleRead(StreamElement input) {
        AttributeProxyDescriptor<?> attr = lookupTarget.get(input.getAttributeDescriptor());
        if (attr == null) {
            log.warn(
                    "Received unknown attribute {}. Letting though, but this might cause other issues.",
                    input.getAttributeDescriptor());
            return input;
        }
        ProxyTransform transform = attr.getReadTransform();
        String attribute = transform.asElementWise().toProxy(input.getAttribute());
        return StreamElement.upsert(
                input.getEntityDescriptor(),
                attr,
                input.getUuid(),
                input.getKey(),
                attribute,
                input.getStamp(),
                input.getValue());
    }

    /**
     * Resolve the contextual {@link BeamProxyTransform} for the given attributes, if any.
     *
     * @param attrs proxy attributes being read
     * @return the single shared transform when every attribute has a contextual read transform,
     *     otherwise empty
     * @throws IllegalArgumentException if the contextual transforms are not exactly one distinct
     *     instance, or that instance is not a {@link BeamProxyTransform}
     */
    private Optional<BeamProxyTransform> asBeamTransform(
            Collection<? extends AttributeDescriptor<?>> attrs) {
        Set<AttributeProxyDescriptor<?>> proxies =
                attrs.stream().map(AttributeDescriptor::asProxy).collect(Collectors.toSet());
        // NOTE(review): allMatch is vacuously true for empty input, so an empty attrs
        // collection ends up failing the size check below - confirm this is intended.
        if (!proxies.stream().allMatch(p -> p.getReadTransform().isContextual())) {
            return Optional.empty();
        }
        List<ProxyTransform> transforms =
                proxies.stream()
                        .map(AttributeProxyDescriptor::getReadTransform)
                        .distinct()
                        .collect(Collectors.toList());
        // Guava templates only understand %s; a "{}" placeholder would shift every argument.
        Preconditions.checkArgument(
                transforms.size() == 1,
                "When using %s only single attribute on input is allowed, got [%s] in [%s]",
                BeamProxyTransform.class.getName(),
                proxies,
                attrs);
        ProxyTransform transform = transforms.get(0);
        Preconditions.checkArgument(
                transform instanceof BeamProxyTransform,
                "Do not mix multiple contextual proxies in single config, expected class [%s] got [%s]",
                BeamProxyTransform.class.getName(),
                transform.getClass().getName());
        return Optional.of((BeamProxyTransform) transform);
    }
}

