package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.frame.KeepAliveFrameFlyweight;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/rsocket/keepalive/KeepAliveHandler.class */
public abstract class KeepAliveHandler implements Disposable {
    final ByteBufAllocator allocator;
    private final Duration keepAlivePeriod;
    private final long keepAliveTimeout;
    private volatile ResumeStateHolder resumeStateHolder;
    private final UnicastProcessor<ByteBuf> sent;
    private final MonoProcessor<KeepAlive> timeout;
    private final AtomicReference<Disposable> intervalDisposable;
    private volatile long lastReceivedMillis;

    /* loaded from: input_file:io/rsocket/keepalive/KeepAliveHandler$Client.class */
    private static final class Client extends KeepAliveHandler {
        Client(ByteBufAllocator byteBufAllocator, Duration duration, Duration duration2) {
            super(byteBufAllocator, duration, duration2);
        }

        @Override // io.rsocket.keepalive.KeepAliveHandler
        void onIntervalTick() {
            doCheckTimeout();
            doSend(KeepAliveFrameFlyweight.encode(this.allocator, true, obtainLastReceivedPos(), Unpooled.EMPTY_BUFFER));
        }
    }

    /* loaded from: input_file:io/rsocket/keepalive/KeepAliveHandler$KeepAlive.class */
    public static final class KeepAlive {
        private final long tickPeriod;
        private final long timeoutMillis;

        public KeepAlive(long j, long j2) {
            this.tickPeriod = j;
            this.timeoutMillis = j2;
        }

        public long getTickPeriod() {
            return this.tickPeriod;
        }

        public long getTimeoutMillis() {
            return this.timeoutMillis;
        }
    }

    /* loaded from: input_file:io/rsocket/keepalive/KeepAliveHandler$Server.class */
    private static class Server extends KeepAliveHandler {
        Server(ByteBufAllocator byteBufAllocator, Duration duration, Duration duration2) {
            super(byteBufAllocator, duration, duration2);
        }

        @Override // io.rsocket.keepalive.KeepAliveHandler
        void onIntervalTick() {
            doCheckTimeout();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KeepAliveHandler ofServer(ByteBufAllocator byteBufAllocator, Duration duration, Duration duration2) {
        return new Server(byteBufAllocator, duration, duration2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KeepAliveHandler ofClient(ByteBufAllocator byteBufAllocator, Duration duration, Duration duration2) {
        return new Client(byteBufAllocator, duration, duration2);
    }

    private KeepAliveHandler(ByteBufAllocator byteBufAllocator, Duration duration, Duration duration2) {
        this.sent = UnicastProcessor.create();
        this.timeout = MonoProcessor.create();
        this.intervalDisposable = new AtomicReference<>();
        this.allocator = byteBufAllocator;
        this.keepAlivePeriod = duration;
        this.keepAliveTimeout = duration2.toMillis();
    }

    public void start() {
        this.lastReceivedMillis = System.currentTimeMillis();
        this.intervalDisposable.compareAndSet(null, Flux.interval(this.keepAlivePeriod).subscribe(l -> {
            onIntervalTick();
        }));
    }

    public void dispose() {
        Disposable andSet = this.intervalDisposable.getAndSet(Disposables.disposed());
        if (andSet != null) {
            andSet.dispose();
        }
        this.sent.onComplete();
        this.timeout.onComplete();
    }

    public long receive(ByteBuf byteBuf) {
        this.lastReceivedMillis = System.currentTimeMillis();
        long lastPosition = KeepAliveFrameFlyweight.lastPosition(byteBuf);
        if (KeepAliveFrameFlyweight.respondFlag(byteBuf)) {
            doSend(KeepAliveFrameFlyweight.encode(this.allocator, false, obtainLastReceivedPos(), KeepAliveFrameFlyweight.data(byteBuf).retain()));
        }
        return lastPosition;
    }

    public void resumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
    }

    public Flux<ByteBuf> send() {
        return this.sent;
    }

    public Mono<KeepAlive> timeout() {
        return this.timeout;
    }

    abstract void onIntervalTick();

    void doSend(ByteBuf byteBuf) {
        this.sent.onNext(byteBuf);
    }

    void doCheckTimeout() {
        if (System.currentTimeMillis() - this.lastReceivedMillis >= this.keepAliveTimeout) {
            this.timeout.onNext(new KeepAlive(this.keepAlivePeriod.toMillis(), this.keepAliveTimeout));
        }
    }

    long obtainLastReceivedPos() {
        if (this.resumeStateHolder != null) {
            return this.resumeStateHolder.impliedPosition();
        }
        return 0L;
    }
}
