package net.soundvibe.reacto.vertx.events;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketStream;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.Objects;
import java.util.function.Function;
import net.soundvibe.reacto.client.events.EventHandler;
import net.soundvibe.reacto.discovery.types.ServiceRecord;
import net.soundvibe.reacto.discovery.types.ServiceType;
import net.soundvibe.reacto.errors.ConnectionClosedUnexpectedly;
import net.soundvibe.reacto.errors.ReactiveException;
import net.soundvibe.reacto.internal.EventType;
import net.soundvibe.reacto.internal.InternalEvent;
import net.soundvibe.reacto.mappers.Mappers;
import net.soundvibe.reacto.types.Command;
import net.soundvibe.reacto.types.Event;
import net.soundvibe.reacto.utils.WebUtils;
import net.soundvibe.reacto.vertx.server.Factories;
import net.soundvibe.reacto.vertx.server.handlers.WebSocketFrameHandler;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:net/soundvibe/reacto/vertx/events/VertxWebSocketEventHandler.class */
/**
 * {@link EventHandler} that sends a {@link Command} to a remote service over a Vert.x WebSocket
 * and exposes the frames received in response as an RxJava {@link Observable} of {@link Event}s.
 *
 * <p>An instance is bound to one {@link ServiceRecord} of type {@link ServiceType#WEBSOCKET} and
 * owns one {@link HttpClient} configured from that record's location. The class also implements
 * {@code Function<ServiceRecord, EventHandler>}; {@link #apply(ServiceRecord)} creates a new
 * handler for the record it is given.
 *
 * <p>Equality and hash code are based solely on the service record.
 */
public class VertxWebSocketEventHandler implements EventHandler, Function<ServiceRecord, EventHandler> {
    private static final Logger log = LoggerFactory.getLogger(VertxWebSocketEventHandler.class);

    private final ServiceRecord serviceRecord;
    private final HttpClient httpClient;

    /**
     * Creates a handler for the given WebSocket service and builds the HTTP client used to reach it.
     *
     * <p>The client is configured from {@code serviceRecord.location}: {@code "ssl"} (default
     * {@code false}), {@code "host"} (default {@code "localhost"}) and {@code "port"} (default
     * {@code 80}); HTTP keep-alive and TCP keep-alive are both enabled.
     *
     * @param serviceRecord the service to talk to
     * @throws NullPointerException if {@code serviceRecord} is {@code null}
     * @throws IllegalStateException if the record's type is not {@link ServiceType#WEBSOCKET}
     */
    public VertxWebSocketEventHandler(ServiceRecord serviceRecord) {
        Objects.requireNonNull(serviceRecord, "serviceRecord cannot be null");
        if (serviceRecord.type != ServiceType.WEBSOCKET) {
            throw new IllegalStateException("Unexpected service type: expected WEBSOCKET, but got: " + serviceRecord.type);
        }
        this.serviceRecord = serviceRecord;

        final boolean ssl = (Boolean) serviceRecord.location.asBoolean("ssl").orElse(false);
        final String host = (String) serviceRecord.location.asString("host").orElse("localhost");
        final int port = (Integer) serviceRecord.location.asInteger("port").orElse(80);

        // NOTE(review): this client is never closed by this class — confirm who owns its lifecycle.
        this.httpClient = Factories.vertx().createHttpClient(new HttpClientOptions()
                .setSsl(ssl)
                .setKeepAlive(true)
                .setTcpKeepAlive(true)
                .setDefaultHost(host)
                .setDefaultPort(port));
    }

    /**
     * Sends {@code command} to this handler's service and returns the resulting event stream.
     *
     * <p>The WebSocket stream is requested at the service name wrapped with start and end
     * delimiters by {@link WebUtils}; the resulting observable buffers on backpressure.
     *
     * @param command the command to send
     * @return an observable of the events received for the command
     */
    public Observable<Event> observe(Command command) {
        return Observable.just(this.serviceRecord)
                .map(record -> this.httpClient.websocketStream(
                        WebUtils.includeStartDelimiter(WebUtils.includeEndDelimiter(record.name))))
                .concatMap(webSocketStream -> observe(webSocketStream, command).onBackpressureBuffer());
    }

    /**
     * @return the service record this handler was created for
     */
    public ServiceRecord serviceRecord() {
        return this.serviceRecord;
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param serviceRecord the service to talk to
     * @return a new handler bound to {@code serviceRecord}
     */
    public static EventHandler create(ServiceRecord serviceRecord) {
        return new VertxWebSocketEventHandler(serviceRecord);
    }

    /**
     * Creates a new handler for {@code serviceRecord}; delegates to {@link #create(ServiceRecord)}
     * and does not use this instance's own state.
     */
    @Override // java.util.function.Function
    public EventHandler apply(ServiceRecord serviceRecord) {
        return create(serviceRecord);
    }

    /**
     * Two handlers are equal when they are of the same class and have equal service records.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(this.serviceRecord, ((VertxWebSocketEventHandler) obj).serviceRecord);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.serviceRecord);
    }

    /**
     * Installs a frame handler on {@code webSocket} that decodes each received buffer into an
     * {@link InternalEvent} and forwards it to {@code subscriber}. Buffers are ignored once the
     * subscriber has unsubscribed; any failure while decoding or dispatching is reported through
     * {@code onError}.
     */
    private static void checkForEvents(WebSocket webSocket, Subscriber<? super Event> subscriber) {
        webSocket.frameHandler(new WebSocketFrameHandler(buffer -> {
            try {
                if (!subscriber.isUnsubscribed()) {
                    handleEvent(Mappers.fromBytesToInternalEvent(buffer.getBytes()), subscriber);
                }
            } catch (Throwable failure) {
                subscriber.onError(failure);
            }
        }));
    }

    /**
     * Translates one {@link InternalEvent} into the matching subscriber signal: {@code NEXT} to
     * {@code onNext}, {@code ERROR} to {@code onError}, {@code COMPLETED} to {@code onCompleted}.
     * Terminal signals are skipped when the subscriber has already unsubscribed; any other event
     * type is ignored.
     */
    private static void handleEvent(InternalEvent internalEvent, Subscriber<? super Event> subscriber) {
        if (log.isDebugEnabled()) {
            log.debug("InternalEvent has been received and is being handled: " + internalEvent);
        }
        switch (internalEvent.eventType) {
            case NEXT:
                subscriber.onNext(Mappers.fromInternalEvent(internalEvent));
                break;
            case ERROR:
                if (!subscriber.isUnsubscribed()) {
                    // Build the fallback lazily: it is only needed when the event carries no error.
                    subscriber.onError((Throwable) internalEvent.error.orElseGet(() ->
                            ReactiveException.from(new UnknownError("Unknown error from internalEvent: " + internalEvent))));
                }
                break;
            case COMPLETED:
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
                break;
            default:
                break;
        }
    }

    /**
     * Builds an observable that, on subscription, connects {@code webSocketStream}, sends
     * {@code command} as a binary message and emits the events received in reply.
     *
     * <p>Stream and socket exceptions are forwarded to {@code onError}. If the socket is closed
     * while the subscriber is still subscribed, a {@link ConnectionClosedUnexpectedly} error is
     * emitted.
     *
     * @param webSocketStream the stream to connect through
     * @param command the command to send once connected
     * @return an observable of the events received for the command
     */
    public static Observable<Event> observe(WebSocketStream webSocketStream, Command command) {
        return Observable.create(subscriber -> {
            try {
                webSocketStream
                        .exceptionHandler(subscriber::onError)
                        .handler(webSocket -> {
                            try {
                                // NOTE(review): the socket is not closed here when the subscriber
                                // unsubscribes early — confirm whether the remote side is relied
                                // upon to close it.
                                webSocket
                                        .closeHandler(ignored -> {
                                            if (!subscriber.isUnsubscribed()) {
                                                subscriber.onError(new ConnectionClosedUnexpectedly("WebSocket connection closed without completion for command: " + command));
                                            }
                                        })
                                        .exceptionHandler(subscriber::onError);
                                // Register the frame handler before sending the command.
                                checkForEvents(webSocket, subscriber);
                                sendCommandToExecutor(command, webSocket);
                            } catch (Throwable setupFailure) {
                                subscriber.onError(setupFailure);
                            }
                        });
            } catch (Throwable connectFailure) {
                subscriber.onError(connectFailure);
            }
        });
    }

    /**
     * Serializes {@code command} with {@link Mappers#commandToBytes} and writes it to
     * {@code webSocket} as a single binary message.
     */
    private static void sendCommandToExecutor(Command command, WebSocket webSocket) {
        if (log.isDebugEnabled()) {
            log.debug("Sending command to executor: " + command);
        }
        webSocket.writeBinaryMessage(Buffer.buffer(Mappers.commandToBytes(command)));
    }

    /**
     * @return the name of the service record this handler is bound to
     */
    public String name() {
        return this.serviceRecord.name;
    }
}
