package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;

/* loaded from: input_file:io/rsocket/resume/InMemoryResumableFramesStore.class */
public class InMemoryResumableFramesStore extends Flux<ByteBuf> implements CoreSubscriber<ByteBuf>, ResumableFramesStore, Subscription {
    final MonoProcessor<Void> disposed = MonoProcessor.create();
    final ArrayList<ByteBuf> cachedFrames = new ArrayList<>();
    final String tag;
    final int cacheLimit;
    volatile long impliedPosition;
    volatile long position;
    volatile int cacheSize;
    CoreSubscriber<? super Void> saveFramesSubscriber;
    CoreSubscriber<? super ByteBuf> actual;
    volatile int state;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) InMemoryResumableFramesStore.class);
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> IMPLIED_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "impliedPosition");
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "position");
    static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> CACHE_SIZE = AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "cacheSize");
    static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> STATE = AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");

    public InMemoryResumableFramesStore(String str, int i) {
        this.tag = str;
        this.cacheLimit = i;
    }

    @Override // io.rsocket.resume.ResumableFramesStore
    public Mono<Void> saveFrames(Flux<ByteBuf> flux) {
        return flux.transform(Operators.lift((scannable, coreSubscriber) -> {
            this.saveFramesSubscriber = coreSubscriber;
            return this;
        })).then();
    }

    @Override // io.rsocket.resume.ResumableFramesStore
    public void releaseFrames(long j) {
        long j2 = this.position;
        logger.debug("{} Removing frames for local: {}, remote implied: {}", this.tag, Long.valueOf(j2), Long.valueOf(j));
        long max = Math.max(0L, j - j2);
        int i = 0;
        ArrayList<ByteBuf> arrayList = this.cachedFrames;
        synchronized (this) {
            while (max > i && arrayList.size() > 0) {
                ByteBuf remove = arrayList.remove(0);
                int readableBytes = remove.readableBytes();
                remove.release();
                i += readableBytes;
            }
        }
        if (max > i) {
            throw new IllegalStateException(String.format("Local and remote state disagreement: need to remove additional %d bytes, but cache is empty", Long.valueOf(max)));
        }
        if (max < i) {
            throw new IllegalStateException("Local and remote state disagreement: local and remote frame sizes are not equal");
        }
        POSITION.addAndGet(this, i);
        if (this.cacheLimit != Integer.MAX_VALUE) {
            CACHE_SIZE.addAndGet(this, -i);
            logger.debug("{} Removed frames. Current cache size: {}", this.tag, Integer.valueOf(this.cacheSize));
        }
    }

    @Override // io.rsocket.resume.ResumableFramesStore
    public Flux<ByteBuf> resumeStream() {
        return this;
    }

    @Override // io.rsocket.resume.ResumableFramesStore
    public long framePosition() {
        return this.position;
    }

    @Override // io.rsocket.resume.ResumableFramesStore
    public long frameImpliedPosition() {
        return this.impliedPosition & Long.MAX_VALUE;
    }

    @Override // io.rsocket.resume.ResumableFramesStore
    public boolean resumableFrameReceived(ByteBuf byteBuf) {
        long j;
        int readableBytes = byteBuf.readableBytes();
        do {
            j = this.impliedPosition;
            if (j < 0) {
                return false;
            }
        } while (!IMPLIED_POSITION.compareAndSet(this, j, j + readableBytes));
        return true;
    }

    void pauseImplied() {
        long j;
        do {
            j = this.impliedPosition;
        } while (!IMPLIED_POSITION.compareAndSet(this, j, j | Long.MIN_VALUE));
        logger.debug("Tag {}. Paused at position[{}]", this.tag, Long.valueOf(j));
    }

    void resumeImplied() {
        long j;
        long j2;
        do {
            j = this.impliedPosition;
            j2 = j & Long.MAX_VALUE;
        } while (!IMPLIED_POSITION.compareAndSet(this, j, j2));
        logger.debug("Tag {}. Resumed at position[{}]", this.tag, Long.valueOf(j2));
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.disposed;
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        if (STATE.getAndSet(this, 2) != 2) {
            this.cacheSize = 0;
            synchronized (this) {
                logger.debug("Tag {}.Disposing InMemoryFrameStore", this.tag);
                Iterator<ByteBuf> it = this.cachedFrames.iterator();
                while (it.hasNext()) {
                    ByteBuf next = it.next();
                    if (next != null) {
                        next.release();
                    }
                }
                this.cachedFrames.clear();
            }
            this.disposed.onComplete();
        }
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.state == 2;
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        this.saveFramesSubscriber.onSubscribe(Operators.emptySubscription());
        subscription.request(Long.MAX_VALUE);
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        this.saveFramesSubscriber.onError(th);
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        this.saveFramesSubscriber.onComplete();
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(ByteBuf byteBuf) {
        int i;
        boolean isResumableFrame = ResumableDuplexConnection.isResumableFrame(byteBuf);
        if (isResumableFrame) {
            ArrayList<ByteBuf> arrayList = this.cachedFrames;
            int readableBytes = byteBuf.readableBytes();
            int i2 = this.cacheLimit;
            if (i2 != Integer.MAX_VALUE) {
                long j = i2 - this.cacheSize;
                if (j < readableBytes) {
                    int i3 = 0;
                    synchronized (this) {
                        while (j < readableBytes && arrayList.size() != 0) {
                            ByteBuf remove = arrayList.remove(0);
                            int readableBytes2 = remove.readableBytes();
                            j += readableBytes2;
                            i3 += readableBytes2;
                            remove.release();
                        }
                    }
                    CACHE_SIZE.addAndGet(this, -i3);
                    POSITION.addAndGet(this, i3);
                }
            }
            synchronized (this) {
                i = this.state;
                if (i != 2) {
                    arrayList.add(byteBuf);
                }
            }
            if (i2 != Integer.MAX_VALUE) {
                CACHE_SIZE.addAndGet(this, readableBytes);
            }
        } else {
            i = this.state;
        }
        CoreSubscriber<? super ByteBuf> coreSubscriber = this.actual;
        if (i == 1) {
            coreSubscriber.onNext(byteBuf.retain());
        } else if (!isResumableFrame || i == 2) {
            byteBuf.release();
        }
    }

    @Override // org.reactivestreams.Subscription
    public void request(long j) {
    }

    @Override // org.reactivestreams.Subscription
    public void cancel() {
        pauseImplied();
        this.state = 0;
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        if (this.state == 2) {
            Operators.complete(coreSubscriber);
            return;
        }
        resumeImplied();
        logger.debug("Tag: {}. Subscribed at Position[{}] and ImpliedPosition[{}]", this.tag, Long.valueOf(this.position), Long.valueOf(this.impliedPosition));
        coreSubscriber.onSubscribe(this);
        synchronized (this) {
            Iterator<ByteBuf> it = this.cachedFrames.iterator();
            while (it.hasNext()) {
                coreSubscriber.onNext(it.next().retain());
            }
        }
        this.actual = coreSubscriber;
        STATE.compareAndSet(this, 0, 1);
    }
}
