package io.pravega.client.state.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.ConditionalOutputStream;
import io.pravega.client.segment.impl.EndOfSegmentException;
import io.pravega.client.segment.impl.EventSegmentReader;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentAttribute;
import io.pravega.client.segment.impl.SegmentInfo;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.segment.impl.SegmentTruncatedException;
import io.pravega.client.state.Revision;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.TruncatedDataException;
import io.pravega.client.stream.impl.PendingEvent;
import io.pravega.common.concurrent.Futures;
import java.beans.ConstructorProperties;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/state/impl/RevisionedStreamClientImpl.class */
public class RevisionedStreamClientImpl<T> implements RevisionedStreamClient<T> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(RevisionedStreamClientImpl.class);
    private final Segment segment;

    @GuardedBy("lock")
    private final EventSegmentReader in;

    @GuardedBy("lock")
    private final SegmentOutputStream out;

    @GuardedBy("lock")
    private final ConditionalOutputStream conditional;

    @GuardedBy("lock")
    private final SegmentMetadataClient meta;
    private final Serializer<T> serializer;
    private final Object lock = new Object();

    /* loaded from: input_file:io/pravega/client/state/impl/RevisionedStreamClientImpl$StreamIterator.class */
    private class StreamIterator implements Iterator<Map.Entry<Revision, T>> {
        private final AtomicLong offset;
        private final long endOffset;

        StreamIterator(long j, long j2) {
            this.offset = new AtomicLong(j);
            this.endOffset = j2;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.offset.get() < this.endOffset;
        }

        @Override // java.util.Iterator
        public Map.Entry<Revision, T> next() {
            ByteBuffer read;
            RevisionImpl revisionImpl;
            synchronized (RevisionedStreamClientImpl.this.lock) {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                RevisionedStreamClientImpl.log.trace("Iterater reading entry at", Long.valueOf(this.offset.get()));
                RevisionedStreamClientImpl.this.in.setOffset(this.offset.get());
                try {
                    read = RevisionedStreamClientImpl.this.in.read();
                    this.offset.set(RevisionedStreamClientImpl.this.in.getOffset());
                    revisionImpl = new RevisionImpl(RevisionedStreamClientImpl.this.segment, this.offset.get(), 0);
                } catch (EndOfSegmentException e) {
                    throw new IllegalStateException("SegmentInputStream: " + RevisionedStreamClientImpl.this.in + " shrunk from its original length: " + this.endOffset);
                } catch (SegmentTruncatedException e2) {
                    throw new TruncatedDataException(e2);
                }
            }
            return new AbstractMap.SimpleImmutableEntry(revisionImpl, RevisionedStreamClientImpl.this.serializer.deserialize(read));
        }
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public Revision writeConditionally(Revision revision, T t) {
        boolean write;
        long offsetInSegment = revision.asImpl().getOffsetInSegment();
        ByteBuffer serialize = this.serializer.serialize(t);
        int remaining = serialize.remaining();
        synchronized (this.lock) {
            try {
                write = this.conditional.write(serialize, offsetInSegment);
            } catch (SegmentSealedException e) {
                throw new CorruptedStateException("Unexpected end of segment ", e);
            }
        }
        if (!write) {
            log.trace("Write failed at offset {}", Long.valueOf(offsetInSegment));
            return null;
        }
        long newOffset = getNewOffset(offsetInSegment, remaining);
        log.trace("Wrote from {} to {}", Long.valueOf(offsetInSegment), Long.valueOf(newOffset));
        return new RevisionImpl(this.segment, newOffset, 0);
    }

    private static final long getNewOffset(long j, int i) {
        return j + i + 8;
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public void writeUnconditionally(T t) {
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            PendingEvent withHeader = PendingEvent.withHeader(null, this.serializer.serialize(t), completableFuture);
            log.trace("Unconditionally writing: {}", t);
            synchronized (this.lock) {
                this.out.write(withHeader);
                this.out.flush();
            }
            Futures.getAndHandleExceptions(completableFuture, RuntimeException::new);
        } catch (SegmentSealedException e) {
            throw new CorruptedStateException("Unexpected end of segment ", e);
        }
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public Iterator<Map.Entry<Revision, T>> readFrom(Revision revision) {
        StreamIterator streamIterator;
        synchronized (this.lock) {
            long offsetInSegment = revision.asImpl().getOffsetInSegment();
            SegmentInfo segmentInfo = this.meta.getSegmentInfo();
            long writeOffset = segmentInfo.getWriteOffset();
            if (offsetInSegment < segmentInfo.getStartingOffset()) {
                throw new TruncatedDataException("Data at the supplied revision has been truncated.");
            }
            log.trace("Creating iterator from {} until {}", Long.valueOf(offsetInSegment), Long.valueOf(writeOffset));
            streamIterator = new StreamIterator(offsetInSegment, writeOffset);
        }
        return streamIterator;
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public Revision fetchLatestRevision() {
        RevisionImpl revisionImpl;
        synchronized (this.lock) {
            revisionImpl = new RevisionImpl(this.segment, this.meta.fetchCurrentSegmentLength(), 0);
        }
        return revisionImpl;
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public Revision getMark() {
        RevisionImpl revisionImpl;
        synchronized (this.lock) {
            long fetchProperty = this.meta.fetchProperty(SegmentAttribute.RevisionStreamClientMark);
            revisionImpl = fetchProperty == Long.MIN_VALUE ? null : new RevisionImpl(this.segment, fetchProperty, 0);
        }
        return revisionImpl;
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public boolean compareAndSetMark(Revision revision, Revision revision2) {
        boolean compareAndSetAttribute;
        long offsetInSegment = revision == null ? Long.MIN_VALUE : revision.asImpl().getOffsetInSegment();
        long offsetInSegment2 = revision2 == null ? Long.MIN_VALUE : revision2.asImpl().getOffsetInSegment();
        synchronized (this.lock) {
            compareAndSetAttribute = this.meta.compareAndSetAttribute(SegmentAttribute.RevisionStreamClientMark, offsetInSegment, offsetInSegment2);
        }
        return compareAndSetAttribute;
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public Revision fetchOldestRevision() {
        return new RevisionImpl(this.segment, this.meta.getSegmentInfo().getStartingOffset(), 0);
    }

    @Override // io.pravega.client.state.RevisionedStreamClient
    public void truncateToRevision(Revision revision) {
        this.meta.truncateSegment(revision.asImpl().getOffsetInSegment());
    }

    @Override // io.pravega.client.state.RevisionedStreamClient, java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            try {
                this.out.close();
            } catch (SegmentSealedException e) {
                log.warn("Error closing segment writer {}", this.out);
            }
            this.conditional.close();
            this.meta.close();
            this.in.close();
        }
    }

    @SuppressFBWarnings(justification = "generated code")
    @ConstructorProperties({"segment", "in", "out", "conditional", "meta", "serializer"})
    public RevisionedStreamClientImpl(Segment segment, EventSegmentReader eventSegmentReader, SegmentOutputStream segmentOutputStream, ConditionalOutputStream conditionalOutputStream, SegmentMetadataClient segmentMetadataClient, Serializer<T> serializer) {
        this.segment = segment;
        this.in = eventSegmentReader;
        this.out = segmentOutputStream;
        this.conditional = conditionalOutputStream;
        this.meta = segmentMetadataClient;
        this.serializer = serializer;
    }
}
