package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.state.Revision;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TimeWindow;
import io.pravega.common.concurrent.LatestItemSequentialProcessor;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.watermarks.Watermark;
import java.beans.ConstructorProperties;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:io/pravega/client/stream/impl/WatermarkReaderImpl.class */
public class WatermarkReaderImpl implements AutoCloseable {
    private final Stream stream;
    private final WatermarkFetcher fetcher;
    private final LatestItemSequentialProcessor<Map<SegmentWithRange, Long>> processor;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final ArrayDeque<Watermark> inflight = new ArrayDeque<>();

    @GuardedBy("lock")
    private Long passedTimestamp = null;
    private final Consumer<Map<SegmentWithRange, Long>> processFunction = new Consumer<Map<SegmentWithRange, Long>>() { // from class: io.pravega.client.stream.impl.WatermarkReaderImpl.1
        @Override // java.util.function.Consumer
        public void accept(Map<SegmentWithRange, Long> map) {
            Watermark fetchNextMark;
            synchronized (WatermarkReaderImpl.this.lock) {
                while (!WatermarkReaderImpl.this.inflight.isEmpty() && WatermarkReaderImpl.compare(WatermarkReaderImpl.this.stream, map, (Watermark) WatermarkReaderImpl.this.inflight.getFirst()) > 0) {
                    WatermarkReaderImpl.this.passedTimestamp = Long.valueOf(((Watermark) WatermarkReaderImpl.this.inflight.removeFirst()).getLowerTimeBound());
                }
                while (true) {
                    if ((!WatermarkReaderImpl.this.inflight.isEmpty() && WatermarkReaderImpl.compare(WatermarkReaderImpl.this.stream, map, (Watermark) WatermarkReaderImpl.this.inflight.getLast()) < 0) || (fetchNextMark = WatermarkReaderImpl.this.fetcher.fetchNextMark()) == null) {
                        break;
                    } else if (WatermarkReaderImpl.compare(WatermarkReaderImpl.this.stream, map, fetchNextMark) <= 0) {
                        WatermarkReaderImpl.this.inflight.addLast(fetchNextMark);
                    } else {
                        WatermarkReaderImpl.this.passedTimestamp = Long.valueOf(fetchNextMark.getLowerTimeBound());
                    }
                }
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/client/stream/impl/WatermarkReaderImpl$SegmentWithOffset.class */
    public static final class SegmentWithOffset implements Comparable<SegmentWithOffset> {
        private final Segment segment;
        private final long offset;

        @Override // java.lang.Comparable
        public int compareTo(SegmentWithOffset segmentWithOffset) {
            int compareTo = this.segment.compareTo(segmentWithOffset.segment);
            if (compareTo != 0) {
                return compareTo;
            }
            if (this.offset <= -1 && segmentWithOffset.offset >= 0) {
                return 1;
            }
            if (this.offset < 0 || segmentWithOffset.offset > -1) {
                return Long.compare(this.offset, segmentWithOffset.offset);
            }
            return -1;
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"segment", "offset"})
        public SegmentWithOffset(Segment segment, long j) {
            this.segment = segment;
            this.offset = j;
        }

        @SuppressFBWarnings(justification = "generated code")
        public Segment getSegment() {
            return this.segment;
        }

        @SuppressFBWarnings(justification = "generated code")
        public long getOffset() {
            return this.offset;
        }

        @SuppressFBWarnings(justification = "generated code")
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof SegmentWithOffset)) {
                return false;
            }
            SegmentWithOffset segmentWithOffset = (SegmentWithOffset) obj;
            Segment segment = getSegment();
            Segment segment2 = segmentWithOffset.getSegment();
            if (segment == null) {
                if (segment2 != null) {
                    return false;
                }
            } else if (!segment.equals(segment2)) {
                return false;
            }
            return getOffset() == segmentWithOffset.getOffset();
        }

        @SuppressFBWarnings(justification = "generated code")
        public int hashCode() {
            Segment segment = getSegment();
            int hashCode = (1 * 59) + (segment == null ? 43 : segment.hashCode());
            long offset = getOffset();
            return (hashCode * 59) + ((int) ((offset >>> 32) ^ offset));
        }

        @SuppressFBWarnings(justification = "generated code")
        public String toString() {
            return "WatermarkReaderImpl.SegmentWithOffset(segment=" + getSegment() + ", offset=" + getOffset() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/client/stream/impl/WatermarkReaderImpl$WatermarkFetcher.class */
    public static class WatermarkFetcher implements AutoCloseable {
        private final RevisionedStreamClient<Watermark> client;

        @SuppressFBWarnings(justification = "generated code")
        private final Object $lock = new Object[0];

        @GuardedBy("$lock")
        private Revision location = null;

        @GuardedBy("$lock")
        private Iterator<Map.Entry<Revision, Watermark>> iter = null;

        /* JADX INFO: Access modifiers changed from: private */
        public Watermark fetchNextMark() {
            synchronized (this.$lock) {
                if (this.iter != null && this.iter.hasNext()) {
                    Map.Entry<Revision, Watermark> next = this.iter.next();
                    this.location = next.getKey();
                    return next.getValue();
                }
                if (this.location == null) {
                    this.location = this.client.fetchOldestRevision();
                }
                this.iter = this.client.readFrom(this.location);
                if (!this.iter.hasNext()) {
                    return null;
                }
                Map.Entry<Revision, Watermark> next2 = this.iter.next();
                this.location = next2.getKey();
                return next2.getValue();
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.client.close();
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"client"})
        public WatermarkFetcher(RevisionedStreamClient<Watermark> revisionedStreamClient) {
            this.client = revisionedStreamClient;
        }
    }

    public WatermarkReaderImpl(Stream stream, RevisionedStreamClient<Watermark> revisionedStreamClient, Executor executor) {
        this.stream = (Stream) Preconditions.checkNotNull(stream);
        this.fetcher = new WatermarkFetcher(revisionedStreamClient);
        this.processor = new LatestItemSequentialProcessor<>(this.processFunction, (Executor) Preconditions.checkNotNull(executor));
    }

    public void advanceTo(Map<SegmentWithRange, Long> map) {
        this.processor.updateItem(map);
    }

    public TimeWindow getTimeWindow() {
        TimeWindow timeWindow;
        synchronized (this.lock) {
            timeWindow = new TimeWindow(this.passedTimestamp, this.inflight.isEmpty() ? null : Long.valueOf(this.inflight.getLast().getUpperTimeBound()));
        }
        return timeWindow;
    }

    @VisibleForTesting
    static int compare(Stream stream, Map<SegmentWithRange, Long> map, Watermark watermark) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<io.pravega.shared.watermarks.SegmentWithRange, Long> entry : watermark.getStreamCut().entrySet()) {
            hashMap.put(new SegmentWithRange(new Segment(stream.getScope(), stream.getStreamName(), entry.getKey().getSegmentId()), entry.getKey().getRangeLow(), entry.getKey().getRangeHigh()), entry.getValue());
        }
        boolean z = false;
        boolean z2 = false;
        for (Map.Entry<SegmentWithRange, Long> entry2 : map.entrySet()) {
            Iterator<SegmentWithOffset> it = findOverlappingSegmentIn(entry2.getKey(), hashMap).iterator();
            while (it.hasNext()) {
                int compareTo = new SegmentWithOffset(entry2.getKey().getSegment(), entry2.getValue().longValue()).compareTo(it.next());
                if (compareTo > 0) {
                    z2 = true;
                } else if (compareTo < 0) {
                    z = true;
                }
            }
        }
        if (z && z2) {
            return 0;
        }
        if (z) {
            return -1;
        }
        return z2 ? 1 : 1;
    }

    private static List<SegmentWithOffset> findOverlappingSegmentIn(SegmentWithRange segmentWithRange, Map<SegmentWithRange, Long> map) {
        if (map.containsKey(segmentWithRange)) {
            return Collections.singletonList(new SegmentWithOffset(segmentWithRange.getSegment(), map.get(segmentWithRange).longValue()));
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<SegmentWithRange, Long> entry : map.entrySet()) {
            if (segmentWithRange.getRange() == null || entry.getKey().getRange().overlapsWith(segmentWithRange.getRange())) {
                arrayList.add(new SegmentWithOffset(entry.getKey().getSegment(), entry.getValue().longValue()));
            }
        }
        return arrayList;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.fetcher.close();
    }
}
