package io.scalecube.gateway.websocket;

import io.scalecube.gateway.GatewayMetrics;
import io.scalecube.gateway.ReferenceCountUtil;
import io.scalecube.gateway.websocket.message.GatewayMessage;
import io.scalecube.gateway.websocket.message.GatewayMessageCodec;
import io.scalecube.gateway.websocket.message.Signal;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ExceptionProcessor;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

/* loaded from: input_file:io/scalecube/gateway/websocket/GatewayWebsocketAcceptor.class */
public class GatewayWebsocketAcceptor implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(GatewayWebsocketAcceptor.class);
    private final ServiceCall serviceCall;
    private final GatewayMetrics metrics;
    private final GatewayMessageCodec messageCodec = new GatewayMessageCodec();

    public GatewayWebsocketAcceptor(ServiceCall serviceCall, GatewayMetrics gatewayMetrics) {
        this.serviceCall = serviceCall;
        this.metrics = gatewayMetrics;
    }

    @Override // java.util.function.BiFunction
    public Publisher<Void> apply(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        return httpServerResponse.sendWebsocket((websocketInbound, websocketOutbound) -> {
            return onConnect(new WebsocketSession(this.messageCodec, httpServerRequest, websocketInbound, websocketOutbound));
        });
    }

    private Mono<Void> onConnect(WebsocketSession websocketSession) {
        LOGGER.info("Session connected: " + websocketSession);
        websocketSession.receive().subscribe(byteBuf -> {
            Mono.fromCallable(() -> {
                return this.messageCodec.decode(byteBuf);
            }).doOnNext(gatewayMessage -> {
                this.metrics.markRequest();
            }).map(this::checkSid).flatMap(gatewayMessage2 -> {
                return handleCancel(websocketSession, gatewayMessage2);
            }).map(obj -> {
                return checkSidNonce(websocketSession, (GatewayMessage) obj);
            }).map(this::checkQualifier).subscribe(gatewayMessage3 -> {
                handleMessage(websocketSession, gatewayMessage3);
            }, th -> {
                LOGGER.error("Exception occurred: {}, session={}", th, websocketSession.id());
                if (th instanceof WebsocketRequestException) {
                    WebsocketRequestException websocketRequestException = (WebsocketRequestException) th;
                    websocketSession.send(websocketRequestException.getCause(), websocketRequestException.releaseRequest().request().streamId()).subscribe();
                }
            });
        });
        return websocketSession.onClose(() -> {
            LOGGER.info("Session disconnected: " + websocketSession);
        });
    }

    private void handleMessage(WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        Long streamId = gatewayMessage.streamId();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Flux requestMany = this.serviceCall.requestMany(GatewayMessage.toServiceMessage(gatewayMessage));
        if (gatewayMessage.rateLimit() != null) {
            requestMany = requestMany.limitRate(gatewayMessage.rateLimit().intValue());
        }
        websocketSession.register(streamId, requestMany.map(serviceMessage -> {
            return prepareResponse(streamId, serviceMessage, atomicBoolean);
        }).doOnNext(gatewayMessage2 -> {
            this.metrics.markServiceResponse();
        }).doFinally(signalType -> {
            websocketSession.dispose(streamId);
        }).subscribe(gatewayMessage3 -> {
            websocketSession.send(gatewayMessage3).doOnSuccess(r3 -> {
                this.metrics.markResponse();
            }).subscribe();
        }, th -> {
            LOGGER.error("Exception occurred on request: {}, session={}, cause: {}", new Object[]{gatewayMessage, websocketSession.id(), th});
            handleError(websocketSession, streamId, th);
        }, () -> {
            handleCompletion(websocketSession, streamId, atomicBoolean);
        }));
    }

    private void handleError(WebsocketSession websocketSession, Long l, Throwable th) {
        GatewayMessage.Builder from = GatewayMessage.from(ExceptionProcessor.toMessage(th));
        Optional ofNullable = Optional.ofNullable(l);
        from.getClass();
        ofNullable.ifPresent(from::streamId);
        websocketSession.send(from.signal(Signal.ERROR).build()).subscribe();
    }

    private void handleCompletion(WebsocketSession websocketSession, Long l, AtomicBoolean atomicBoolean) {
        if (atomicBoolean.get()) {
            return;
        }
        GatewayMessage.Builder builder = GatewayMessage.builder();
        Optional ofNullable = Optional.ofNullable(l);
        builder.getClass();
        ofNullable.ifPresent(builder::streamId);
        websocketSession.send(builder.signal(Signal.COMPLETE).build()).subscribe();
    }

    private GatewayMessage checkQualifier(GatewayMessage gatewayMessage) {
        if (gatewayMessage.qualifier() == null) {
            throw WebsocketRequestException.newBadRequest("qualifier is missing", gatewayMessage);
        }
        return gatewayMessage;
    }

    private GatewayMessage checkSidNonce(WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        if (websocketSession.containsSid(gatewayMessage.streamId())) {
            throw WebsocketRequestException.newBadRequest("sid=" + gatewayMessage.streamId() + " is already registered", gatewayMessage);
        }
        return gatewayMessage;
    }

    private Mono<?> handleCancel(WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        if (!gatewayMessage.hasSignal(Signal.CANCEL)) {
            return Mono.just(gatewayMessage);
        }
        if (!websocketSession.dispose(gatewayMessage.streamId())) {
            throw WebsocketRequestException.newBadRequest("Failed CANCEL request", gatewayMessage);
        }
        Optional.ofNullable(gatewayMessage.data()).ifPresent(ReferenceCountUtil::safestRelease);
        return websocketSession.send(GatewayMessage.builder().streamId(gatewayMessage.streamId()).signal(Signal.CANCEL).build());
    }

    private GatewayMessage checkSid(GatewayMessage gatewayMessage) {
        if (gatewayMessage.streamId() == null) {
            throw WebsocketRequestException.newBadRequest("sid is missing", gatewayMessage);
        }
        return gatewayMessage;
    }

    private GatewayMessage prepareResponse(Long l, ServiceMessage serviceMessage, AtomicBoolean atomicBoolean) {
        GatewayMessage.Builder streamId = GatewayMessage.from(serviceMessage).streamId(l);
        if (ExceptionProcessor.isError(serviceMessage)) {
            atomicBoolean.set(true);
            streamId.signal(Signal.ERROR);
        }
        return streamId.build();
    }
}
