package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameType;
import io.rsocket.internal.KeepAliveData;
import io.rsocket.resume.ResumePositionsConnection;
import io.rsocket.resume.ResumeStateHolder;
import io.rsocket.util.DuplexConnectionProxy;
import io.rsocket.util.Function3;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/rsocket/keepalive/KeepAliveConnection.class */
public class KeepAliveConnection extends DuplexConnectionProxy implements ResumePositionsConnection {
    private final MonoProcessor<KeepAliveHandler> keepAliveHandlerReady;
    private final ByteBufAllocator allocator;
    private final Function<ByteBuf, KeepAliveData> keepAliveData;
    private final Function3<ByteBufAllocator, Duration, Duration, KeepAliveHandler> keepAliveHandlerFactory;
    private final Consumer<Throwable> errorConsumer;
    private volatile KeepAliveHandler keepAliveHandler;
    private volatile ResumeStateHolder resumeStateHolder;
    private volatile boolean keepAliveHandlerStarted;

    public static KeepAliveConnection ofClient(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, Function<ByteBuf, KeepAliveData> function, Consumer<Throwable> consumer) {
        return new KeepAliveConnection(byteBufAllocator, duplexConnection, function, KeepAliveHandler::ofClient, consumer);
    }

    public static KeepAliveConnection ofServer(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, Function<ByteBuf, KeepAliveData> function, Consumer<Throwable> consumer) {
        return new KeepAliveConnection(byteBufAllocator, duplexConnection, function, KeepAliveHandler::ofServer, consumer);
    }

    private KeepAliveConnection(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, Function<ByteBuf, KeepAliveData> function, Function3<ByteBufAllocator, Duration, Duration, KeepAliveHandler> function3, Consumer<Throwable> consumer) {
        super(duplexConnection);
        this.keepAliveHandlerReady = MonoProcessor.create();
        this.allocator = byteBufAllocator;
        this.keepAliveData = function;
        this.keepAliveHandlerFactory = function3;
        this.errorConsumer = consumer;
        this.keepAliveHandlerReady.subscribe(this::startKeepAlives);
    }

    private void startKeepAlives(KeepAliveHandler keepAliveHandler) {
        this.keepAliveHandler = keepAliveHandler;
        send(keepAliveHandler.send()).subscribe((Consumer) null, th -> {
            keepAliveHandler.dispose();
        });
        keepAliveHandler.timeout().subscribe(keepAlive -> {
            this.errorConsumer.accept(new ConnectionErrorException(String.format("No keep-alive acks for %d ms", Long.valueOf(keepAlive.getTimeoutMillis()))));
            dispose();
        });
        keepAliveHandler.start();
    }

    @Override // io.rsocket.util.DuplexConnectionProxy, io.rsocket.DuplexConnection
    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        return super.send(Flux.from(publisher).doOnNext(this::startKeepAliveHandlerOnce));
    }

    @Override // io.rsocket.util.DuplexConnectionProxy, io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return super.receive().doOnNext(byteBuf -> {
            ResumeStateHolder resumeStateHolder;
            if (!isKeepAliveFrame(byteBuf)) {
                startKeepAliveHandlerOnce(byteBuf);
                return;
            }
            long receive = this.keepAliveHandler.receive(byteBuf);
            if (receive <= 0 || (resumeStateHolder = this.resumeStateHolder) == null) {
                return;
            }
            resumeStateHolder.onImpliedPosition(receive);
        });
    }

    @Override // io.rsocket.util.DuplexConnectionProxy, io.rsocket.Closeable
    public Mono<Void> onClose() {
        return super.onClose().doFinally(signalType -> {
            KeepAliveHandler keepAliveHandler = (KeepAliveHandler) this.keepAliveHandlerReady.peek();
            if (keepAliveHandler != null) {
                keepAliveHandler.dispose();
            }
        });
    }

    @Override // io.rsocket.resume.ResumePositionsConnection
    public void acceptResumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
        this.keepAliveHandlerReady.subscribe(keepAliveHandler -> {
            keepAliveHandler.resumeState(resumeStateHolder);
        });
    }

    private void startKeepAliveHandlerOnce(ByteBuf byteBuf) {
        if (this.keepAliveHandlerStarted || !isStartFrame(byteBuf)) {
            return;
        }
        this.keepAliveHandlerStarted = true;
        startKeepAliveHandler(this.keepAliveData.apply(byteBuf));
    }

    private static boolean isStartFrame(ByteBuf byteBuf) {
        FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
        return frameType == FrameType.SETUP || frameType == FrameType.RESUME;
    }

    private static boolean isKeepAliveFrame(ByteBuf byteBuf) {
        return FrameHeaderFlyweight.frameType(byteBuf) == FrameType.KEEPALIVE;
    }

    private void startKeepAliveHandler(@Nullable KeepAliveData keepAliveData) {
        if (keepAliveData != null) {
            this.keepAliveHandlerReady.onNext(this.keepAliveHandlerFactory.apply(this.allocator, keepAliveData.getTickPeriod(), keepAliveData.getTimeout()));
        }
    }
}
