/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.transform;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogObservers;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter;
import cz.o2.proxima.direct.transform.DirectElementWiseTransform;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StorageFilter;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transform.ElementWiseTransformation;
import cz.o2.proxima.util.Optionals;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TransformationObserver
implements CommitLogObserver {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransformationObserver.class);
    final String name;
    final RepositoryFactory repoFactory;
    transient DirectDataOperator direct;
    final boolean supportTransactions;
    final StorageFilter filter;

    TransformationObserver(DirectDataOperator direct, String name, boolean supportTransactions, StorageFilter filter) {
        this.name = name;
        this.repoFactory = direct.getRepository().asFactory();
        this.supportTransactions = supportTransactions;
        this.filter = filter;
    }

    @Override
    public boolean onError(Throwable error) {
        return true;
    }

    public CommitLogObservers.TerminationStrategy onFatalError(Throwable error) {
        String msg = String.format("Failed to transform using %s. Bailing out.", this.name);
        log.error(msg, error);
        this.die(msg);
        return CommitLogObservers.TerminationStrategy.RETHROW;
    }

    @Override
    public void onIdle(CommitLogObserver.OnIdleContext context) {
        this.reportConsumerWatermark(this.name, context.getWatermark(), -1L);
    }

    @Override
    public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
        log.debug("Transformation {}: Received ingest {} at watermark {}", new Object[]{this.name, ingest, context.getWatermark()});
        this.reportConsumerWatermark(this.name, context.getWatermark(), ingest.getStamp());
        if (!this.filter.apply(ingest)) {
            log.debug("Transformation {}: skipping transformation of {} by filter", (Object)this.name, (Object)ingest);
            context.confirm();
        } else {
            this.doTransform(ingest, context);
        }
        return true;
    }

    abstract void doTransform(StreamElement var1, CommitLogObserver.OffsetCommitter var2);

    DirectDataOperator direct() {
        if (this.direct == null) {
            this.direct = (DirectDataOperator)this.repoFactory.apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
        }
        return this.direct;
    }

    protected void reportConsumerWatermark(String name, long watermark, long elementStamp) {
    }

    protected void die(String msg) {
        throw new RuntimeException(msg);
    }

    protected void onReplicated(StreamElement element) {
    }

    public static class Contextual
    extends TransformationObserver {
        private final DirectElementWiseTransform transformation;

        public Contextual(DirectDataOperator direct, String name, DirectElementWiseTransform transformation, boolean supportTransactions, StorageFilter filter) {
            super(direct, name, supportTransactions, filter);
            this.transformation = transformation;
        }

        @Override
        void doTransform(StreamElement ingest, CommitLogObserver.OffsetCommitter context) {
            log.debug("Transformation {}: processing input {}", (Object)this.name, (Object)ingest);
            int i = 0;
            while (true) {
                try {
                    this.transformation.transform(ingest, context::commit);
                }
                catch (TransactionalOnlineAttributeWriter.TransactionRejectedRuntimeException ex) {
                    log.info("Caught {}. Retries so far {}. {}", new Object[]{ex.getClass().getSimpleName(), i, i < 2 ? "Retrying." : "Rethrowing."});
                    if (i == 2) {
                        throw ex;
                    }
                    ++i;
                    continue;
                }
                break;
            }
        }
    }

    public static class NonContextual
    extends TransformationObserver {
        private final ElementWiseTransformation transformation;

        public NonContextual(DirectDataOperator direct, String name, ElementWiseTransformation transformation, boolean supportTransactions, StorageFilter filter) {
            super(direct, name, supportTransactions, filter);
            this.transformation = transformation;
        }

        @Override
        void doTransform(StreamElement ingest, CommitLogObserver.OffsetCommitter committer) {
            AtomicInteger toConfirm = new AtomicInteger(0);
            try {
                ElementWiseTransformation.Collector & Serializable collector = (ElementWiseTransformation.Collector & Serializable)elem -> {
                    try {
                        log.debug("Transformation {}: writing transformed element {}", (Object)this.name, elem);
                        ((OnlineAttributeWriter)Optionals.get(this.direct().getWriter(elem.getAttributeDescriptor()))).write((StreamElement)elem, (succ, exc) -> {
                            if (succ) {
                                this.onReplicated((StreamElement)elem);
                                if (toConfirm.decrementAndGet() == 0) {
                                    committer.confirm();
                                }
                            } else {
                                toConfirm.set(-1);
                                committer.fail(exc);
                            }
                        });
                    }
                    catch (Exception ex) {
                        toConfirm.set(-1);
                        committer.fail(ex);
                    }
                };
                if (toConfirm.addAndGet(this.transformation.apply(ingest, (ElementWiseTransformation.Collector)collector)) == 0) {
                    committer.confirm();
                }
            }
            catch (Exception ex) {
                toConfirm.set(-1);
                committer.fail(ex);
            }
        }
    }
}

