/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.flink.core;

import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.LogObserver;
import cz.o2.proxima.flink.core.ResultExtractor;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

abstract class AbstractSourceLogObserver<OffsetT extends Serializable, ContextT extends LogObserver.OnNextContext<OffsetT>, OutputT>
implements LogObserver<OffsetT, ContextT> {
    private final CountDownLatch completed = new CountDownLatch(1);
    private final Set<Partition> seenPartitions = new HashSet<Partition>();
    private final SourceFunction.SourceContext<OutputT> sourceContext;
    private final ResultExtractor<OutputT> resultExtractor;
    private final Set<Partition> skipFirstElementFromEachPartition;
    private long watermark = Long.MIN_VALUE;
    @Nullable
    private volatile Throwable error = null;

    AbstractSourceLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> skipFirstElementFromEachPartition) {
        this.sourceContext = sourceContext;
        this.resultExtractor = resultExtractor;
        this.skipFirstElementFromEachPartition = skipFirstElementFromEachPartition;
    }

    abstract void markOffsetAsConsumed(ContextT var1);

    public boolean onError(Throwable error) {
        this.error = error;
        this.completed.countDown();
        return false;
    }

    public void onCompleted() {
        this.completed.countDown();
    }

    public void onCancelled() {
        this.completed.countDown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean onNext(StreamElement ingest, ContextT context) {
        boolean skipElement;
        boolean bl = skipElement = this.skipFirstElementFromEachPartition.contains(context.getPartition()) && this.seenPartitions.add(context.getPartition());
        if (!skipElement) {
            Object object = this.sourceContext.getCheckpointLock();
            synchronized (object) {
                this.sourceContext.collectWithTimestamp(this.resultExtractor.toResult(ingest), ingest.getStamp());
                this.markOffsetAsConsumed(context);
            }
        }
        this.maybeUpdateWatermark(this.watermark);
        return true;
    }

    public void awaitCompleted() throws InterruptedException {
        this.completed.await();
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(this.error);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void maybeUpdateWatermark(long watermark) {
        if (watermark > this.watermark) {
            this.watermark = watermark;
            Object object = this.sourceContext.getCheckpointLock();
            synchronized (object) {
                this.sourceContext.emitWatermark(new Watermark(this.watermark));
            }
        }
    }

    @Generated
    public SourceFunction.SourceContext<OutputT> getSourceContext() {
        return this.sourceContext;
    }
}

