/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.flink.core.batch;

import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.batch.BatchLogReader;
import cz.o2.proxima.direct.core.batch.ObserveHandle;
import cz.o2.proxima.direct.core.batch.Offset;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Generated;

public class OffsetTrackingBatchLogReader
implements BatchLogReader {
    private final BatchLogReader delegate;

    /**
     * Wraps the given reader in an {@link OffsetTrackingBatchLogReader}.
     *
     * @param reader reader that the returned instance delegates to
     * @return a new tracking reader backed by {@code reader}
     */
    public static OffsetTrackingBatchLogReader of(final BatchLogReader reader) {
        final OffsetTrackingBatchLogReader trackingReader = new OffsetTrackingBatchLogReader(reader);
        return trackingReader;
    }

    /**
     * Resolves two offsets stored under the same map key, accepting the new one only when its
     * element index is strictly greater than the element index of the old one.
     *
     * @param oldValue offset recorded so far
     * @param newValue offset that should replace {@code oldValue}
     * @return {@code newValue}
     * @throws IllegalStateException when the element index of {@code newValue} is not strictly
     *     greater than the element index of {@code oldValue}
     */
    private static Offset mergeOffsets(Offset oldValue, Offset newValue) {
        // Guard clause: an equal or lower index means the offsets did not advance.
        if (oldValue.getElementIndex() >= newValue.getElementIndex()) {
            final String message = String.format("Offsets are not monotonically increasing. Old value: %s. New value: %s.", oldValue, newValue);
            throw new IllegalStateException(message);
        }
        return newValue;
    }

    /**
     * Creates a tracking reader on top of an existing reader.
     *
     * @param delegate reader that all calls are forwarded to
     */
    public OffsetTrackingBatchLogReader(final BatchLogReader delegate) {
        this.delegate = delegate;
    }

    /**
     * Returns the partitions reported by the delegate reader for the given stamps.
     *
     * @param startStamp forwarded unchanged to the delegate
     * @param endStamp forwarded unchanged to the delegate
     * @return whatever the delegate returns for the same arguments
     */
    @Override
    public List<Partition> getPartitions(final long startStamp, final long endStamp) {
        final List<Partition> delegatePartitions = delegate.getPartitions(startStamp, endStamp);
        return delegatePartitions;
    }

    /**
     * Starts an observation on the delegate reader and returns a handle that can report the
     * offsets the observer has marked as consumed.
     *
     * <p>The supplied observer is wrapped in an {@link OffsetTrackingBatchLogObserver}, and the
     * wrapper is what gets registered with the delegate reader.
     *
     * @param partitions partitions to observe; each one also gets an entry in the list returned by
     *     {@link OffsetTrackingObserveHandle#getCurrentOffsets()}
     * @param attributes forwarded unchanged to the delegate reader
     * @param observer observer receiving the callbacks through the tracking wrapper
     * @return an {@link OffsetTrackingObserveHandle} whose {@code close()} closes the delegate's
     *     handle
     */
    @Override
    public ObserveHandle observe(final List<Partition> partitions, List<AttributeDescriptor<?>> attributes, BatchLogObserver observer) {
        final OffsetTrackingBatchLogObserver wrappedObserver = new OffsetTrackingBatchLogObserver(observer);
        final ObserveHandle handle = this.delegate.observe(partitions, attributes, wrappedObserver);
        return new OffsetTrackingObserveHandle() {

            @Override
            public List<Offset> getCurrentOffsets() {
                // Typed map (not a raw HashMap) so the merge function below is checked as
                // (Offset, Offset) -> Offset and values() is a Collection<Offset>.
                final Map<Partition, Offset> result = new HashMap<>();
                // Seed every requested partition with a placeholder offset of element index -1,
                // so partitions with no consumed offset still appear in the result.
                for (Partition partition : partitions) {
                    result.put(partition, Offset.of(partition, -1L, false));
                }
                // Overlay a snapshot of the consumed offsets; mergeOffsets rejects any offset
                // whose element index is not greater than the one already present.
                wrappedObserver.getConsumedOffsets().forEach((partition, offset) -> result.merge(partition, offset, OffsetTrackingBatchLogReader::mergeOffsets));
                return new ArrayList<>(result.values());
            }

            @Override
            public void close() {
                handle.close();
            }
        };
    }

    /**
     * Returns a factory that builds an {@link OffsetTrackingBatchLogReader} around whatever
     * reader the delegate's own factory produces.
     *
     * <p>Only the delegate's factory is captured by the returned lambda, not this reader
     * instance. The lambda is additionally typed as {@link Serializable}.
     *
     * @return factory creating a tracking reader from a repository
     */
    @Override
    public BatchLogReader.Factory<?> asFactory() {
        // Wildcard-parameterized instead of raw: matches this method's declared return type
        // and keeps generic type checking on for the factory call.
        final BatchLogReader.Factory<?> delegateFactory = this.delegate.asFactory();
        return (BatchLogReader.Factory<?> & Serializable) repository -> new OffsetTrackingBatchLogReader((BatchLogReader) delegateFactory.apply(repository));
    }

    /**
     * Skeleton of the context handed to the wrapped observer.
     *
     * <p>Every {@link BatchLogObserver.OnNextContext} accessor below is forwarded to the context
     * supplied at construction time. {@link OffsetCommitter#markOffsetAsConsumed()} is left
     * abstract for subclasses to implement.
     */
    private abstract static class OffsetTrackingOnNextContext
            implements BatchLogObserver.OnNextContext, OffsetCommitter {

        /** Context all accessor calls are forwarded to. */
        private final BatchLogObserver.OnNextContext wrappedContext;

        /**
         * @param delegate context to forward accessor calls to
         */
        OffsetTrackingOnNextContext(BatchLogObserver.OnNextContext delegate) {
            this.wrappedContext = delegate;
        }

        /** @return partition reported by the wrapped context */
        @Generated
        public Partition getPartition() {
            return wrappedContext.getPartition();
        }

        /** @return offset reported by the wrapped context */
        @Generated
        public Offset getOffset() {
            return (Offset) wrappedContext.getOffset();
        }

        /** @return watermark reported by the wrapped context */
        @Generated
        public long getWatermark() {
            return wrappedContext.getWatermark();
        }

        /** @return current time reported by the wrapped context */
        @Generated
        public long getCurrentTime() {
            return wrappedContext.getCurrentTime();
        }
    }

    /**
     * Observer decorator that forwards every callback to a delegate observer and records the
     * offsets marked as consumed through the context it passes to {@code onNext}.
     *
     * <p>All reads and writes of the offset map happen while synchronized on the map itself.
     */
    public static class OffsetTrackingBatchLogObserver implements BatchLogObserver {

        /** Latest consumed offset per partition; also used as the monitor guarding itself. */
        private final Map<Partition, Offset> consumedOffsets = new HashMap<>();

        /** Observer receiving all callbacks. */
        private final BatchLogObserver delegate;

        /**
         * @param delegate observer to forward callbacks to
         */
        public OffsetTrackingBatchLogObserver(BatchLogObserver delegate) {
            this.delegate = delegate;
        }

        /**
         * Forwards the element to the delegate together with a context that implements
         * {@link OffsetCommitter}.
         *
         * @param element element forwarded unchanged
         * @param context original context; wrapped before being passed on
         * @return the value returned by the delegate's {@code onNext}
         */
        @Override
        public boolean onNext(StreamElement element, final BatchLogObserver.OnNextContext context) {
            final OffsetTrackingOnNextContext trackingContext = new OffsetTrackingOnNextContext(context) {

                /**
                 * Stores the offset of the original context under its partition. An existing
                 * entry is replaced through {@code mergeOffsets}, which throws when the element
                 * index did not increase.
                 */
                @Override
                public void markOffsetAsConsumed() {
                    synchronized (consumedOffsets) {
                        consumedOffsets.merge(context.getPartition(), (Offset) context.getOffset(), OffsetTrackingBatchLogReader::mergeOffsets);
                    }
                }
            };
            return delegate.onNext(element, trackingContext);
        }

        /** Forwards the completion signal to the delegate. */
        @Override
        public void onCompleted() {
            delegate.onCompleted();
        }

        /** Forwards the cancellation signal to the delegate. */
        @Override
        public void onCancelled() {
            delegate.onCancelled();
        }

        /**
         * Forwards the error to the delegate.
         *
         * @param error error to forward
         * @return the value returned by the delegate's {@code onError}
         */
        @Override
        public boolean onError(Throwable error) {
            return delegate.onError(error);
        }

        /**
         * Takes a snapshot of the consumed offsets.
         *
         * @return a new map holding a copy of the entries present at the time of the call
         */
        private Map<Partition, Offset> getConsumedOffsets() {
            synchronized (consumedOffsets) {
                return new HashMap<>(consumedOffsets);
            }
        }
    }

    /**
     * Callback used to signal that the offset associated with the implementing object has been
     * consumed.
     */
    public interface OffsetCommitter {

        /** Marks the associated offset as consumed. */
        void markOffsetAsConsumed();
    }

    /**
     * An {@link ObserveHandle} that can additionally report offsets for the observation it
     * belongs to.
     */
    public interface OffsetTrackingObserveHandle extends ObserveHandle {

        /**
         * Returns the offsets currently known to this handle.
         *
         * @return list of offsets
         */
        List<Offset> getCurrentOffsets();
    }
}

