package io.bigio.core;

import io.bigio.Component;
import io.bigio.Interceptor;
import io.bigio.MessageListener;
import io.bigio.core.member.Member;
import io.bigio.core.member.MemberKey;
import io.bigio.util.TopicUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.event.selector.Selectors;

/**
 * Keeps track of local message listeners, per-topic interceptors and the
 * topic registrations of cluster members, and dispatches envelopes to local
 * listeners through a Reactor instance.
 *
 * <p>Registrations are stored as member -> topic -> list of registrations.
 * The outer and inner maps are {@link ConcurrentHashMap}s and the lists are
 * {@link CopyOnWriteArrayList}s, so the read-only and bulk-removal methods can
 * run without locking while {@link #registerMemberForTopic} (which needs an
 * atomic "check for duplicate, then add") stays {@code synchronized}.
 */
@Component
/* loaded from: input_file:io/bigio/core/ListenerRegistry.class */
public class ListenerRegistry {
    /** Number of threads used to deliver envelopes that carry a positive execute time. */
    private static final int THREAD_POOL_SIZE = 8;
    private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistry.class);

    /** Member whose registrations {@link #removeAllLocalListeners} operates on; null until {@link #setMe} is called. */
    private Member me;
    private final Environment environment = new Environment();
    private final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(THREAD_POOL_SIZE);
    /** member -> topic -> registrations of that member for that topic. */
    private final Map<Member, Map<String, List<Registration>>> map = new ConcurrentHashMap<>();
    /** topic -> interceptors applied, in insertion order, to every envelope sent on that topic. */
    private final Map<String, List<Interceptor>> interceptors = new ConcurrentHashMap<>();
    private final Reactor reactor = (Reactor) Reactors.reactor().env(this.environment).dispatcher("ringBuffer").get();

    /**
     * Adds an interceptor that {@link #send} applies to every envelope whose
     * topic equals {@code topic}.
     *
     * @param topic the topic to intercept
     * @param interceptor the interceptor to append to the topic's chain
     */
    public void addInterceptor(String topic, Interceptor interceptor) {
        // computeIfAbsent makes "create the chain if missing" atomic; the
        // copy-on-write list lets send() iterate while interceptors are added.
        this.interceptors.computeIfAbsent(topic, key -> new CopyOnWriteArrayList<>()).add(interceptor);
    }

    /**
     * Sets the member whose registrations are treated as local.
     *
     * @param member the local member
     */
    public void setMe(Member member) {
        this.me = member;
    }

    /**
     * @return the member previously passed to {@link #setMe}, or {@code null} if none was set
     */
    public Member getMe() {
        return this.me;
    }

    /**
     * Subscribes a listener on the reactor for the selector built from the
     * given topic and partition. Exceptions raised while delivering a message
     * are logged and never propagate to the reactor.
     *
     * @param <T> the message type the listener expects
     * @param topic the topic to listen on
     * @param partition the partition to listen on
     * @param messageListener the listener that receives each message payload
     */
    public <T> void addLocalListener(String topic, String partition, MessageListener<T> messageListener) {
        this.reactor.on(Selectors.regex(TopicUtils.getTopicString(topic, partition)), event -> {
            Object payload = null;
            try {
                Object data = event.getData();
                if (!(data instanceof Envelope)) {
                    LOG.error("Topic '" + topic + "' received incorrect message type : " + typeName(data));
                    return;
                }
                payload = ((Envelope) data).getMessage();
                // The payload type is only known to the listener; a mismatch
                // surfaces as a ClassCastException inside receive().
                @SuppressWarnings("unchecked")
                T typed = (T) payload;
                messageListener.receive(typed);
            } catch (ClassCastException e) {
                // typeName is null-safe so the handler itself cannot throw.
                LOG.error("Topic '" + topic + "' received incorrect message type : " + typeName(payload), e);
            } catch (Exception e) {
                LOG.error("Exception in Reactor.", e);
            }
        });
    }

    /**
     * Unregisters the reactor consumers for the given topic/partition selector
     * and clears the local member's registrations for the topic. Does nothing
     * when the local member has no registrations at all.
     *
     * @param topic the topic whose listeners are removed
     * @param partition the partition used to build the selector
     */
    public void removeAllLocalListeners(String topic, String partition) {
        Map<String, List<Registration>> localTopics = this.map.get(this.me);
        if (localTopics == null) {
            return;
        }
        List<Registration> registrations = localTopics.get(topic);
        if (registrations == null) {
            LOG.trace("No listeners registered for topic {}", topic);
            return;
        }
        LOG.trace("Removing {} registration", registrations.size());
        this.reactor.getConsumerRegistry().unregister(Selectors.regex(TopicUtils.getTopicString(topic, partition)));
        registrations.clear();
    }

    /**
     * Removes the given registrations from every member and topic. The
     * (possibly now empty) per-topic lists themselves are kept.
     *
     * @param list the registrations to remove; {@code null} or empty is a no-op
     */
    public void removeRegistrations(List<Registration> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        for (Map<String, List<Registration>> topics : this.map.values()) {
            for (List<Registration> registrations : topics.values()) {
                registrations.removeAll(list);
            }
        }
    }

    /**
     * @return a new, mutable list containing every registration of every member
     */
    public List<Registration> getAllRegistrations() {
        List<Registration> all = new ArrayList<>();
        for (Map<String, List<Registration>> topics : this.map.values()) {
            for (List<Registration> registrations : topics.values()) {
                all.addAll(registrations);
            }
        }
        return all;
    }

    /**
     * Returns the members that have an entry for the given topic. A member is
     * included as soon as the topic key exists for it, even if its
     * registration list for that topic is currently empty.
     *
     * @param topic the topic to look up
     * @return a new, mutable list of matching members; empty if none (or if {@code topic} is null)
     */
    public List<Member> getRegisteredMembers(String topic) {
        List<Member> members = new ArrayList<>();
        if (topic == null) {
            // ConcurrentHashMap rejects null keys; no topic can ever equal null.
            return members;
        }
        for (Map.Entry<Member, Map<String, List<Registration>>> entry : this.map.entrySet()) {
            if (entry.getValue().containsKey(topic)) {
                members.add(entry.getKey());
            }
        }
        return members;
    }

    /**
     * Records that {@code member} listens on the given topic and partition.
     * Registering the same (member, topic, partition) triple again is a no-op.
     *
     * @param topic the topic being registered
     * @param partition the partition being registered
     * @param member the member to register
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void registerMemberForTopic(String topic, String partition, Member member) {
        List<Registration> registrations = this.map
                .computeIfAbsent(member, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(topic, key -> new CopyOnWriteArrayList<>());
        for (Registration existing : registrations) {
            if (topic.equals(existing.getTopic())
                    && partition.equals(existing.getPartition())
                    && member.equals(existing.getMember())) {
                return;
            }
        }
        registrations.add(new Registration(member, topic, partition));
        if (LOG.isTraceEnabled()) {
            LOG.trace(MemberKey.getKey(this.me) + ": Registering member '" + MemberKey.getKey(member) + "' for topic '" + topic + "' on partition '" + partition + "'");
        }
    }

    /**
     * Runs the envelope through the interceptors registered for its topic and
     * then notifies the reactor: immediately when the execute time is zero, or
     * after that many milliseconds when it is positive. An envelope whose
     * execute time is negative is not delivered.
     *
     * @param envelope the envelope to deliver
     * @throws IOException declared for callers; propagated if raised while processing the envelope
     */
    public void send(Envelope envelope) throws IOException {
        // Single lookup instead of containsKey + get.
        List<Interceptor> chain = this.interceptors.get(envelope.getTopic());
        if (chain != null) {
            for (Interceptor interceptor : chain) {
                envelope = interceptor.intercept(envelope);
            }
        }
        final Envelope outgoing = envelope;
        final long delayMillis = outgoing.getExecuteTime();
        if (delayMillis > 0) {
            this.futureExecutor.schedule(() -> {
                try {
                    dispatch(outgoing);
                } catch (RuntimeException e) {
                    // The returned future is discarded, so without this the
                    // failure would vanish silently.
                    LOG.error("Exception delivering delayed message.", e);
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        } else if (delayMillis == 0) {
            dispatch(outgoing);
        }
        // NOTE(review): negative execute times are dropped on purpose to match
        // existing behavior - confirm this is intended.
    }

    /** Notifies the reactor with the envelope under its topic/partition key. */
    private void dispatch(Envelope envelope) {
        this.reactor.notify(TopicUtils.getNotifyTopicString(envelope.getTopic(), envelope.getPartition()), Event.wrap(envelope));
    }

    /** Returns the runtime class name of {@code value}, or {@code "null"} for a null reference. */
    private static String typeName(Object value) {
        return value == null ? "null" : value.getClass().getName();
    }
}
