package io.micronaut.http.netty.websocket;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.MediaType;
import io.micronaut.websocket.WebSocketBroadcaster;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.exceptions.WebSocketSessionException;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroupException;
import jakarta.inject.Singleton;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

@Singleton
@Requires(beans = {WebSocketSessionRepository.class})
/* loaded from: input_file:io/micronaut/http/netty/websocket/NettyServerWebSocketBroadcaster.class */
public class NettyServerWebSocketBroadcaster implements WebSocketBroadcaster {
    private final WebSocketMessageEncoder webSocketMessageEncoder;
    private final WebSocketSessionRepository webSocketSessionRepository;

    public NettyServerWebSocketBroadcaster(WebSocketMessageEncoder webSocketMessageEncoder, WebSocketSessionRepository webSocketSessionRepository) {
        this.webSocketMessageEncoder = webSocketMessageEncoder;
        this.webSocketSessionRepository = webSocketSessionRepository;
    }

    public <T> void broadcastSync(T t, MediaType mediaType, Predicate<WebSocketSession> predicate) {
        try {
            this.webSocketSessionRepository.getChannelGroup().writeAndFlush(this.webSocketMessageEncoder.encodeMessage(t, mediaType), channel -> {
                NettyWebSocketSession nettyWebSocketSession = (NettyWebSocketSession) channel.attr(NettyWebSocketSession.WEB_SOCKET_SESSION_KEY).get();
                return nettyWebSocketSession != null && nettyWebSocketSession.isOpen() && predicate.test(nettyWebSocketSession);
            }).sync();
        } catch (InterruptedException e) {
            throw new WebSocketSessionException("Broadcast Interrupted");
        }
    }

    public <T> Publisher<T> broadcast(T t, MediaType mediaType, Predicate<WebSocketSession> predicate) {
        return Flux.create(fluxSink -> {
            try {
                this.webSocketSessionRepository.getChannelGroup().writeAndFlush(this.webSocketMessageEncoder.encodeMessage(t, mediaType), channel -> {
                    NettyWebSocketSession nettyWebSocketSession = (NettyWebSocketSession) channel.attr(NettyWebSocketSession.WEB_SOCKET_SESSION_KEY).get();
                    return nettyWebSocketSession != null && nettyWebSocketSession.isOpen() && predicate.test(nettyWebSocketSession);
                }).addListener(future -> {
                    Throwable extractBroadcastFailure;
                    if (!future.isSuccess() && (extractBroadcastFailure = extractBroadcastFailure(future.cause())) != null) {
                        fluxSink.error(new WebSocketSessionException("Broadcast Failure: " + extractBroadcastFailure.getMessage(), extractBroadcastFailure));
                    } else {
                        fluxSink.next(t);
                        fluxSink.complete();
                    }
                });
            } catch (Throwable th) {
                fluxSink.error(new WebSocketSessionException("Broadcast Failure: " + th.getMessage(), th));
            }
        }, FluxSink.OverflowStrategy.BUFFER);
    }

    @Nullable
    private Throwable extractBroadcastFailure(Throwable th) {
        if (!(th instanceof ChannelGroupException)) {
            if (th instanceof ClosedChannelException) {
                return null;
            }
            return th;
        }
        Throwable th2 = null;
        Iterator it = ((ChannelGroupException) th).iterator();
        while (it.hasNext()) {
            Throwable extractBroadcastFailure = extractBroadcastFailure((Throwable) ((Map.Entry) it.next()).getValue());
            if (extractBroadcastFailure != null) {
                if (th2 != null) {
                    return th;
                }
                th2 = extractBroadcastFailure;
            }
        }
        return th2;
    }
}
