package io.scalecube.gateway.clientsdk.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.gateway.clientsdk.ClientCodec;
import io.scalecube.gateway.clientsdk.ClientMessage;
import io.scalecube.gateway.clientsdk.ErrorData;
import io.scalecube.gateway.clientsdk.ReferenceCountUtil;
import io.scalecube.gateway.clientsdk.exceptions.ExceptionProcessor;
import io.scalecube.services.exceptions.ServiceException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.Connection;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/scalecube/gateway/clientsdk/websocket/WebsocketSession.class */
/**
 * Client-side websocket session built on top of a reactor-netty {@link Connection}.
 *
 * <p>Every inbound frame is decoded with the supplied {@link ClientCodec} and routed, by the value
 * of its {@code sid} header, to the {@link UnicastProcessor} registered for that stream id.
 * Processors are registered by {@link #send(ByteBuf, long)} and consumed (then removed on
 * termination) through {@link #receive(long)}.
 */
public final class WebsocketSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSession.class);

    /** Name of the message header that carries the stream id. */
    private static final String STREAM_ID = "sid";

    /** Name of the message header that carries the stream signal. */
    private static final String SIGNAL = "sig";

    private final ClientCodec<ByteBuf> codec;
    private final Connection connection;
    private final WebsocketInbound inbound;
    private final WebsocketOutbound outbound;

    /** Stream id to processor mapping; entries are added in send() and removed in receive(). */
    private final Map<Long, UnicastProcessor<ClientMessage>> inboundProcessors = new NonBlockingHashMapLong<>(1024);

    /** Hex identity hash of this session, used only to tag log messages. */
    private final String id = Integer.toHexString(System.identityHashCode(this));

    /**
     * Creates the session and immediately subscribes to the connection's inbound frames.
     *
     * @param clientCodec codec used to decode inbound frames and error payloads
     * @param connection underlying connection providing the inbound and outbound channels
     */
    public WebsocketSession(ClientCodec<ByteBuf> clientCodec, Connection connection) {
        this.codec = clientCodec;
        this.connection = connection;
        // NOTE(review): narrowing casts restored on the assumption that Connection exposes the
        // generic Netty inbound/outbound types; they are redundant (and harmless) otherwise.
        this.inbound = (WebsocketInbound) connection.inbound();
        this.outbound = (WebsocketOutbound) connection.outbound().options(options -> options.flushOnEach());
        this.inbound.aggregateFrames().receive().retain().subscribe(this::onInboundFrame);
    }

    /**
     * Sends the given buffer as a text websocket frame and, once the write has completed
     * successfully, registers a processor for the given stream id (if none exists yet).
     *
     * @param byteBuf encoded request payload
     * @param sid stream id the responses will be correlated with
     * @return a {@link Mono} completing when the frame was sent and the processor registered
     */
    public Mono<Void> send(ByteBuf byteBuf, long sid) {
        // NOTE(review): the processor is registered only after the write completes, so a response
        // arriving before that point takes the "Can't find processor" branch - confirm intended.
        return this.outbound
            .sendObject(Mono.fromCallable(() -> new TextWebSocketFrame(byteBuf)))
            .then()
            .doOnSuccess(ignored -> {
                this.inboundProcessors.computeIfAbsent(sid, key -> UnicastProcessor.create());
                LOGGER.debug("Put sid={}, session={}", sid, this.id);
            });
    }

    /**
     * Returns the stream of responses for the given stream id. The lookup is deferred until
     * subscription; the processor is removed from the session when the stream terminates.
     *
     * @param sid stream id previously registered by {@link #send(ByteBuf, long)}
     * @return responses for the stream, or an {@link IllegalStateException} error signal when no
     *     processor is registered for {@code sid}
     */
    public Flux<ClientMessage> receive(long sid) {
        return Flux.defer(() -> {
            UnicastProcessor<ClientMessage> processor = this.inboundProcessors.get(sid);
            if (processor == null) {
                LOGGER.error("Can't find processor by sid={}, session={}", sid, this.id);
                throw new IllegalStateException("Can't find processor by sid");
            }
            return processor.doOnTerminate(() -> {
                this.inboundProcessors.remove(sid);
                LOGGER.debug("Removed sid={}, session={}", sid, this.id);
            });
        });
    }

    /**
     * Sends a websocket close frame.
     *
     * @return a {@link Mono} completing when the close frame has been sent
     */
    public Mono<Void> close() {
        return this.outbound.sendClose().then();
    }

    /**
     * Returns the dispose signal of the underlying connection.
     *
     * @return the connection's {@code onDispose()} {@link Mono}
     */
    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    /**
     * Decodes one inbound frame and dispatches it to the processor registered for its stream id.
     * Messages without a {@code sid} header, or whose stream id has no processor, are logged and
     * their data released. Any exception raised along the way is logged and not rethrown.
     */
    private void onInboundFrame(ByteBuf byteBuf) {
        try {
            ClientMessage message = this.codec.decode(byteBuf);
            if (!message.hasHeader(STREAM_ID)) {
                LOGGER.error("Ignore response: {} with null sid, session={}", message, this.id);
                releaseData(message);
                return;
            }
            long sid = Long.valueOf(message.header(STREAM_ID));
            UnicastProcessor<ClientMessage> processor = this.inboundProcessors.get(sid);
            if (processor == null) {
                LOGGER.error("Can't find processor by sid={} for response: {}, session={}", sid, message, this.id);
                releaseData(message);
                return;
            }
            handleResponse(message, processor::onNext, processor::onError, processor::onComplete);
        } catch (Exception e) {
            // Pass the throwable as well so the stack trace is not lost; message text is unchanged.
            LOGGER.error("Response decoder failed: " + e, e);
        }
    }

    /** Releases the data of a message that will not be delivered to any processor. */
    private static void releaseData(ClientMessage message) {
        Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
    }

    /**
     * Dispatches a decoded response according to its {@code sig} header: no signal forwards the
     * message to {@code onNext}, {@code COMPLETE} runs {@code onComplete}, {@code ERROR} decodes
     * the payload as {@link ErrorData} and forwards the resulting exception to {@code onError}.
     * Any other signal is ignored. Exceptions thrown while handling are forwarded to
     * {@code onError}.
     */
    private void handleResponse(ClientMessage response, Consumer<ClientMessage> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        LOGGER.debug("Handle response: {}, session={}", response, this.id);
        try {
            Optional<Signal> signalOptional = Optional.ofNullable(response.header(SIGNAL)).map(Signal::from);
            if (!signalOptional.isPresent()) {
                // Plain data message.
                onNext.accept(response);
                return;
            }
            Signal signal = signalOptional.get();
            if (signal == Signal.COMPLETE) {
                onComplete.run();
            } else if (signal == Signal.ERROR) {
                ErrorData errorData = (ErrorData) this.codec.decodeData(response, ErrorData.class).data();
                ServiceException exception = ExceptionProcessor.toException(response.qualifier(), errorData.getErrorCode(), errorData.getErrorMessage());
                LOGGER.error("Received error response: sid={}, error={}", response.header(STREAM_ID), exception);
                onError.accept(exception);
            }
        } catch (Exception e) {
            onError.accept(e);
        }
    }

    @Override
    public String toString() {
        return "client-sdk.WebsocketSession{" + "id='" + this.id + '\'' + '}';
    }
}
