package cz.o2.proxima.direct.transform;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogObservers;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StorageFilter;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transform.ElementWiseTransformation;
import cz.o2.proxima.util.Optionals;
import java.lang.invoke.SerializedLambda;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/transform/TransformationObserver.class */
public abstract class TransformationObserver implements CommitLogObserver {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransformationObserver.class);
    final String name;
    final RepositoryFactory repoFactory;
    transient DirectDataOperator direct;
    final boolean supportTransactions;
    final StorageFilter filter;

    /* loaded from: input_file:cz/o2/proxima/direct/transform/TransformationObserver$Contextual.class */
    public static class Contextual extends TransformationObserver {
        private final DirectElementWiseTransform transformation;

        public Contextual(DirectDataOperator directDataOperator, String str, DirectElementWiseTransform directElementWiseTransform, boolean z, StorageFilter storageFilter) {
            super(directDataOperator, str, z, storageFilter);
            this.transformation = directElementWiseTransform;
        }

        @Override // cz.o2.proxima.direct.transform.TransformationObserver
        void doTransform(StreamElement streamElement, CommitLogObserver.OffsetCommitter offsetCommitter) {
            TransformationObserver.log.debug("Transformation {}: processing input {}", this.name, streamElement);
            DirectElementWiseTransform directElementWiseTransform = this.transformation;
            Objects.requireNonNull(offsetCommitter);
            directElementWiseTransform.transform(streamElement, offsetCommitter::commit);
        }

        @Override // cz.o2.proxima.direct.transform.TransformationObserver, cz.o2.proxima.direct.LogObserver
        public /* bridge */ /* synthetic */ boolean onNext(StreamElement streamElement, CommitLogObserver.OnNextContext onNextContext) {
            return super.onNext(streamElement, onNextContext);
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/transform/TransformationObserver$NonContextual.class */
    public static class NonContextual extends TransformationObserver {
        private final ElementWiseTransformation transformation;

        public NonContextual(DirectDataOperator directDataOperator, String str, ElementWiseTransformation elementWiseTransformation, boolean z, StorageFilter storageFilter) {
            super(directDataOperator, str, z, storageFilter);
            this.transformation = elementWiseTransformation;
        }

        @Override // cz.o2.proxima.direct.transform.TransformationObserver
        void doTransform(StreamElement streamElement, CommitLogObserver.OffsetCommitter offsetCommitter) {
            AtomicInteger atomicInteger = new AtomicInteger(0);
            try {
                if (atomicInteger.addAndGet(this.transformation.apply(streamElement, streamElement2 -> {
                    try {
                        TransformationObserver.log.debug("Transformation {}: writing transformed element {}", this.name, streamElement2);
                        ((OnlineAttributeWriter) Optionals.get(direct().getWriter(streamElement2.getAttributeDescriptor()))).write(streamElement2, (z, th) -> {
                            if (!z) {
                                atomicInteger.set(-1);
                                offsetCommitter.fail(th);
                            } else {
                                onReplicated(streamElement2);
                                if (atomicInteger.decrementAndGet() == 0) {
                                    offsetCommitter.confirm();
                                }
                            }
                        });
                    } catch (Exception e) {
                        atomicInteger.set(-1);
                        offsetCommitter.fail(e);
                    }
                })) == 0) {
                    offsetCommitter.confirm();
                }
            } catch (Exception e) {
                atomicInteger.set(-1);
                offsetCommitter.fail(e);
            }
        }

        @Override // cz.o2.proxima.direct.transform.TransformationObserver, cz.o2.proxima.direct.LogObserver
        public /* bridge */ /* synthetic */ boolean onNext(StreamElement streamElement, CommitLogObserver.OnNextContext onNextContext) {
            return super.onNext(streamElement, onNextContext);
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 175979356:
                    if (implMethodName.equals("lambda$doTransform$f942e0c6$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/transform/ElementWiseTransformation$Collector") && serializedLambda.getFunctionalInterfaceMethodName().equals("collect") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/transform/TransformationObserver$NonContextual") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lcz/o2/proxima/direct/commitlog/CommitLogObserver$OffsetCommitter;Lcz/o2/proxima/storage/StreamElement;)V")) {
                        NonContextual nonContextual = (NonContextual) serializedLambda.getCapturedArg(0);
                        AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(1);
                        CommitLogObserver.OffsetCommitter offsetCommitter = (CommitLogObserver.OffsetCommitter) serializedLambda.getCapturedArg(2);
                        return streamElement2 -> {
                            try {
                                TransformationObserver.log.debug("Transformation {}: writing transformed element {}", this.name, streamElement2);
                                ((OnlineAttributeWriter) Optionals.get(direct().getWriter(streamElement2.getAttributeDescriptor()))).write(streamElement2, (z2, th) -> {
                                    if (!z2) {
                                        atomicInteger.set(-1);
                                        offsetCommitter.fail(th);
                                    } else {
                                        onReplicated(streamElement2);
                                        if (atomicInteger.decrementAndGet() == 0) {
                                            offsetCommitter.confirm();
                                        }
                                    }
                                });
                            } catch (Exception e) {
                                atomicInteger.set(-1);
                                offsetCommitter.fail(e);
                            }
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    TransformationObserver(DirectDataOperator directDataOperator, String str, boolean z, StorageFilter storageFilter) {
        this.name = str;
        this.repoFactory = directDataOperator.getRepository().asFactory();
        this.supportTransactions = z;
        this.filter = storageFilter;
    }

    @Override // cz.o2.proxima.direct.LogObserver
    public boolean onError(Throwable th) {
        return true;
    }

    public CommitLogObservers.TerminationStrategy onFatalError(Throwable th) {
        String format = String.format("Failed to transform using %s. Bailing out.", this.name);
        log.error(format, th);
        die(format);
        return CommitLogObservers.TerminationStrategy.RETHROW;
    }

    @Override // cz.o2.proxima.direct.commitlog.CommitLogObserver
    public void onIdle(CommitLogObserver.OnIdleContext onIdleContext) {
        reportConsumerWatermark(this.name, onIdleContext.getWatermark(), -1L);
    }

    @Override // cz.o2.proxima.direct.LogObserver
    public boolean onNext(StreamElement streamElement, CommitLogObserver.OnNextContext onNextContext) {
        log.debug("Transformation {}: Received ingest {} at watermark {}", new Object[]{this.name, streamElement, Long.valueOf(onNextContext.getWatermark())});
        reportConsumerWatermark(this.name, onNextContext.getWatermark(), streamElement.getStamp());
        if (this.filter.apply(streamElement)) {
            doTransform(streamElement, onNextContext);
            return true;
        }
        log.debug("Transformation {}: skipping transformation of {} by filter", this.name, streamElement);
        onNextContext.confirm();
        return true;
    }

    abstract void doTransform(StreamElement streamElement, CommitLogObserver.OffsetCommitter offsetCommitter);

    DirectDataOperator direct() {
        if (this.direct == null) {
            this.direct = (DirectDataOperator) this.repoFactory.apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
        }
        return this.direct;
    }

    protected void reportConsumerWatermark(String str, long j, long j2) {
    }

    protected void die(String str) {
        throw new RuntimeException(str);
    }

    protected void onReplicated(StreamElement streamElement) {
    }
}
