package io.bigio.core.member;

import io.bigio.core.Envelope;
import io.bigio.core.GossipListener;
import io.bigio.core.GossipMessage;
import io.bigio.core.ListenerRegistry;
import io.bigio.core.codec.EnvelopeDecoder;
import java.io.IOException;
import org.msgpack.MessageTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.event.selector.Selectors;
import reactor.function.Consumer;

/* loaded from: input_file:io/bigio/core/member/MeMember.class */
/**
 * Base class for the member that represents the local node.
 *
 * <p>It owns two reactors that share one {@link Environment}:
 * <ul>
 *   <li>{@link #reactor} – gossip listeners are registered on it under {@link #GOSSIP_TOPIC};</li>
 *   <li>{@link #decoderReactor} – receives raw {@code byte[]} payloads under
 *       {@link #DECODE_TOPIC}, decodes them into an {@link Envelope} and passes the
 *       result to {@link #send(Envelope)}.</li>
 * </ul>
 *
 * <p>Both reactors are created by {@link #initialize()}; within this class they are
 * {@code null} until that method has run. Subclasses supply the transport by
 * implementing {@link #initializeServers()}.
 */
public abstract class MeMember extends AbstractMember {

    /** Selector key under which gossip consumers are registered on {@link #reactor}. */
    protected static final String GOSSIP_TOPIC = "__gossiper";

    /** Selector key under which the decoding consumer is registered on {@link #decoderReactor}. */
    protected static final String DECODE_TOPIC = "__decoder";

    private static final Logger LOG = LoggerFactory.getLogger(MeMember.class);

    /** Reactor environment shared by both reactors; one is created per instance. */
    private final Environment env = new Environment();

    /** Reactor on which gossip consumers are registered; assigned by {@link #initialize()}. */
    protected Reactor reactor;

    /** Reactor on which raw bytes are decoded; assigned by {@link #initialize()}. */
    protected Reactor decoderReactor;

    /** Registry that {@link #send(Envelope)} delegates to. */
    protected ListenerRegistry registry;

    /**
     * Creates the member using the single-argument {@link AbstractMember} constructor.
     *
     * @param memberHolder     forwarded to the superclass
     * @param listenerRegistry registry that envelopes are delivered to
     */
    public MeMember(MemberHolder memberHolder, ListenerRegistry listenerRegistry) {
        super(memberHolder);
        this.registry = listenerRegistry;
    }

    /**
     * Creates the member using the four-argument {@link AbstractMember} constructor.
     *
     * @param str              forwarded to the superclass (presumably the host/IP – TODO confirm)
     * @param i                forwarded to the superclass (presumably a port – TODO confirm)
     * @param i2               forwarded to the superclass (presumably a port – TODO confirm)
     * @param memberHolder     forwarded to the superclass
     * @param listenerRegistry registry that envelopes are delivered to
     */
    public MeMember(String str, int i, int i2, MemberHolder memberHolder, ListenerRegistry listenerRegistry) {
        super(str, i, i2, memberHolder);
        this.registry = listenerRegistry;
    }

    /** Hook invoked by {@link #initialize()} once the reactors have been created. */
    protected abstract void initializeServers();

    /**
     * Registers a listener that is handed the payload of every event published on
     * {@link #GOSSIP_TOPIC}.
     *
     * <p>NOTE(review): {@link #reactor} is only assigned in {@link #initialize()} within
     * this class, so calling this method earlier dereferences {@code null} unless a
     * subclass has set the field.
     *
     * @param gossipListener listener to notify
     */
    public void addGossipConsumer(final GossipListener gossipListener) {
        final Consumer<Event<GossipMessage>> forwarder = new Consumer<Event<GossipMessage>>() {
            @Override
            public void accept(Event<GossipMessage> gossipEvent) {
                gossipListener.accept(gossipEvent.getData());
            }
        };
        this.reactor.on(Selectors.$(GOSSIP_TOPIC), forwarder);
    }

    /** Creates the reactors first, then lets the subclass start its servers. */
    @Override
    public void initialize() {
        initializeReactor();
        initializeServers();
    }

    /**
     * Delivers the envelope through the listener registry.
     *
     * @param envelope envelope to deliver
     * @throws IOException if the registry fails to send
     */
    @Override
    public void send(Envelope envelope) throws IOException {
        this.registry.send(envelope);
    }

    /** Builds both reactors and attaches the decoding consumer to {@link #decoderReactor}. */
    private void initializeReactor() {
        this.reactor = newRingBufferReactor();
        this.decoderReactor = newRingBufferReactor();

        final Consumer<Event<byte[]>> decodingConsumer = new Consumer<Event<byte[]>>() {
            @Override
            public void accept(Event<byte[]> rawEvent) {
                try {
                    final Envelope envelope = EnvelopeDecoder.decode(rawEvent.getData());
                    // NOTE(review): the flag is cleared right after decoding; presumably it
                    // refers to the envelope's payload rather than the envelope itself – confirm.
                    envelope.setDecoded(false);
                    MeMember.this.send(envelope);
                } catch (IOException | MessageTypeException e) {
                    // Logged and dropped so a bad message does not propagate out of the consumer.
                    LOG.error("Error decoding message.", e);
                }
            }
        };
        this.decoderReactor.on(Selectors.$(DECODE_TOPIC), decodingConsumer);
    }

    /** Returns a new reactor bound to this member's environment and the "ringBuffer" dispatcher. */
    private Reactor newRingBufferReactor() {
        return (Reactor) Reactors.reactor().env(this.env).dispatcher("ringBuffer").get();
    }
}
