/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.server;

import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.server.IngestServer;
import cz.o2.proxima.storage.StorageFilter;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.commitlog.RetryableLogObserver;
import cz.o2.proxima.transform.Transformation;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransformationObserver
extends RetryableLogObserver {
    private static final Logger log = LoggerFactory.getLogger(TransformationObserver.class);
    private final Repository repo;
    private final Transformation transformation;
    private final StorageFilter filter;
    private final String name;

    TransformationObserver(int retries, String consumer, CommitLogReader reader, Repository repo, String name, Transformation transformation, StorageFilter filter) {
        super(retries, consumer, reader);
        this.repo = repo;
        this.name = name;
        this.transformation = transformation;
        this.filter = filter;
    }

    protected void failure() {
        IngestServer.die(String.format("Failed to transform using %s. Bailing out.", this.transformation));
    }

    public boolean onNextInternal(StreamElement ingest, LogObserver.OffsetCommitter committer) {
        if (!this.filter.apply(ingest)) {
            log.debug("Transformation {}: skipping transformation of {} by filter", (Object)this.name, (Object)ingest);
            committer.confirm();
        } else {
            this.doTransform(committer, ingest);
        }
        return true;
    }

    private void doTransform(LogObserver.OffsetCommitter committer, StreamElement ingest) {
        AtomicInteger toConfirm = new AtomicInteger(0);
        try {
            Transformation.Collector & Serializable collector = (Transformation.Collector & Serializable)elem -> {
                try {
                    log.debug("Transformation {}: writing transformed element {}", (Object)this.name, elem);
                    IngestServer.ingestRequest(this.repo, elem, elem.getUuid(), rpc -> {
                        if (rpc.getStatus() == 200) {
                            if (toConfirm.decrementAndGet() == 0) {
                                committer.confirm();
                            }
                        } else {
                            toConfirm.set(-1);
                            committer.fail((Throwable)new RuntimeException(String.format("Received invalid status %d:%s", rpc.getStatus(), rpc.getStatusMessage())));
                        }
                    });
                }
                catch (Exception ex) {
                    toConfirm.set(-1);
                    committer.fail((Throwable)ex);
                }
            };
            if (toConfirm.addAndGet(this.transformation.apply(ingest, (Transformation.Collector)collector)) == 0) {
                committer.confirm();
            }
        }
        catch (Exception ex) {
            toConfirm.set(-1);
            committer.fail((Throwable)ex);
        }
    }
}

