package net.soundvibe.reacto.vertx.events;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.soundvibe.reacto.client.events.EventHandler;
import net.soundvibe.reacto.discovery.types.ServiceRecord;
import net.soundvibe.reacto.discovery.types.ServiceType;
import net.soundvibe.reacto.errors.CannotDiscoverService;
import net.soundvibe.reacto.errors.ConnectionClosedUnexpectedly;
import net.soundvibe.reacto.errors.ReactiveException;
import net.soundvibe.reacto.internal.EventType;
import net.soundvibe.reacto.internal.InternalEvent;
import net.soundvibe.reacto.mappers.Mappers;
import net.soundvibe.reacto.types.Command;
import net.soundvibe.reacto.types.Event;
import net.soundvibe.reacto.utils.WebUtils;
import net.soundvibe.reacto.vertx.server.Factories;
import net.soundvibe.reacto.vertx.server.handlers.WebSocketFrameHandler;
import rx.Observable;
import rx.exceptions.MissingBackpressureException;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

/* loaded from: input_file:net/soundvibe/reacto/vertx/events/VertxWebSocketEventHandler.class */
public class VertxWebSocketEventHandler implements EventHandler, Closeable {
    private static final Logger log = LoggerFactory.getLogger(VertxWebSocketEventHandler.class);
    public static final int INITIAL_CAPACITY = 10000;
    private final ServiceRecord serviceRecord;
    private final HttpClient httpClient;
    private CompletableFuture<WebSocket> webSocketStream;
    private final Map<String, Subject<Event, Event>> streams = new ConcurrentHashMap(INITIAL_CAPACITY);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: net.soundvibe.reacto.vertx.events.VertxWebSocketEventHandler$1, reason: invalid class name */
    /* loaded from: input_file:net/soundvibe/reacto/vertx/events/VertxWebSocketEventHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$net$soundvibe$reacto$internal$EventType = new int[EventType.values().length];

        static {
            try {
                $SwitchMap$net$soundvibe$reacto$internal$EventType[EventType.NEXT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$net$soundvibe$reacto$internal$EventType[EventType.ERROR.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$net$soundvibe$reacto$internal$EventType[EventType.COMPLETED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public VertxWebSocketEventHandler(ServiceRecord serviceRecord) {
        Objects.requireNonNull(serviceRecord, "serviceRecord cannot be null");
        if (serviceRecord.type != ServiceType.WEBSOCKET) {
            throw new IllegalStateException("Unexpected service type: expected WEBSOCKET, but got: " + serviceRecord.type);
        }
        this.serviceRecord = serviceRecord;
        this.httpClient = Factories.vertx().createHttpClient(new HttpClientOptions().setUsePooledBuffers(true).setTryUseCompression(true).setReuseAddress(true).setSsl(((Boolean) serviceRecord.location.asBoolean("ssl").orElse(false)).booleanValue()).setKeepAlive(true).setTcpKeepAlive(true).setDefaultHost((String) serviceRecord.location.asString("host").orElse("localhost")).setDefaultPort(((Integer) serviceRecord.location.asInteger("port").orElse(80)).intValue()));
        createStream();
    }

    public Observable<Event> observe(Command command) {
        String objectId = command.id.toString();
        if (this.streams.size() > 10000) {
            return Observable.error(new MissingBackpressureException("WebSocket Event Handler exceeded command limit"));
        }
        Observable doOnUnsubscribe = this.streams.compute(objectId, (str, subject) -> {
            return subject != null ? subject : ReplaySubject.create();
        }).doOnUnsubscribe(() -> {
            this.streams.remove(objectId);
        });
        if (isClosed()) {
            createStream();
        }
        return Observable.from(this.webSocketStream, Schedulers.computation()).doOnNext(webSocket -> {
            sendCommandForExecution(command, webSocket);
        }).flatMap(webSocket2 -> {
            return doOnUnsubscribe;
        });
    }

    public ServiceRecord serviceRecord() {
        return this.serviceRecord;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(this.serviceRecord, ((VertxWebSocketEventHandler) obj).serviceRecord);
    }

    public int hashCode() {
        return Objects.hash(this.serviceRecord);
    }

    public String name() {
        return this.serviceRecord.name;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.webSocketStream.isCompletedExceptionally() || this.webSocketStream.isCancelled()) {
            return;
        }
        try {
            this.webSocketStream.get().close();
        } catch (InterruptedException | ExecutionException e) {
            log.error("Error when closing WebSocket: " + e);
        }
    }

    private boolean isClosed() {
        return this.closed.get();
    }

    private synchronized void createStream() {
        this.webSocketStream = CompletableFuture.supplyAsync(this::connect, Executors.newCachedThreadPool());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void sendCommandForExecution(Command command, WebSocket webSocket) {
        if (log.isDebugEnabled()) {
            log.debug("Sending command for execution: " + command);
        }
        webSocket.writeBinaryMessage(Buffer.buffer(Mappers.commandToBytes(command)));
    }

    private WebSocket connect() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        log.info("Connecting to WebSocket...");
        this.httpClient.websocketStream(WebUtils.includeStartDelimiter(WebUtils.includeEndDelimiter((String) this.serviceRecord.location.asString("root").orElse(this.serviceRecord.name)))).exceptionHandler(th -> {
            countDownLatch.countDown();
            doOnException(th);
        }).handler(webSocket -> {
            webSocket.exceptionHandler(this::doOnException).closeHandler(r3 -> {
                doOnClose();
            }).frameHandler(new WebSocketFrameHandler(buffer -> {
                handleEvent(buffer.getBytes());
            }));
            atomicReference.set(webSocket);
            countDownLatch.countDown();
            this.closed.set(false);
        });
        try {
            countDownLatch.await(5L, TimeUnit.SECONDS);
            WebSocket webSocket2 = (WebSocket) atomicReference.get();
            if (webSocket2 == null) {
                throw new CannotDiscoverService("Unable to connect to service's WebSocket: " + this.serviceRecord);
            }
            return webSocket2;
        } catch (InterruptedException e) {
            throw new CannotDiscoverService("Interrupted when trying to connect to service's WebSocket: " + this.serviceRecord);
        }
    }

    private void doOnException(Throwable th) {
        log.error("WebSocket error: " + th);
        this.closed.set(true);
        failWithError(th);
    }

    private void doOnClose() {
        log.warn("WebSocket is closed for: " + this.serviceRecord);
        this.closed.set(true);
        failWithError(new ConnectionClosedUnexpectedly("WebSocket connection closed without completion"));
    }

    private void handleEvent(byte[] bArr) {
        InternalEvent fromBytesToInternalEvent = Mappers.fromBytesToInternalEvent(bArr);
        String str = (String) fromBytesToInternalEvent.commandId().orElse("");
        if (log.isDebugEnabled()) {
            log.debug("InternalEvent [" + str + "] is being handled: " + fromBytesToInternalEvent.name + ": " + fromBytesToInternalEvent.eventType);
        }
        Subject<Event, Event> subject = this.streams.get(str);
        if (subject == null) {
            return;
        }
        switch (AnonymousClass1.$SwitchMap$net$soundvibe$reacto$internal$EventType[fromBytesToInternalEvent.eventType.ordinal()]) {
            case 1:
                subject.onNext(Mappers.fromInternalEvent(fromBytesToInternalEvent));
                return;
            case 2:
                this.streams.remove(str);
                subject.onError((Throwable) fromBytesToInternalEvent.error.orElse(ReactiveException.from(new UnknownError("Unknown error from internalEvent: " + fromBytesToInternalEvent))));
                return;
            case 3:
                this.streams.remove(str);
                subject.onCompleted();
                return;
            default:
                return;
        }
    }

    private void failWithError(Throwable th) {
        this.streams.forEach((str, subject) -> {
            subject.onError(th);
        });
        this.streams.clear();
    }
}
