package tech.guyi.component.message.stream.websocket;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Resource;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.guyi.component.message.stream.api.stream.MessageStream;
import tech.guyi.component.message.stream.api.stream.entry.Message;
import tech.guyi.component.message.stream.api.worker.MessageStreamWorker;
import tech.guyi.component.message.stream.websocket.connection.WebsocketConnection;
import tech.guyi.component.message.stream.websocket.exception.ConnectionNotReadyException;
import tech.guyi.component.message.stream.websocket.executor.WebSocketServerExecutors;
import tech.guyi.component.message.stream.websocket.topic.TopicHandlerFactory;

/* loaded from: input_file:tech/guyi/component/message/stream/websocket/WebSocketMessageStream.class */
public class WebSocketMessageStream implements MessageStream {
    private static final Logger log = LoggerFactory.getLogger(WebSocketMessageStream.class);

    @Resource
    private TopicHandlerFactory factory;

    @Resource
    private WebSocketConfiguration configuration;

    @Resource
    private WebSocketServerExecutors executors;

    @Resource
    private MessageStreamWorker worker;
    private boolean run;
    private ScheduledFuture<?> future;
    private WebsocketConnection connection;

    private WebsocketConnection createConnection(Consumer<Message> consumer) throws URISyntaxException {
        return new WebsocketConnection(new URI(this.executors.replace(this.configuration.getServer())), str -> {
            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
            consumer.accept(new Message(this.factory.get().getTopic(bytes), bytes));
        }, serverHandshake -> {
            this.run = true;
            Optional.ofNullable(this.future).ifPresent(scheduledFuture -> {
                scheduledFuture.cancel(true);
            });
            log.info("Websocket连接建立");
        }, () -> {
            reconnect(consumer);
        }, exc -> {
            log.error("WebSocket连接异常", exc);
            reconnect(consumer);
        });
    }

    private void reconnect(Consumer<Message> consumer) {
        if (this.run) {
            this.run = false;
            this.future = this.worker.scheduleWithFixedDelay(() -> {
                connect(consumer);
            }, 0L, 10L, TimeUnit.SECONDS);
        }
    }

    private void connect(Consumer<Message> consumer) {
        if (this.connection != null && this.connection.isOpen()) {
            this.connection.close();
        }
        this.connection = createConnection(consumer);
        this.connection.connectBlocking();
    }

    @NonNull
    public String getName() {
        return "websocket";
    }

    public void close() {
        this.run = false;
        Optional.ofNullable(this.connection).ifPresent((v0) -> {
            v0.close();
        });
    }

    public void register(String str) {
    }

    public void unregister(String str) {
    }

    public void open(Consumer<Message> consumer) {
        connect(consumer);
    }

    public void publish(Message message) {
        if (!this.connection.isOpen()) {
            throw new ConnectionNotReadyException();
        }
        this.connection.send(this.factory.get().setTopic(message.getTopic(), message.getBytes()));
    }
}
