/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.transaction;

import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.transaction.ClientTransactionManager;
import cz.o2.proxima.direct.transform.DirectElementWiseTransform;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.TransactionMode;
import cz.o2.proxima.repository.TransformationDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transaction.Commit;
import cz.o2.proxima.transaction.KeyAttribute;
import cz.o2.proxima.transaction.KeyAttributes;
import cz.o2.proxima.transaction.Response;
import cz.o2.proxima.transaction.State;
import cz.o2.proxima.transform.ElementWiseTransformation;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionalOnlineAttributeWriter
implements OnlineAttributeWriter {
    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransactionalOnlineAttributeWriter.class);
    /** Wrapped writer; receives the output directly when a non-global transaction yields exactly one element. */
    private final OnlineAttributeWriter delegate;
    /** Writer of the manager's commit attribute; receives a single {@code Commit} element in all other cases. */
    private final OnlineAttributeWriter commitDelegate;
    /** Client side of the transaction manager, obtained from the {@code DirectDataOperator}. */
    private final ClientTransactionManager manager;
    /** Executor taken from the operator's context; runs the body of {@link #write}. */
    private final ExecutorService executor;
    /** Key attributes of all attributes with {@code TransactionMode.ALL}; inputs of every global transaction. */
    private final List<KeyAttribute> globalKeyAttributes;
    /** Transformations with {@code InputTransactionMode.TRANSACTIONAL}, grouped by the attribute they read. */
    private final Map<AttributeDescriptor<?>, List<TransformationDescriptor>> attributeTransforms;

    /**
     * Creates a transactional wrapper around the given writer.
     *
     * @param direct operator supplying the transaction manager, executor and repository
     * @param delegate writer that receives single-element, non-global transaction outputs
     * @return the new transactional writer
     */
    public static TransactionalOnlineAttributeWriter of(DirectDataOperator direct, OnlineAttributeWriter delegate) {
        TransactionalOnlineAttributeWriter transactionalWriter = new TransactionalOnlineAttributeWriter(direct, delegate);
        return transactionalWriter;
    }

    /**
     * Creates a writer whose transactions are all global: every transaction returned by
     * {@code begin()} has already registered the global key attributes as its inputs.
     * The writer delegates to the writer of the manager's commit attribute.
     *
     * @param direct operator supplying the transaction manager and writers
     * @return writer producing global transactions
     */
    public static TransactionalOnlineAttributeWriter global(DirectDataOperator direct) {
        ClientTransactionManager manager = direct.getClientTransactionManager();
        OnlineAttributeWriter commitWriter = (OnlineAttributeWriter)Optionals.get(direct.getWriter((AttributeDescriptor<?>)manager.getCommitDesc()));
        return new TransactionalOnlineAttributeWriter(direct, commitWriter) {

            @Override
            public Transaction begin() {
                Transaction ret = super.begin();
                try {
                    ExceptionUtils.unchecked(ret::beginGlobal);
                }
                catch (RuntimeException | Error ex) {
                    // The caller never receives `ret` on failure, so it could not close it:
                    // release the transaction here instead of leaking it.
                    try {
                        ret.close();
                    }
                    catch (RuntimeException closeFailure) {
                        ex.addSuppressed(closeFailure);
                    }
                    throw ex;
                }
                return ret;
            }
        };
    }

    private TransactionalOnlineAttributeWriter(DirectDataOperator direct, OnlineAttributeWriter delegate) {
        this.delegate = delegate;
        this.manager = direct.getClientTransactionManager();
        this.executor = direct.getContext().getExecutorService();
        this.commitDelegate = (OnlineAttributeWriter)Optionals.get(direct.getWriter((AttributeDescriptor<?>)this.manager.getCommitDesc()));
        this.globalKeyAttributes = this.getAttributesWithGlobalTransactionMode(direct);
        this.attributeTransforms = direct.getRepository().getTransformations().values().stream().filter(d -> d.getInputTransactionMode() == TransformationDescriptor.InputTransactionMode.TRANSACTIONAL).flatMap(t -> t.getAttributes().stream().map(a -> Pair.of((Object)a, (Object)t))).collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
    }

    /**
     * Builds one placeholder key attribute for every attribute of a transactional entity whose
     * transaction mode is {@code TransactionMode.ALL}. The key is {@code "dummy-" + attribute hash};
     * wildcard attributes additionally get the entity's hash code as attribute suffix.
     *
     * @param direct operator whose repository is scanned
     * @return key attributes used as inputs of global transactions
     */
    private List<KeyAttribute> getAttributesWithGlobalTransactionMode(DirectDataOperator direct) {
        return direct.getRepository()
            .getAllEntities()
            .filter(EntityDescriptor::isTransactional)
            .flatMap(entity -> entity.getAllAttributes()
                .stream()
                .filter(attribute -> attribute.getTransactionMode() == TransactionMode.ALL)
                .map(attribute -> {
                    boolean wildcard = attribute.isWildcard();
                    String dummyKey = "dummy-" + attribute.hashCode();
                    if (wildcard) {
                        return KeyAttributes.ofAttributeDescriptor(entity, dummyKey, attribute, Long.MAX_VALUE, String.valueOf(entity.hashCode()));
                    }
                    return KeyAttributes.ofAttributeDescriptor(entity, dummyKey, attribute, Long.MAX_VALUE);
                }))
            .collect(Collectors.toList());
    }

    /** Returns the URI of the wrapped delegate writer. */
    @Override
    public URI getUri() {
        URI delegateUri = delegate.getUri();
        return delegateUri;
    }

    /**
     * Closes the transaction manager, the delegate writer and the commit writer, in that order.
     * Each resource is closed even when closing an earlier one throws; the first exception
     * thrown is not swallowed.
     */
    @Override
    public synchronized void close() {
        try {
            this.manager.close();
        }
        finally {
            try {
                this.delegate.close();
            }
            finally {
                this.commitDelegate.close();
            }
        }
    }

    /**
     * Writes a single element inside its own transaction. The work is handed to the executor:
     * a transaction is begun, the element's key attribute is registered as its only input, and
     * the element is committed. Any throwable raised on the way is reported through
     * {@code statusCallback.commit(false, e)}.
     *
     * @param data element to write
     * @param statusCallback callback notified about the result
     */
    @Override
    public synchronized void write(StreamElement data, CommitCallback statusCallback) {
        Runnable transactionalWrite = () -> {
            try (Transaction transaction = begin()) {
                // Only wildcard attributes carry a suffix after the attribute prefix.
                String suffix = null;
                if (data.getAttributeDescriptor().isWildcard()) {
                    String attributeName = data.getAttribute();
                    int prefixLength = data.getAttributeDescriptor().toAttributePrefix().length();
                    suffix = attributeName.substring(prefixLength);
                }
                KeyAttribute outputKeyAttribute = KeyAttributes.ofAttributeDescriptor(
                    data.getEntityDescriptor(), data.getKey(), data.getAttributeDescriptor(), Long.MAX_VALUE, suffix);
                transaction.update(Collections.singletonList(outputKeyAttribute));
                transaction.commitWrite(Collections.singletonList(data), statusCallback);
            }
            catch (Throwable failure) {
                statusCallback.commit(false, failure);
            }
        };
        executor.execute(transactionalWrite);
    }

    /**
     * Returns a factory that rebuilds a transactional writer from a {@code Repository}: the
     * delegate's own factory recreates the wrapped writer, which is then wrapped again together
     * with the repository's {@code DirectDataOperator}.
     */
    @Override
    public OnlineAttributeWriter.Factory<? extends OnlineAttributeWriter> asFactory() {
        // Capture only the delegate's factory so the returned lambda holds no reference to this writer.
        AttributeWriterBase.Factory delegateFactory = this.delegate.asFactory();
        // The lambda body lives in a synthetic static method (decompiler output).
        return arg_0 -> TransactionalOnlineAttributeWriter.lambda$asFactory$ad53c740$1((OnlineAttributeWriter.Factory)delegateFactory, arg_0);
    }

    /** Always {@code true}: this writer performs every write inside a transaction. */
    @Override
    public boolean isTransactional() {
        return true;
    }

    /** Returns this instance, which already is the transactional view of the writer. */
    @Override
    public TransactionalOnlineAttributeWriter transactional() {
        return this;
    }

    /**
     * Creates a new transaction identified by a random UUID. Nothing is sent to the transaction
     * manager until {@code update} is first called on the returned transaction.
     *
     * @return the new, not yet opened transaction
     */
    public Transaction begin() {
        return new Transaction(UUID.randomUUID().toString());
    }

    /** Returns the writer wrapped by this transactional writer. */
    @Generated
    public OnlineAttributeWriter getDelegate() {
        return delegate;
    }

    /**
     * Body of the factory lambda returned by {@code asFactory()} (the name and synthetic marker
     * are decompiler output). Recreates the delegate writer through its factory and wraps it
     * with the {@code DirectDataOperator} obtained from the repository.
     */
    private static /* synthetic */ OnlineAttributeWriter lambda$asFactory$ad53c740$1(OnlineAttributeWriter.Factory delegateFactory, Repository repo) {
        // The empty Consumer array is the explicit form of an empty varargs argument.
        return new TransactionalOnlineAttributeWriter((DirectDataOperator)repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]), (OnlineAttributeWriter)delegateFactory.apply(repo));
    }

    public class Transaction
    implements AutoCloseable {
        /** Identifier of this transaction (a random UUID string created in {@code begin()}). */
        private final String transactionId;
        /** Responses handed over by the manager callback ({@code enqueueResponse}); drained by {@code takeResponse}. Bounded to 100 entries. */
        private final BlockingQueue<Pair<String, Response>> responseQueue = new ArrayBlockingQueue<Pair<String, Response>>(100);
        /** Set by {@code beginGlobal()}; forces outputs to be written as a single commit element. */
        private boolean isGlobalTransaction;
        /** Client-side view of the transaction state; starts as {@code UNKNOWN}. */
        private State.Flags state;
        /** Sequence id assigned by the manager; {@code -1} until the first response carrying one. */
        private long sequenceId = -1L;
        /** Commit stamp assigned by the manager; {@code Long.MIN_VALUE} until the first response carrying one. */
        private long stamp = Long.MIN_VALUE;

        /**
         * Creates a transaction in the {@code UNKNOWN} state.
         *
         * @param transactionId identifier to use for all requests of this transaction
         */
        private Transaction(String transactionId) {
            this.state = State.Flags.UNKNOWN;
            this.transactionId = transactionId;
        }

        /**
         * Marks this transaction as global and registers all global key attributes as its inputs.
         *
         * @throws IllegalArgumentException when no global key attributes are known
         * @throws TransactionRejectedException when the manager rejects the request
         */
        void beginGlobal() throws TransactionRejectedException {
            List<KeyAttribute> globalInputs = TransactionalOnlineAttributeWriter.this.globalKeyAttributes;
            boolean hasGlobalInputs = !globalInputs.isEmpty();
            Preconditions.checkArgument(hasGlobalInputs, (Object)"Cannot resolve global transactional attributes.");
            this.isGlobalTransaction = true;
            this.update(globalInputs);
        }

        /**
         * Adds inputs to the transaction. From the {@code UNKNOWN} state this begins the
         * transaction (expecting an {@code OPEN} response); from the {@code OPEN} state it updates
         * it (expecting {@code UPDATED}). In any other state the call is rejected immediately.
         * On success the sequence id and stamp carried by the response are remembered and the
         * state becomes {@code OPEN}.
         *
         * @param addedInputs key attributes read by the transaction
         * @throws TransactionRejectedException when the state does not allow updates or the
         *     manager's response is not the expected one (including a timed-out, empty response)
         * @throws IllegalStateException when a response changes an already assigned sequence id or stamp
         */
        public void update(List<KeyAttribute> addedInputs) throws TransactionRejectedException {
            Response.Flags expectedResponse;
            switch (this.state) {
                case UNKNOWN: {
                    TransactionalOnlineAttributeWriter.this.manager.begin(this.transactionId, (BiConsumer<String, Response>)ExceptionUtils.uncheckedBiConsumer(this::enqueueResponse), addedInputs);
                    expectedResponse = Response.Flags.OPEN;
                    break;
                }
                case OPEN: {
                    TransactionalOnlineAttributeWriter.this.manager.updateTransaction(this.transactionId, addedInputs);
                    expectedResponse = Response.Flags.UPDATED;
                    break;
                }
                default: {
                    throw new TransactionRejectedException(this.transactionId);
                }
            }
            Response response = this.takeResponse();
            if (response.getFlags() != expectedResponse) {
                // Mirror commitWrite: remember an abort so close() does not roll back an already
                // aborted transaction and later updates are rejected without asking the manager.
                if (response.getFlags() == Response.Flags.ABORTED) {
                    this.state = State.Flags.ABORTED;
                }
                throw new TransactionRejectedException(this.transactionId);
            }
            if (response.hasSequenceId()) {
                // The sequence id may be set once; a later response must repeat the same value.
                Preconditions.checkState(this.sequenceId == -1L || this.sequenceId == response.getSeqId(), "Updated sequence ID from %s to %s. That is a bug in proxima's transactions.", this.sequenceId, response.getSeqId());
                this.sequenceId = response.getSeqId();
            }
            if (response.hasStamp()) {
                // Same rule for the stamp.
                Preconditions.checkState(this.stamp == Long.MIN_VALUE || this.stamp == response.getStamp(), "Updated stamp from %s to %s. That is a bug in proxima's transactions.", this.stamp, response.getStamp());
                this.stamp = response.getStamp();
            }
            this.state = State.Flags.OPEN;
        }

        /**
         * Commits the transaction and writes its outputs. The outputs are stamped with the
         * transaction's sequence id and stamp, expanded by the transactional transformations and
         * checked by validators. After the manager confirms the commit, the result is written
         * either directly (exactly one element, non-global transaction) or as a single commit
         * element through the commit writer. A failed write triggers a rollback before the
         * caller's callback is notified.
         *
         * @param outputs elements produced by the transaction
         * @param callback callback notified about the result of the final write
         * @throws TransactionRejectedException when a validator or the manager rejects the commit
         */
        public void commitWrite(List<StreamElement> outputs, CommitCallback callback) throws TransactionRejectedException {
            Collection<StreamElement> transformed;
            List<StreamElement> injected = outputs.stream().map(this::injectSequenceIdAndStamp).collect(Collectors.toList());
            try {
                transformed = this.applyTransforms(injected);
            }
            catch (TransactionRejectedRuntimeException ex) {
                // Validators can only signal rejection through the unchecked wrapper; unwrap it.
                throw (TransactionRejectedException)ex.getCause();
            }
            StreamElement toWrite = this.getSingleOrCommit(transformed);
            // Must agree with getSingleOrCommit: a plain element goes to the delegate, a commit to the commit writer.
            OnlineAttributeWriter writer = transformed.size() == 1 && !this.isGlobalTransaction ? TransactionalOnlineAttributeWriter.this.delegate : TransactionalOnlineAttributeWriter.this.commitDelegate;
            List<KeyAttribute> keyAttributes = transformed.stream().map(KeyAttributes::ofStreamElement).collect(Collectors.toList());
            TransactionalOnlineAttributeWriter.this.manager.commit(this.transactionId, keyAttributes);
            Response response = this.takeResponse();
            if (response.getFlags() != Response.Flags.COMMITTED) {
                if (response.getFlags() == Response.Flags.ABORTED) {
                    this.state = State.Flags.ABORTED;
                }
                throw new TransactionRejectedException(this.transactionId);
            }
            CommitCallback compositeCallback = (succ, exc) -> {
                if (succ) {
                    log.debug("Committed outputs {} (via {}) of transaction {}", transformed, toWrite, this.transactionId);
                } else {
                    this.rollback();
                    // Do not report a failed write as committed; keep the cause in the log.
                    log.warn("Failed to write outputs {} (via {}) of transaction {}, rolled back", transformed, toWrite, this.transactionId, exc);
                }
                callback.commit(succ, exc);
            };
            this.state = State.Flags.COMMITTED;
            writer.write(toWrite, compositeCallback);
        }

        /**
         * Waits up to five seconds for the next response from the manager.
         *
         * @return the received response, or {@code Response.empty()} when none arrived in time
         */
        private Response takeResponse() {
            Pair<String, Response> received = ExceptionUtils.uncheckedFactory(() -> this.responseQueue.poll(5L, TimeUnit.SECONDS));
            Optional<Response> response = Optional.ofNullable(received).map(Pair::getSecond);
            return response.orElse(Response.empty());
        }

        /**
         * Expands the outputs by repeatedly applying the transactional, non-validating
         * transformations until a round produces no new elements. Each distinct element is
         * processed once. Validators run afterwards over the complete set.
         *
         * @param outputs elements produced directly by the transaction
         * @return the outputs together with everything the transformations derived from them
         */
        private Collection<StreamElement> applyTransforms(List<StreamElement> outputs) {
            Set<StreamElement> seen = new HashSet<>();
            List<StreamElement> pending = outputs;
            do {
                List<StreamElement> produced = new ArrayList<>();
                for (StreamElement el : pending) {
                    if (!seen.add(el)) {
                        // Already processed in an earlier round.
                        continue;
                    }
                    List<TransformationDescriptor> candidates = TransactionalOnlineAttributeWriter.this.attributeTransforms.get(el.getAttributeDescriptor());
                    if (candidates == null) {
                        continue;
                    }
                    for (TransformationDescriptor candidate : candidates) {
                        boolean validator = candidate.getTransformation() instanceof TransactionValidator;
                        if (!validator && candidate.getFilter().apply(el)) {
                            this.applyTransform(produced, el, candidate);
                        }
                    }
                }
                pending = produced;
            } while (!pending.isEmpty());
            this.applyValidations(seen);
            return seen;
        }

        /**
         * Runs every applicable {@code TransactionValidator} against each element.
         *
         * @param elements complete set of transaction outputs to validate
         */
        private void applyValidations(Set<StreamElement> elements) {
            for (StreamElement element : elements) {
                List<TransformationDescriptor> candidates = TransactionalOnlineAttributeWriter.this.attributeTransforms.get(element.getAttributeDescriptor());
                if (candidates == null) {
                    continue;
                }
                for (TransformationDescriptor candidate : candidates) {
                    boolean validator = candidate.getTransformation() instanceof TransactionValidator;
                    if (validator && candidate.getFilter().apply(element)) {
                        // Validators produce no elements, so an empty sink is passed.
                        this.applyTransform(Collections.emptyList(), element, candidate);
                    }
                }
            }
        }

        /**
         * Applies one transformation to one element. Validators are bound to this transaction and
         * invoked with a no-op callback. Element-wise transformations are bound to it when they are
         * {@code TransactionAware}; their results are stamped and appended to {@code newElements}.
         *
         * @param newElements sink for the elements produced by the transformation
         * @param el input element
         * @param td descriptor of the transformation to apply
         * @throws IllegalStateException when the transformation did not emit all its reported outputs synchronously
         */
        private void applyTransform(List<StreamElement> newElements, StreamElement el, TransformationDescriptor td) {
            if (td.getTransformation() instanceof TransactionValidator) {
                TransactionValidator validator = (TransactionValidator)td.getTransformation();
                validator.setTransaction(this);
                validator.transform(el, CommitCallback.noop());
                return;
            }
            ElementWiseTransformation elementWise = td.getTransformation().asElementWiseTransform();
            if (elementWise instanceof TransactionAware) {
                ((TransactionAware)elementWise).setTransaction(this);
            }
            int sizeBefore = newElements.size();
            int reported = elementWise.apply(el, transformed -> newElements.add(this.injectSequenceIdAndStamp(transformed)));
            // The reported count must already be in the sink, i.e. the transformation ran synchronously.
            boolean emittedSynchronously = newElements.size() == sizeBefore + reported;
            Preconditions.checkState(emittedSynchronously, "Transformation %s is asynchronous which not currently supported in transaction mode.", elementWise.getClass());
        }

        /**
         * Chooses the element that is physically written: the sole output itself for a non-global
         * transaction with exactly one output, otherwise a commit element wrapping all outputs.
         *
         * @param outputs all outputs of the transaction
         * @return the element to write
         */
        private StreamElement getSingleOrCommit(Collection<StreamElement> outputs) {
            if (outputs.size() != 1 || this.isGlobalTransaction) {
                return TransactionalOnlineAttributeWriter.this.manager
                    .getCommitDesc()
                    .upsert(this.transactionId, this.stamp, Commit.of(this.sequenceId, this.stamp, outputs));
            }
            return Iterables.getOnlyElement(outputs);
        }

        /**
         * Callback given to the manager: queues a response for {@code takeResponse}.
         *
         * @param responseId identifier delivered with the response
         * @param response the response itself
         */
        private void enqueueResponse(String responseId, Response response) {
            Pair<String, Response> entry = Pair.of(responseId, response);
            ExceptionUtils.unchecked(() -> this.responseQueue.put(entry));
        }

        /**
         * Re-creates the element as an upsert carrying this transaction's sequence id and stamp.
         *
         * @param in element to stamp
         * @return copy of {@code in} with the transaction's sequence id and stamp
         * @throws IllegalArgumentException when {@code in} is a wildcard delete
         */
        private StreamElement injectSequenceIdAndStamp(StreamElement in) {
            boolean wildcardDelete = in.isDeleteWildcard();
            Preconditions.checkArgument(!wildcardDelete, (Object)"Wildcard deletes not yet supported");
            return StreamElement.upsert(
                in.getEntityDescriptor(),
                in.getAttributeDescriptor(),
                this.sequenceId,
                in.getKey(),
                in.getAttribute(),
                this.stamp,
                in.getValue());
        }

        /**
         * Ends the client side of the transaction: a transaction still in the {@code OPEN} state is
         * rolled back, and the transaction is always released from the manager, even when the
         * rollback throws.
         */
        @Override
        public void close() {
            try {
                if (this.state == State.Flags.OPEN) {
                    this.rollback();
                }
            }
            finally {
                // Release unconditionally so a failed rollback does not leave the transaction registered.
                TransactionalOnlineAttributeWriter.this.manager.release(this.transactionId);
            }
        }

        /** Asks the transaction manager to roll this transaction back. The local state is left unchanged. */
        public void rollback() {
            ClientTransactionManager transactionManager = TransactionalOnlineAttributeWriter.this.manager;
            transactionManager.rollback(transactionId);
        }

        /** Returns the identifier of this transaction. */
        @Generated
        public String getTransactionId() {
            return transactionId;
        }
    }

    /**
     * Unchecked carrier for a {@link TransactionRejectedException}, used where a checked
     * exception cannot be declared. The wrapped exception is always the cause, which
     * {@code commitWrite} relies on when unwrapping it.
     */
    public static class TransactionRejectedRuntimeException
    extends RuntimeException {
        /**
         * @param wrap the rejection to carry; stored as the cause, its message is reused
         */
        public TransactionRejectedRuntimeException(TransactionRejectedException wrap) {
            super(wrap.getMessage(), wrap);
        }
    }

    /**
     * Signals that the transaction manager rejected a request or that the transaction was in a
     * state that did not allow it. The message tells the caller to restart the transaction.
     */
    public static class TransactionRejectedException
    extends Exception {
        /** Identifier of the rejected transaction. */
        private final String transactionId;

        /**
         * @param transactionId identifier of the rejected transaction; included in the message
         */
        protected TransactionRejectedException(String transactionId) {
            super("Transaction " + transactionId + " rejected. Please restart the transaction.");
            this.transactionId = transactionId;
        }

        /** Returns the identifier of the rejected transaction. */
        @Generated
        public String getTransactionId() {
            return this.transactionId;
        }
    }

    /**
     * Unchecked exception declared by {@code TransactionValidator.validate}. Nothing in this
     * file throws it; presumably validator implementations raise it when a business
     * precondition does not hold (confirm against implementations).
     */
    public static class TransactionPreconditionFailedException
    extends RuntimeException {
        /**
         * @param message description of the failed precondition
         */
        public TransactionPreconditionFailedException(String message) {
            super(message);
        }
    }

    /**
     * Base class for transformations that only validate transaction outputs. The writer binds
     * the current transaction through {@link #setTransaction} before calling {@link #transform},
     * which delegates to {@link #validate} and converts a checked rejection into its unchecked
     * counterpart. The commit callback passed to {@code transform} is not invoked.
     */
    public static abstract class TransactionValidator
    implements TransactionAware,
    DirectElementWiseTransform {
        /** Transaction bound by the most recent {@code setTransaction} call. */
        transient Transaction transaction;

        /**
         * @return the bound transaction
         * @throws NullPointerException when no transaction has been bound yet
         */
        @Override
        public final Transaction currentTransaction() {
            Transaction bound = transaction;
            return Objects.requireNonNull(bound);
        }

        @Override
        public void setTransaction(Transaction transaction) {
            this.transaction = transaction;
        }

        /**
         * Validates the input within the bound transaction.
         *
         * @throws TransactionRejectedRuntimeException wrapping a rejection raised by {@code validate}
         */
        @Override
        public final void transform(StreamElement input, CommitCallback commit) throws TransactionRejectedRuntimeException {
            try {
                Transaction bound = currentTransaction();
                validate(input, bound);
            }
            catch (TransactionRejectedException rejected) {
                throw new TransactionRejectedRuntimeException(rejected);
            }
        }

        /**
         * Checks a single output element of the transaction.
         *
         * @param var1 element to validate
         * @param var2 transaction the element belongs to
         * @throws TransactionPreconditionFailedException declared for implementations to throw
         * @throws TransactionRejectedException when the transaction must be rejected
         */
        public abstract void validate(StreamElement var1, Transaction var2) throws TransactionPreconditionFailedException, TransactionRejectedException;
    }

    /**
     * Implemented by transformations that need the transaction they run in. The writer calls
     * {@code setTransaction} before applying the transformation.
     */
    public interface TransactionAware {
        /** Returns the transaction most recently bound to this object. */
        Transaction currentTransaction();

        /** Binds the transaction in which the next invocation runs. */
        void setTransaction(Transaction var1);
    }
}

