/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.state.Revision;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TimeWindow;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.common.concurrent.LatestItemSequentialProcessor;
import io.pravega.shared.watermarks.Watermark;
import java.beans.ConstructorProperties;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;

public class WatermarkReaderImpl
implements AutoCloseable {
    private final Stream stream;
    private final WatermarkFetcher fetcher;
    private final LatestItemSequentialProcessor<Map<SegmentWithRange, Long>> processor;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final ArrayDeque<Watermark> inflight = new ArrayDeque();
    @GuardedBy(value="lock")
    private Long passedTimestamp = null;
    private final Consumer<Map<SegmentWithRange, Long>> processFunction = new Consumer<Map<SegmentWithRange, Long>>(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void accept(Map<SegmentWithRange, Long> position) {
            Object object = WatermarkReaderImpl.this.lock;
            synchronized (object) {
                Watermark mark;
                int compare;
                while (!WatermarkReaderImpl.this.inflight.isEmpty() && (compare = WatermarkReaderImpl.compare(WatermarkReaderImpl.this.stream, position, (Watermark)WatermarkReaderImpl.this.inflight.getFirst())) > 0) {
                    WatermarkReaderImpl.this.passedTimestamp = ((Watermark)WatermarkReaderImpl.this.inflight.removeFirst()).getLowerTimeBound();
                }
                while ((WatermarkReaderImpl.this.inflight.isEmpty() || WatermarkReaderImpl.compare(WatermarkReaderImpl.this.stream, position, (Watermark)WatermarkReaderImpl.this.inflight.getLast()) >= 0) && (mark = WatermarkReaderImpl.this.fetcher.fetchNextMark()) != null) {
                    if (WatermarkReaderImpl.compare(WatermarkReaderImpl.this.stream, position, mark) <= 0) {
                        WatermarkReaderImpl.this.inflight.addLast(mark);
                        continue;
                    }
                    WatermarkReaderImpl.this.passedTimestamp = mark.getLowerTimeBound();
                }
            }
        }
    };

    public WatermarkReaderImpl(Stream stream, RevisionedStreamClient<Watermark> markClient, Executor executor) {
        this.stream = Preconditions.checkNotNull(stream);
        this.fetcher = new WatermarkFetcher(markClient);
        this.processor = new LatestItemSequentialProcessor<Map<SegmentWithRange, Long>>(this.processFunction, Preconditions.checkNotNull(executor));
    }

    public void advanceTo(Map<SegmentWithRange, Long> readerGroupPosition) {
        this.processor.updateItem(readerGroupPosition);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TimeWindow getTimeWindow() {
        Object object = this.lock;
        synchronized (object) {
            Long upperBound = this.inflight.isEmpty() ? null : Long.valueOf(this.inflight.getLast().getUpperTimeBound());
            return new TimeWindow(this.passedTimestamp, upperBound);
        }
    }

    @VisibleForTesting
    static int compare(Stream stream, Map<SegmentWithRange, Long> readerGroupPosition, Watermark mark) {
        Map<SegmentWithRange, Long> left = readerGroupPosition;
        HashMap<SegmentWithRange, Long> right = new HashMap<SegmentWithRange, Long>();
        for (Map.Entry<io.pravega.shared.watermarks.SegmentWithRange, Long> entry : mark.getStreamCut().entrySet()) {
            Segment segment = new Segment(stream.getScope(), stream.getStreamName(), entry.getKey().getSegmentId());
            right.put(new SegmentWithRange(segment, entry.getKey().getRangeLow(), entry.getKey().getRangeHigh()), entry.getValue());
        }
        boolean leftBelowRight = false;
        boolean leftAboveRight = false;
        for (Map.Entry<SegmentWithRange, Long> entry : left.entrySet()) {
            List<SegmentWithOffset> matching = WatermarkReaderImpl.findOverlappingSegmentIn(entry.getKey(), right);
            for (SegmentWithOffset match : matching) {
                SegmentWithOffset leftSegment = new SegmentWithOffset(entry.getKey().getSegment(), entry.getValue());
                int comparison = leftSegment.compareTo(match);
                if (comparison > 0) {
                    leftAboveRight = true;
                    continue;
                }
                if (comparison >= 0) continue;
                leftBelowRight = true;
            }
        }
        if (leftBelowRight && leftAboveRight) {
            return 0;
        }
        if (leftBelowRight) {
            return -1;
        }
        if (leftAboveRight) {
            return 1;
        }
        return 1;
    }

    private static List<SegmentWithOffset> findOverlappingSegmentIn(SegmentWithRange segment, Map<SegmentWithRange, Long> ranges) {
        if (ranges.containsKey(segment)) {
            Long offset = ranges.get(segment);
            return Collections.singletonList(new SegmentWithOffset(segment.getSegment(), offset));
        }
        ArrayList<SegmentWithOffset> result = new ArrayList<SegmentWithOffset>();
        for (Map.Entry<SegmentWithRange, Long> entry : ranges.entrySet()) {
            if (segment.getRange() != null && !entry.getKey().getRange().overlapsWith(segment.getRange())) continue;
            result.add(new SegmentWithOffset(entry.getKey().getSegment(), entry.getValue()));
        }
        return result;
    }

    @Override
    public void close() {
        this.fetcher.close();
    }

    private static final class SegmentWithOffset
    implements Comparable<SegmentWithOffset> {
        private final Segment segment;
        private final long offset;

        @Override
        public int compareTo(SegmentWithOffset o) {
            int result = this.segment.compareTo(o.segment);
            if (result != 0) {
                return result;
            }
            if (this.offset <= -1L && o.offset >= 0L) {
                return 1;
            }
            if (this.offset >= 0L && o.offset <= -1L) {
                return -1;
            }
            return Long.compare(this.offset, o.offset);
        }

        @ConstructorProperties(value={"segment", "offset"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public SegmentWithOffset(Segment segment, long offset) {
            this.segment = segment;
            this.offset = offset;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public Segment getSegment() {
            return this.segment;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public long getOffset() {
            return this.offset;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SegmentWithOffset)) {
                return false;
            }
            SegmentWithOffset other = (SegmentWithOffset)o;
            Segment this$segment = this.getSegment();
            Segment other$segment = other.getSegment();
            if (this$segment == null ? other$segment != null : !((Object)this$segment).equals(other$segment)) {
                return false;
            }
            return this.getOffset() == other.getOffset();
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Segment $segment = this.getSegment();
            result = result * 59 + ($segment == null ? 43 : ((Object)$segment).hashCode());
            long $offset = this.getOffset();
            result = result * 59 + (int)($offset >>> 32 ^ $offset);
            return result;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public String toString() {
            return "WatermarkReaderImpl.SegmentWithOffset(segment=" + this.getSegment() + ", offset=" + this.getOffset() + ")";
        }
    }

    private static class WatermarkFetcher
    implements AutoCloseable {
        @SuppressFBWarnings(justification="generated code")
        @Generated
        private final Object $lock = new Object[0];
        private final RevisionedStreamClient<Watermark> client;
        @GuardedBy(value="$lock")
        private Revision location = null;
        @GuardedBy(value="$lock")
        private Iterator<Map.Entry<Revision, Watermark>> iter = null;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Watermark fetchNextMark() {
            Object object = this.$lock;
            synchronized (object) {
                if (this.iter != null && this.iter.hasNext()) {
                    Map.Entry<Revision, Watermark> next = this.iter.next();
                    this.location = next.getKey();
                    return next.getValue();
                }
                if (this.location == null) {
                    this.location = this.client.fetchOldestRevision();
                }
                this.iter = this.client.readFrom(this.location);
                if (this.iter.hasNext()) {
                    Map.Entry<Revision, Watermark> next = this.iter.next();
                    this.location = next.getKey();
                    return next.getValue();
                }
                return null;
            }
        }

        @Override
        public void close() {
            this.client.close();
        }

        @ConstructorProperties(value={"client"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public WatermarkFetcher(RevisionedStreamClient<Watermark> client) {
            this.client = client;
        }
    }
}

