/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.fn.consumer.websocket;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import jakarta.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.consumer.websocket.WebsocketConsumerProperties;
import org.springframework.cloud.fn.consumer.websocket.WebsocketConsumerServer;
import org.springframework.cloud.fn.consumer.websocket.WebsocketConsumerServerInitializer;
import org.springframework.cloud.fn.consumer.websocket.actuator.WebsocketConsumerTraceEndpoint;
import org.springframework.cloud.fn.consumer.websocket.trace.InMemoryTraceRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;

/**
 * Auto-configuration for the websocket consumer.
 *
 * <p>Starts the injected {@link WebsocketConsumerServer} after construction and
 * exposes a {@link Consumer} that writes each message payload as a
 * {@link TextWebSocketFrame} to every channel in
 * {@link WebsocketConsumerServer#CHANNELS}. Outbound messages are additionally
 * recorded in the {@link InMemoryTraceRepository} when tracing is enabled.
 */
@AutoConfiguration
@EnableConfigurationProperties(WebsocketConsumerProperties.class)
public class WebsocketConsumerConfiguration {
    private static final Log logger = LogFactory.getLog(WebsocketConsumerConfiguration.class);

    /**
     * Whether outbound messages are recorded in the trace repository.
     *
     * <p>The trace endpoint bean below is gated on
     * {@code endpoints.websocketsinktrace.enabled}, so that property is used as the
     * fallback here: enabling the endpoint also enables recording. The original
     * {@code endpoints.websocketconsumertrace.enabled} property still takes precedence
     * when it is set.
     */
    @Value("${endpoints.websocketconsumertrace.enabled:${endpoints.websocketsinktrace.enabled:false}}")
    private boolean traceEndpointEnabled;

    @Autowired
    private WebsocketConsumerServer websocketConsumerServer;

    /**
     * Starts the websocket server once this configuration has been constructed
     * and its dependencies injected.
     *
     * @throws InterruptedException propagated from {@link WebsocketConsumerServer#run()}
     */
    @PostConstruct
    public void init() throws InterruptedException {
        this.websocketConsumerServer.run();
    }

    /**
     * Registers the trace endpoint when {@code endpoints.websocketsinktrace.enabled}
     * is {@code true}.
     *
     * @param websocketTraceRepository repository the endpoint is constructed with
     * @return the trace endpoint
     */
    @Bean
    @ConditionalOnProperty(value = "endpoints.websocketsinktrace.enabled", havingValue = "true")
    public WebsocketConsumerTraceEndpoint websocketTraceEndpoint(InMemoryTraceRepository websocketTraceRepository) {
        return new WebsocketConsumerTraceEndpoint(websocketTraceRepository);
    }

    /**
     * Creates the consumer that broadcasts every incoming message to all channels
     * in {@link WebsocketConsumerServer#CHANNELS} as a text frame.
     *
     * @param websocketTraceRepository repository that receives a trace entry per
     *        message when tracing is enabled
     * @return the broadcasting consumer
     */
    @Bean
    public Consumer<Message<?>> websocketConsumer(InMemoryTraceRepository websocketTraceRepository) {
        return message -> {
            if (logger.isTraceEnabled()) {
                logger.trace("Handling message: " + message);
            }
            // NOTE(review): this accessor is not read again in this method; confirm
            // whether wrapping the message is still required.
            SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
            headers.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);

            String messagePayload = payloadAsText(message.getPayload());
            for (Channel channel : WebsocketConsumerServer.CHANNELS) {
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("Writing message %s to channel %s", messagePayload, channel.localAddress()));
                }
                // A new frame is created for every channel, as before.
                channel.writeAndFlush(new TextWebSocketFrame(messagePayload));
            }
            if (this.traceEndpointEnabled) {
                this.addMessageToTraceRepository(websocketTraceRepository, message);
            }
        };
    }

    /**
     * Renders a message payload as the text that is sent to clients.
     *
     * <p>A {@code byte[]} payload is decoded as UTF-8; calling {@code toString()} on
     * an array would only yield its identity string rather than its content. Any
     * other payload is rendered with {@code toString()}.
     *
     * @param payload the message payload
     * @return the textual form of the payload
     */
    private static String payloadAsText(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return payload.toString();
    }

    /**
     * Adds an outbound text trace entry (type, direction, message id and payload)
     * for the given message to the repository.
     *
     * @param websocketTraceRepository repository to add the entry to
     * @param message the message that was written to the channels
     */
    private void addMessageToTraceRepository(InMemoryTraceRepository websocketTraceRepository, Message<?> message) {
        LinkedHashMap<String, Object> trace = new LinkedHashMap<>();
        trace.put("type", "text");
        trace.put("direction", "out");
        trace.put("id", message.getHeaders().getId());
        // Use the same rendering as the frame that was sent.
        trace.put("payload", payloadAsText(message.getPayload()));
        websocketTraceRepository.add(trace);
    }

    /**
     * Declares the server, its initializer and the trace repository they share.
     */
    @Configuration(proxyBeanMethods = false)
    static class WebsocketConsumerServerConfiguration {

        /**
         * @return the in-memory repository used for websocket traces
         */
        @Bean
        InMemoryTraceRepository websocketTraceRepository() {
            return new InMemoryTraceRepository();
        }

        /**
         * @param properties consumer properties passed to the server
         * @param initializer initializer passed to the server
         * @return the websocket server
         */
        @Bean
        WebsocketConsumerServer server(WebsocketConsumerProperties properties, WebsocketConsumerServerInitializer initializer) {
            return new WebsocketConsumerServer(properties, initializer);
        }

        /**
         * @param websocketTraceRepository repository handed to the initializer
         * @return the server initializer
         */
        @Bean
        WebsocketConsumerServerInitializer initializer(InMemoryTraceRepository websocketTraceRepository) {
            return new WebsocketConsumerServerInitializer(websocketTraceRepository);
        }
    }
}

