package io.scalecube.gateway.clientsdk.websocket;

import io.netty.buffer.ByteBuf;
import io.scalecube.gateway.clientsdk.ClientCodec;
import io.scalecube.gateway.clientsdk.ClientMessage;
import io.scalecube.gateway.clientsdk.ClientSettings;
import io.scalecube.gateway.clientsdk.ClientTransport;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/gateway/clientsdk/websocket/WebsocketClientTransport.class */
/**
 * {@link ClientTransport} implementation that multiplexes requests over a single, lazily
 * established websocket connection.
 *
 * <p>Every request is tagged with a stream id (header {@code "sid"}) taken from a per-transport
 * counter; responses are obtained from the session by that same id. The connection attempt is
 * stored in {@link #websocketMono} and shared between callers until the session closes or the
 * connection attempt fails, at which point the reference is cleared so that the next request
 * reconnects.
 */
public final class WebsocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketClientTransport.class);

    private static final String CLIENT_RECV_TIME = "client-recv-time";
    private static final String CLIENT_SEND_TIME = "client-send-time";
    private static final String STREAM_ID = "sid";
    private static final String SIGNAL = "sig";

    // The updater has to be declared against the erased field type, hence the raw Mono.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<WebsocketClientTransport, Mono> websocketMonoUpdater =
            AtomicReferenceFieldUpdater.newUpdater(WebsocketClientTransport.class, Mono.class, "websocketMono");

    private final ClientCodec<ByteBuf> codec;
    private final ClientSettings settings;
    private final HttpClient httpClient;
    private final AtomicLong sidCounter = new AtomicLong();

    // Accessed only through websocketMonoUpdater; null means "no connection (attempt) cached".
    private volatile Mono<?> websocketMono;

    /**
     * Creates a transport that connects to the host and port given by the settings.
     *
     * @param clientSettings supplies the remote host and port
     * @param clientCodec codec used to encode outgoing messages (and handed to each session)
     * @param loopResources event loop resources the underlying TCP client runs on
     */
    public WebsocketClientTransport(ClientSettings clientSettings, ClientCodec<ByteBuf> clientCodec, LoopResources loopResources) {
        this.codec = clientCodec;
        this.settings = clientSettings;
        this.httpClient = HttpClient.newConnection()
                .tcpConfiguration(tcpClient -> tcpClient
                        .runOn(loopResources)
                        .host(clientSettings.host())
                        .port(clientSettings.port()));
    }

    /**
     * Sends the request and emits the first response received for its stream id, or completes
     * empty if the response stream completes without a message.
     *
     * <p>The stream id is allocated and the request encoded per subscription. If the returned
     * Mono is cancelled, a cancel signal for the stream id is sent over the session.
     *
     * @param clientMessage the request to send
     * @return the response, enriched with the {@code "client-recv-time"} header
     */
    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<ClientMessage> requestResponse(ClientMessage clientMessage) {
        return Mono.defer(() -> {
            long sid = sidCounter.incrementAndGet();
            ByteBuf payload = enrichRequest(clientMessage, sid);
            return getOrConnect().flatMap(session ->
                    session.send(payload, sid)
                            // NOTE(review): the inner subscription is not tied to the sink's
                            // disposal; consider sink.onDispose(...) once WebsocketSession's
                            // cancel semantics are confirmed.
                            .then(Mono.<ClientMessage>create(sink ->
                                    session.receive(sid)
                                            .map(this::enrichResponse)
                                            .subscribe(sink::success, sink::error, sink::success)))
                            .doOnCancel(() -> handleCancel(sid, session)));
        });
    }

    /**
     * Sends the request and emits every response received for its stream id until the response
     * stream completes or fails.
     *
     * <p>The stream id is allocated and the request encoded per subscription. If the returned
     * Flux is cancelled, a cancel signal for the stream id is sent over the session.
     *
     * @param clientMessage the request to send
     * @return the responses, each enriched with the {@code "client-recv-time"} header
     */
    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Flux<ClientMessage> requestStream(ClientMessage clientMessage) {
        return Flux.defer(() -> {
            long sid = sidCounter.incrementAndGet();
            ByteBuf payload = enrichRequest(clientMessage, sid);
            return getOrConnect().flatMapMany(session ->
                    session.send(payload, sid)
                            // NOTE(review): same as requestResponse - the inner subscription is
                            // not disposed together with the sink.
                            .thenMany(Flux.<ClientMessage>create(sink ->
                                    session.receive(sid)
                                            .map(this::enrichResponse)
                                            .subscribe(sink::next, sink::error, sink::complete)))
                            .doOnCancel(() -> handleCancel(sid, session)));
        });
    }

    /**
     * Closes the cached websocket session, if there is one.
     *
     * @return a Mono that terminates once the session is closed, or immediately when no
     *     connection is cached
     */
    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<Void> close() {
        return Mono.defer(() -> {
            // Only getOrConnect0 ever stores into the field, so the element type is known.
            @SuppressWarnings("unchecked")
            Mono<WebsocketSession> current = websocketMonoUpdater.get(this);
            Mono<Void> closing = current == null
                    ? Mono.<Void>empty()
                    : current.flatMap(WebsocketSession::close);
            return closing.doOnTerminate(() -> LOGGER.info("Closed websocket client sdk transport"));
        });
    }

    /**
     * Returns the cached connection Mono, atomically installing a new one when none is cached.
     */
    @SuppressWarnings("unchecked")
    private Mono<WebsocketSession> getOrConnect() {
        return Mono.defer(() ->
                (Mono<WebsocketSession>) websocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
    }

    /**
     * Update function for {@link #getOrConnect()}: keeps an existing Mono, otherwise builds a
     * cached connect Mono.
     *
     * <p>The returned Mono is only assembled here, not subscribed, so the update function may be
     * re-applied by the updater without opening extra connections.
     *
     * @param existing the currently cached Mono, or {@code null}
     * @return {@code existing} when non-null, otherwise a new cached connect Mono
     */
    private Mono<WebsocketSession> getOrConnect0(Mono<WebsocketSession> existing) {
        if (existing != null) {
            return existing;
        }
        return httpClient.websocket().uri("/").connect()
                .map(connection -> {
                    WebsocketSession session = new WebsocketSession(codec, connection);
                    LOGGER.info("Created {} on {}:{}", session, settings.host(), settings.port());
                    // Drop the cached Mono when the session ends so the next request reconnects.
                    session.onClose()
                            .doOnTerminate(() -> {
                                websocketMonoUpdater.set(this, null);
                                LOGGER.info("Closed {} on {}:{}", session, settings.host(), settings.port());
                            })
                            .subscribe();
                    return session;
                })
                .doOnError(th -> {
                    LOGGER.warn("Failed to connect on {}:{}, cause: {}", settings.host(), settings.port(), th);
                    // Do not keep a failed attempt cached.
                    websocketMonoUpdater.set(this, null);
                })
                .cache();
    }

    /**
     * Sends a cancel signal for the given stream id; the send is subscribed immediately and its
     * outcome is not observed.
     *
     * @param sid stream id being cancelled
     * @param session session to send the signal over
     * @return the subscription of the send
     */
    private Disposable handleCancel(long sid, WebsocketSession session) {
        ClientMessage cancelMessage = ClientMessage.builder()
                .header(STREAM_ID, sid)
                .header(SIGNAL, Signal.CANCEL.codeAsString())
                .build();
        return session.send(codec.encode(cancelMessage), sid).subscribe();
    }

    /**
     * Copies the request, adds the send timestamp and stream id headers, and encodes it.
     *
     * @param request original request
     * @param sid stream id allocated for this request
     * @return the encoded request
     */
    private ByteBuf enrichRequest(ClientMessage request, long sid) {
        ClientMessage enriched = ClientMessage.from(request)
                .header(CLIENT_SEND_TIME, System.currentTimeMillis())
                .header(STREAM_ID, sid)
                .build();
        return codec.encode(enriched);
    }

    /**
     * Copies the response and adds the receive timestamp header.
     *
     * @param response message received from the session
     * @return a copy of the response carrying {@code "client-recv-time"}
     */
    private ClientMessage enrichResponse(ClientMessage response) {
        return ClientMessage.from(response)
                .header(CLIENT_RECV_TIME, System.currentTimeMillis())
                .build();
    }
}
