package org.springframework.cloud.fn.consumer.websocket;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import jakarta.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.consumer.websocket.actuator.WebsocketConsumerTraceEndpoint;
import org.springframework.cloud.fn.consumer.websocket.trace.InMemoryTraceRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;

/**
 * Auto-configuration that exposes a {@code Consumer<Message<?>>} which writes each
 * incoming message, as a text websocket frame, to every channel currently held in
 * {@link WebsocketConsumerServer#CHANNELS}.
 *
 * <p>It also registers the supporting beans (server, channel initializer, in-memory
 * trace repository) and, conditionally, a trace endpoint.
 */
@EnableConfigurationProperties({WebsocketConsumerProperties.class})
@AutoConfiguration
public class WebsocketConsumerConfiguration {
    private static final Log logger = LogFactory.getLog(WebsocketConsumerConfiguration.class);

    /**
     * Whether outgoing messages are recorded in the trace repository.
     *
     * <p>NOTE(review): this flag is bound to {@code endpoints.websocketconsumertrace.enabled}
     * while the trace endpoint bean below is gated on {@code endpoints.websocketsinktrace.enabled}.
     * The two keys differ, so recording and endpoint exposure are toggled independently —
     * confirm whether that is intentional before aligning them.
     */
    @Value("${endpoints.websocketconsumertrace.enabled:false}")
    private boolean traceEndpointEnabled;

    @Autowired
    private WebsocketConsumerServer websocketConsumerServer;

    /**
     * Declares the server-side infrastructure beans used by the consumer.
     */
    @Configuration(proxyBeanMethods = false)
    static class WebsocketConsumerServerConfiguration {

        /** Repository that keeps trace records in memory. */
        @Bean
        InMemoryTraceRepository websocketTraceRepository() {
            return new InMemoryTraceRepository();
        }

        /** Server built from the bound properties and the channel initializer. */
        @Bean
        WebsocketConsumerServer server(WebsocketConsumerProperties websocketConsumerProperties, WebsocketConsumerServerInitializer websocketConsumerServerInitializer) {
            return new WebsocketConsumerServer(websocketConsumerProperties, websocketConsumerServerInitializer);
        }

        /** Channel initializer that is handed the trace repository. */
        @Bean
        WebsocketConsumerServerInitializer initializer(InMemoryTraceRepository inMemoryTraceRepository) {
            return new WebsocketConsumerServerInitializer(inMemoryTraceRepository);
        }
    }

    /**
     * Invokes {@link WebsocketConsumerServer#run()} once this configuration has been
     * constructed and injected (presumably starting the websocket server).
     *
     * @throws InterruptedException propagated from {@code run()}
     */
    @PostConstruct
    public void init() throws InterruptedException {
        this.websocketConsumerServer.run();
    }

    /**
     * Trace endpoint backed by the in-memory repository; only created when
     * {@code endpoints.websocketsinktrace.enabled} is {@code true}.
     *
     * @param inMemoryTraceRepository repository the endpoint reads from
     * @return the trace endpoint
     */
    @ConditionalOnProperty(value = {"endpoints.websocketsinktrace.enabled"}, havingValue = "true")
    @Bean
    public WebsocketConsumerTraceEndpoint websocketTraceEndpoint(InMemoryTraceRepository inMemoryTraceRepository) {
        return new WebsocketConsumerTraceEndpoint(inMemoryTraceRepository);
    }

    /**
     * Builds the consumer that broadcasts each message to all registered channels.
     *
     * <p>The payload is rendered as text once per message; {@code byte[]} payloads are
     * decoded as UTF-8 rather than stringified via {@code Object.toString()}.
     *
     * @param inMemoryTraceRepository repository that receives a trace record per message
     *                                when tracing is enabled
     * @return the broadcasting consumer
     */
    @Bean
    public Consumer<Message<?>> websocketConsumer(InMemoryTraceRepository inMemoryTraceRepository) {
        return message -> {
            if (logger.isTraceEnabled()) {
                logger.trace("Handling message: " + message);
            }
            // NOTE(review): the accessor returned by wrap() is discarded, so this call
            // presumably leaves the message itself untouched — confirm it is still needed.
            SimpMessageHeaderAccessor.wrap(message).setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
            String text = payloadAsText(message.getPayload());
            for (Channel channel : WebsocketConsumerServer.CHANNELS) {
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("Writing message %s to channel %s", text, channel.localAddress()));
                }
                // A fresh frame per channel; write and flush in a single call.
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }
            if (this.traceEndpointEnabled) {
                addMessageToTraceRepository(inMemoryTraceRepository, message);
            }
        };
    }

    /**
     * Stores a trace record (type, direction, message id, textual payload) for an
     * outgoing message.
     *
     * @param inMemoryTraceRepository repository to append to
     * @param message                 message that was sent
     */
    private void addMessageToTraceRepository(InMemoryTraceRepository inMemoryTraceRepository, Message<?> message) {
        Map<String, Object> trace = new LinkedHashMap<>();
        trace.put("type", "text");
        trace.put("direction", "out");
        trace.put("id", message.getHeaders().getId());
        trace.put("payload", payloadAsText(message.getPayload()));
        inMemoryTraceRepository.add(trace);
    }

    /**
     * Renders a message payload as text.
     *
     * @param payload the message payload
     * @return the UTF-8 decoded content for {@code byte[]} payloads, otherwise
     *         {@code payload.toString()}
     */
    private static String payloadAsText(Object payload) {
        if (payload instanceof byte[]) {
            // toString() on an array yields its identity string, not its content.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return payload.toString();
    }
}
