package org.noear.socketd.broker;

import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.MessageBuilder;
import org.noear.socketd.utils.RunUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/noear/socketd/broker/BrokerListener.class */
/**
 * Listener that routes incoming messages between the sessions ("players") registered with the
 * broker, based on the value returned by {@link Message#atName()}.
 *
 * <p>Routing rules implemented by {@link #onMessage(Session, Message)}:
 * <ul>
 *   <li>{@code "*"} - forward to every session of every registered name;</li>
 *   <li>{@code "name*"} - forward to every session registered under {@code name};</li>
 *   <li>anything else - forward to the single session chosen by {@code getPlayerAny}.</li>
 * </ul>
 *
 * <p>The player registry itself ({@code addPlayer}, {@code removePlayer}, {@code getNameAll},
 * {@code getPlayerAll}, {@code getPlayerAny}) is not defined in this class; presumably it is
 * inherited from {@link BrokerListenerBase}.
 */
public class BrokerListener extends BrokerListenerBase implements Listener, BroadcastBroker {
    protected static final Logger log = LoggerFactory.getLogger(BrokerListener.class);

    /**
     * Registers the newly opened session as a player under its own name.
     *
     * @param session the session that was opened
     * @throws IOException declared by the listener contract
     */
    @Override // org.noear.socketd.transport.core.Listener
    public void onOpen(Session session) throws IOException {
        addPlayer(session.name(), session);
    }

    /**
     * Removes the closed session from the players registered under its name.
     *
     * @param session the session that was closed
     */
    @Override // org.noear.socketd.transport.core.Listener
    public void onClose(Session session) {
        removePlayer(session.name(), session);
    }

    /**
     * Routes {@code message} according to its {@code atName} (see the class description).
     *
     * <p>When the message cannot be routed, the requester receives an alarm; if there is no
     * requester ({@code session == null}, as in {@link #broadcast(String, Entity)}) a
     * {@link SocketException} with the same text is thrown instead.
     *
     * @param session the requesting session, or {@code null} for broker-originated messages
     * @param message the message to route
     * @throws IOException if forwarding fails, or (as {@link SocketException}) if the message
     *     cannot be routed and there is no requester to notify
     */
    @Override // org.noear.socketd.transport.core.Listener
    public void onMessage(Session session, Message message) throws IOException {
        String atName = message.atName();
        if (atName == null) {
            alarmOrThrow(session, message, "Broker message require '@' meta");
            return;
        }

        if ("*".equals(atName)) {
            // Broadcast to everybody; having nobody to deliver to is not an error here.
            Collection<String> nameAll = getNameAll();
            if (nameAll == null || nameAll.isEmpty()) {
                return;
            }
            // Iterate over a snapshot: forwardToName may call onClose, which presumably
            // mutates the live registry.
            for (String name : new ArrayList<>(nameAll)) {
                forwardToName(session, message, name);
            }
            return;
        }

        if (atName.endsWith("*")) {
            // "name*": deliver to every session registered under "name".
            String substring = atName.substring(0, atName.length() - 1);
            if (!forwardToName(session, message, substring)) {
                alarmOrThrow(session, message, "Broker don't have '@" + substring + "' player");
            }
            return;
        }

        // Plain name: deliver to a single session picked by getPlayerAny.
        Session playerAny = getPlayerAny(atName, session, message);
        if (playerAny == null) {
            alarmOrThrow(session, message, "Broker don't have '@" + atName + "' session");
            return;
        }
        forwardToSession(session, message, playerAny);
    }

    /**
     * Routes a broker-originated message through {@link #onMessage(Session, Message)} with no
     * requester, so routing failures surface as {@link SocketException}.
     *
     * @param str the event of the message to build
     * @param entity the entity of the message to build
     * @throws IOException if routing or forwarding fails
     */
    @Override // org.noear.socketd.broker.BroadcastBroker
    public void broadcast(String str, Entity entity) throws IOException {
        // NOTE(review): 40 is presumably the protocol's plain "message" flag - confirm against
        // the library's flag constants.
        onMessage(null, new MessageBuilder().flag(40).event(str).entity(entity).build());
    }

    /**
     * Forwards {@code message} to every valid session registered under the name {@code str},
     * skipping the requester itself. Sessions found to be invalid are handed to
     * {@link #onClose(Session)} instead of receiving the message.
     *
     * @param session the requesting session (may be {@code null}); never forwarded to
     * @param message the message to forward
     * @param str the player name whose sessions should receive the message
     * @return {@code true} if at least one session is registered under {@code str} (even when
     *     none of them actually received the message), {@code false} otherwise
     * @throws IOException if forwarding to a session fails; remaining sessions are then skipped
     */
    public boolean forwardToName(Session session, Message message, String str) throws IOException {
        Collection<Session> playerAll = getPlayerAll(str);
        if (playerAll == null || playerAll.isEmpty()) {
            return false;
        }
        // Snapshot copy: onClose below removes players, presumably from this very collection.
        for (Session session2 : new ArrayList<>(playerAll)) {
            if (session2 == session) {
                // Do not echo the message back to its sender.
                continue;
            }
            if (session2.isValid()) {
                forwardToSession(session, message, session2);
            } else {
                onClose(session2);
            }
        }
        return true;
    }

    /**
     * Forwards {@code message} to {@code session2} using the interaction style of the original
     * message (request, subscribe, or plain send). For request and subscribe messages, replies
     * and errors coming back from {@code session2} are relayed to the requester, provided it is
     * non-null and still valid.
     *
     * @param session the requesting session, or {@code null} if there is none
     * @param message the message to forward
     * @param session2 the session that receives the message
     * @throws IOException if sending to {@code session2} fails
     */
    public void forwardToSession(Session session, Message message, Session session2) throws IOException {
        if (message.isRequest()) {
            // NOTE(review): -1L is the timeout argument; presumably it means "no timeout" - confirm.
            session2.sendAndRequest(message.event(), message, -1L).thenReply(reply -> {
                if (isReachable(session)) {
                    session.reply(message, reply);
                }
            }).thenError(th -> {
                relayError(session, message, th);
            });
        } else if (message.isSubscribe()) {
            session2.sendAndSubscribe(message.event(), message).thenReply(reply2 -> {
                if (!isReachable(session)) {
                    return;
                }
                // The final reply of a stream must be relayed as an "end" reply.
                if (reply2.isEnd()) {
                    session.replyEnd(message, reply2);
                } else {
                    session.reply(message, reply2);
                }
            }).thenError(th2 -> {
                relayError(session, message, th2);
            });
        } else {
            session2.send(message.event(), message);
        }
    }

    /**
     * Logs listener errors at warn level.
     *
     * @param session the session on which the error occurred (not used)
     * @param th the error
     */
    @Override // org.noear.socketd.transport.core.Listener
    public void onError(Session session, Throwable th) {
        // No isWarnEnabled() guard needed: the message is a constant and the logger checks the level.
        log.warn("Broker error", th);
    }

    /** Returns whether {@code session} exists and reports itself as valid. */
    private static boolean isReachable(Session session) {
        return session != null && session.isValid();
    }

    /** Sends {@code alarm} to the requester, or throws it as a SocketException when there is none. */
    private static void alarmOrThrow(Session session, Message message, String alarm) throws IOException {
        if (session == null) {
            throw new SocketException(alarm);
        }
        session.sendAlarm(message, alarm);
    }

    /** Relays a responder-side error to the requester as an alarm, if the requester is reachable. */
    private static void relayError(Session session, Message message, Throwable th) {
        if (!isReachable(session)) {
            return;
        }
        // runAndTry wraps the checked IOException of sendAlarm (presumably swallowing failures).
        RunUtils.runAndTry(() -> {
            session.sendAlarm(message, th.getMessage());
        });
    }
}
