package com.github.linyuzai.connection.loadbalance.core.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionCloseEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEstablishEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEventListener;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEventPublisher;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionLoadBalanceConceptDestroyEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionLoadBalanceConceptInitializeEvent;
import com.github.linyuzai.connection.loadbalance.core.event.DefaultConnectionEventPublisher;
import com.github.linyuzai.connection.loadbalance.core.event.UnknownCloseEvent;
import com.github.linyuzai.connection.loadbalance.core.event.UnknownErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.exception.ConnectionLoadBalanceException;
import com.github.linyuzai.connection.loadbalance.core.message.DeadMessageEvent;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.MessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.core.message.MessageFactory;
import com.github.linyuzai.connection.loadbalance.core.message.MessageForwardHandler;
import com.github.linyuzai.connection.loadbalance.core.message.MessagePrepareEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageReceiveEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendEvent;
import com.github.linyuzai.connection.loadbalance.core.message.ObjectMessageFactory;
import com.github.linyuzai.connection.loadbalance.core.message.UnknownMessageEvent;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecodeErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.repository.ConnectionRepository;
import com.github.linyuzai.connection.loadbalance.core.repository.DefaultConnectionRepository;
import com.github.linyuzai.connection.loadbalance.core.select.AllSelector;
import com.github.linyuzai.connection.loadbalance.core.select.ConnectionSelector;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServerManager;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscribeErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscribeHandler;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import lombok.NonNull;

/* loaded from: input_file:com/github/linyuzai/connection/loadbalance/core/concept/AbstractConnectionLoadBalanceConcept.class */
/**
 * Skeleton implementation of {@link ConnectionLoadBalanceConcept}.
 *
 * <p>It wires together a {@link ConnectionRepository}, a {@link ConnectionServerManager},
 * a {@link ConnectionSubscriber}, the configured factories/selectors, a
 * {@link MessageCodecAdapter} and a {@link ConnectionEventPublisher}, and translates
 * connection life-cycle callbacks (establish / close / message / error) into events
 * published through {@link #publish(Object)}.
 *
 * <p>The collaborators are stored exactly as passed to the constructor; the list
 * getters expose those same list instances without copying.
 */
public abstract class AbstractConnectionLoadBalanceConcept implements ConnectionLoadBalanceConcept {

    /** Store of the connections currently known to this concept. */
    protected final ConnectionRepository connectionRepository;

    /** Source of the servers to subscribe to and of the local server descriptor. */
    protected final ConnectionServerManager connectionServerManager;

    /** Strategy used to open subscriber connections to servers. */
    protected final ConnectionSubscriber connectionSubscriber;

    /** Factories consulted in order by {@link #getConnectionFactory(Object, Map)}. */
    protected final List<ConnectionFactory> connectionFactories;

    /** Selectors consulted in order by {@link #getConnectionSelector(Message)}. */
    protected final List<ConnectionSelector> connectionSelectors;

    /** Factories consulted in order by {@link #getMessageFactory(Object)}. */
    protected final List<MessageFactory> messageFactories;

    /** Supplies the encoder/decoder installed on each connection by its type. */
    protected final MessageCodecAdapter messageCodecAdapter;

    /** Destination of every event raised by this concept. */
    protected final ConnectionEventPublisher eventPublisher;

    /**
     * Creates a concept from fully prepared collaborators. No validation or copying
     * is performed here; {@link AbstractBuilder#preBuild()} is the place where
     * defaults are filled in when a builder is used.
     *
     * @param connectionRepository     repository holding the live connections
     * @param connectionServerManager  provider of remote servers and the local server
     * @param connectionSubscriber     subscriber used to connect to servers
     * @param list                     connection factories, in lookup order
     * @param list2                    connection selectors, in lookup order
     * @param list3                    message factories, in lookup order
     * @param messageCodecAdapter      adapter providing encoders and decoders
     * @param connectionEventPublisher publisher receiving all events
     */
    public AbstractConnectionLoadBalanceConcept(ConnectionRepository connectionRepository, ConnectionServerManager connectionServerManager, ConnectionSubscriber connectionSubscriber, List<ConnectionFactory> list, List<ConnectionSelector> list2, List<MessageFactory> list3, MessageCodecAdapter messageCodecAdapter, ConnectionEventPublisher connectionEventPublisher) {
        this.connectionRepository = connectionRepository;
        this.connectionServerManager = connectionServerManager;
        this.connectionSubscriber = connectionSubscriber;
        this.connectionFactories = list;
        this.connectionSelectors = list2;
        this.messageFactories = list3;
        this.messageCodecAdapter = messageCodecAdapter;
        this.eventPublisher = connectionEventPublisher;
    }

    /**
     * Fluent builder base that collects the collaborators of a concept.
     *
     * @param <T> the concrete builder type, returned by every fluent method so that
     *            subclass-specific methods remain reachable in a call chain
     */
    public static class AbstractBuilder<T extends AbstractBuilder<T>> {
        protected ConnectionRepository connectionRepository;
        protected ConnectionServerManager connectionServerManager;
        protected ConnectionSubscriber connectionSubscriber;
        protected MessageCodecAdapter messageCodecAdapter;
        protected ConnectionEventPublisher eventPublisher;
        protected List<ConnectionFactory> connectionFactories = new ArrayList<>();
        protected List<ConnectionSelector> connectionSelectors = new ArrayList<>();
        protected List<MessageFactory> messageFactories = new ArrayList<>();
        protected List<ConnectionEventListener> eventListeners = new ArrayList<>();

        /**
         * Returns this builder typed as {@code T}.
         *
         * <p>{@code this} is statically an {@code AbstractBuilder<T>}, which is not
         * assignable to {@code T} without a cast. The cast is safe as long as each
         * subclass passes itself as the type argument.
         */
        @SuppressWarnings("unchecked")
        private T self() {
            return (T) this;
        }

        /** Sets the connection repository; when left unset, {@link #preBuild()} installs a default. */
        public T connectionRepository(ConnectionRepository connectionRepository) {
            this.connectionRepository = connectionRepository;
            return self();
        }

        /** Sets the server manager; required by {@link #preBuild()}. */
        public T connectionServerManager(ConnectionServerManager connectionServerManager) {
            this.connectionServerManager = connectionServerManager;
            return self();
        }

        /** Sets the connection subscriber; required by {@link #preBuild()}. */
        public T connectionSubscriber(ConnectionSubscriber connectionSubscriber) {
            this.connectionSubscriber = connectionSubscriber;
            return self();
        }

        /** Appends a single connection factory. */
        public T addConnectionFactory(ConnectionFactory connectionFactory) {
            return addConnectionFactories(connectionFactory);
        }

        /** Appends the given connection factories in argument order. */
        public T addConnectionFactories(ConnectionFactory... connectionFactoryArr) {
            return addConnectionFactories(Arrays.asList(connectionFactoryArr));
        }

        /** Appends all connection factories of the collection. */
        public T addConnectionFactories(Collection<? extends ConnectionFactory> collection) {
            this.connectionFactories.addAll(collection);
            return self();
        }

        /** Appends a single connection selector. */
        public T addConnectionSelector(ConnectionSelector connectionSelector) {
            return addConnectionSelectors(connectionSelector);
        }

        /** Appends the given connection selectors in argument order. */
        public T addConnectionSelectors(ConnectionSelector... connectionSelectorArr) {
            return addConnectionSelectors(Arrays.asList(connectionSelectorArr));
        }

        /** Appends all connection selectors of the collection. */
        public T addConnectionSelectors(Collection<? extends ConnectionSelector> collection) {
            this.connectionSelectors.addAll(collection);
            return self();
        }

        /** Sets the codec adapter; required by {@link #preBuild()}. */
        public T messageCodecAdapter(MessageCodecAdapter messageCodecAdapter) {
            this.messageCodecAdapter = messageCodecAdapter;
            return self();
        }

        /** Appends a single message factory. */
        public T addMessageFactory(MessageFactory messageFactory) {
            return addMessageFactories(messageFactory);
        }

        /** Appends the given message factories in argument order. */
        public T addMessageFactories(MessageFactory... messageFactoryArr) {
            return addMessageFactories(Arrays.asList(messageFactoryArr));
        }

        /** Appends all message factories of the collection. */
        public T addMessageFactories(Collection<? extends MessageFactory> collection) {
            this.messageFactories.addAll(collection);
            return self();
        }

        /** Sets the event publisher; when left unset, {@link #preBuild()} installs a default. */
        public T eventPublisher(ConnectionEventPublisher connectionEventPublisher) {
            this.eventPublisher = connectionEventPublisher;
            return self();
        }

        /** Appends a single event listener. */
        public T addEventListener(ConnectionEventListener connectionEventListener) {
            return addEventListeners(connectionEventListener);
        }

        /** Appends the given event listeners in argument order. */
        public T addEventListeners(ConnectionEventListener... connectionEventListenerArr) {
            return addEventListeners(Arrays.asList(connectionEventListenerArr));
        }

        /** Appends all event listeners of the collection. */
        public T addEventListeners(Collection<? extends ConnectionEventListener> collection) {
            this.eventListeners.addAll(collection);
            return self();
        }

        /**
         * Validates the mandatory collaborators and fills in defaults.
         *
         * <p>After the checks, an {@link AllSelector} and an {@link ObjectMessageFactory}
         * are appended as last-resort entries, and a {@link MessageForwardHandler}
         * followed by a {@link ConnectionSubscribeHandler} are placed in front of the
         * user-supplied listeners before everything is registered on the publisher.
         *
         * <p>NOTE: every invocation appends and registers these entries again, so this
         * method is meant to be called once per builder.
         *
         * @throws ConnectionLoadBalanceException if the server manager, the subscriber
         *                                        or the codec adapter has not been set
         */
        protected void preBuild() {
            if (this.connectionServerManager == null) {
                throw new ConnectionLoadBalanceException("ConnectionServerManager is null");
            }
            if (this.connectionSubscriber == null) {
                throw new ConnectionLoadBalanceException("ConnectionSubscriber is null");
            }
            if (this.messageCodecAdapter == null) {
                throw new ConnectionLoadBalanceException("MessageCodecAdapter is null");
            }
            if (this.connectionRepository == null) {
                this.connectionRepository = new DefaultConnectionRepository();
            }
            // Appended last so that user-provided selectors/factories are consulted first.
            this.connectionSelectors.add(new AllSelector());
            this.messageFactories.add(new ObjectMessageFactory());
            if (this.eventPublisher == null) {
                this.eventPublisher = new DefaultConnectionEventPublisher();
            }
            // Each insert goes to index 0, so MessageForwardHandler ends up first.
            this.eventListeners.add(0, new ConnectionSubscribeHandler());
            this.eventListeners.add(0, new MessageForwardHandler());
            this.eventPublisher.register(this.eventListeners);
        }
    }

    /**
     * Subscribes to every known server (requesting the initial message on each new
     * connection) and then publishes a {@link ConnectionLoadBalanceConceptInitializeEvent}.
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void initialize() {
        subscribe(true);
        publish(new ConnectionLoadBalanceConceptInitializeEvent(this));
    }

    /** Publishes a {@link ConnectionLoadBalanceConceptDestroyEvent}. */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void destroy() {
        publish(new ConnectionLoadBalanceConceptDestroyEvent(this));
    }

    /**
     * Subscribes to each server returned by the server manager.
     *
     * @param z forwarded unchanged to {@link #subscribe(ConnectionServer, boolean)}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void subscribe(boolean z) {
        for (ConnectionServer server : this.connectionServerManager.getConnectionServers()) {
            subscribe(server, z);
        }
    }

    /**
     * Subscribes to a single server unless a live subscriber connection to it already
     * exists. A known but no longer alive connection is closed with reason
     * {@code "NotAlive"} before subscribing again.
     *
     * <p>Failures raised by the subscriber are not propagated; they are published as
     * a {@link ConnectionSubscribeErrorEvent}.
     *
     * @param connectionServer the server to subscribe to
     * @param z                when {@code true}, a message created from the local
     *                         server is sent over the new connection once it has been
     *                         handed to {@link #onEstablish(Connection)}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public synchronized void subscribe(ConnectionServer connectionServer, boolean z) {
        Connection existing = getSubscriberConnection(connectionServer);
        if (existing != null) {
            if (existing.isAlive()) {
                return;
            }
            existing.close("NotAlive");
        }
        try {
            this.connectionSubscriber.subscribe(connectionServer, this, connection -> {
                onEstablish(connection);
                if (z) {
                    connection.send(createMessage(this.connectionServerManager.getLocal()));
                }
            });
        } catch (Throwable th) {
            publish(new ConnectionSubscribeErrorEvent(connectionServer, th));
        }
    }

    /**
     * Looks up the subscriber connection whose metadata entry keyed by
     * {@code ConnectionServer.class} equals the given server according to the
     * server manager.
     *
     * @param connectionServer the server to look for; may be {@code null}
     * @return the first matching connection, or {@code null} when the argument is
     *         {@code null} or nothing matches
     */
    public Connection getSubscriberConnection(ConnectionServer connectionServer) {
        if (connectionServer == null) {
            return null;
        }
        for (Connection candidate : this.connectionRepository.select(Connection.Type.SUBSCRIBER)) {
            ConnectionServer owner = (ConnectionServer) candidate.getMetadata().get(ConnectionServer.class);
            if (this.connectionServerManager.isEqual(connectionServer, owner)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Creates a connection for the given raw object using the first supporting factory.
     *
     * @param obj the raw connection object
     * @param map metadata passed to the factory
     * @return the created connection, never {@code null}
     * @throws ConnectionLoadBalanceException if no factory supports the input or the
     *                                        factory returns {@code null}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public Connection create(Object obj, Map<Object, Object> map) {
        ConnectionFactory factory = getConnectionFactory(obj, map);
        if (factory == null) {
            throw new ConnectionLoadBalanceException("No ConnectionFactory available with " + obj);
        }
        Connection created = factory.create(obj, map, this);
        if (created == null) {
            throw new ConnectionLoadBalanceException("Connection can not be created with " + obj);
        }
        return created;
    }

    /**
     * Finds the first configured factory that supports the given input.
     *
     * @return the factory, or {@code null} if none supports it
     */
    public ConnectionFactory getConnectionFactory(Object obj, Map<Object, Object> map) {
        for (ConnectionFactory factory : this.connectionFactories) {
            if (factory.support(obj, map)) {
                return factory;
            }
        }
        return null;
    }

    /**
     * Creates a connection via {@link #create(Object, Map)} and registers it through
     * {@link #onEstablish(Connection)}.
     *
     * @return the newly established connection
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public Connection onEstablish(Object obj, Map<Object, Object> map) {
        Connection created = create(obj, map);
        onEstablish(created);
        return created;
    }

    /**
     * Binds the connection to this concept, installs the encoder and decoder matching
     * its type, stores it in the repository and publishes a
     * {@link ConnectionEstablishEvent}.
     *
     * @param connection the connection to register
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onEstablish(Connection connection) {
        String type = connection.getType();
        connection.setConcept(this);
        connection.setMessageEncoder(this.messageCodecAdapter.getMessageEncoder(type));
        connection.setMessageDecoder(this.messageCodecAdapter.getMessageDecoder(type));
        this.connectionRepository.add(connection);
        publish(new ConnectionEstablishEvent(connection));
    }

    /**
     * Handles a close notification identified by id and type. Publishes an
     * {@link UnknownCloseEvent} when the repository has no such connection, otherwise
     * delegates to {@link #onClose(Connection, Object)}.
     *
     * @param obj  the connection id used for the repository lookup
     * @param str  the connection type used for the repository lookup
     * @param obj2 the close reason
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onClose(Object obj, String str, Object obj2) {
        Connection connection = this.connectionRepository.get(obj, str);
        if (connection == null) {
            publish(new UnknownCloseEvent(obj, str, obj2, this));
            return;
        }
        onClose(connection, obj2);
    }

    /**
     * Removes the connection from the repository and, if the repository actually
     * held it, publishes a {@link ConnectionCloseEvent} for the removed instance.
     *
     * @param connection the closed connection; must not be {@code null}
     * @param obj        the close reason
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onClose(@NonNull Connection connection, Object obj) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        Connection removed = this.connectionRepository.remove(connection);
        if (removed != null) {
            publish(new ConnectionCloseEvent(removed, obj));
        }
    }

    /**
     * Handles an incoming message identified by connection id and type. Publishes an
     * {@link UnknownMessageEvent} when the repository has no such connection,
     * otherwise delegates to {@link #onMessage(Connection, Object)}.
     *
     * @param obj  the connection id used for the repository lookup
     * @param str  the connection type used for the repository lookup
     * @param obj2 the raw message
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onMessage(Object obj, String str, Object obj2) {
        Connection connection = this.connectionRepository.get(obj, str);
        if (connection == null) {
            publish(new UnknownMessageEvent(obj, str, obj2, this));
            return;
        }
        onMessage(connection, obj2);
    }

    /**
     * Decodes a raw message with the connection's decoder and publishes a
     * {@link MessageReceiveEvent}. A failure while decoding is published as a
     * {@link MessageDecodeErrorEvent} instead of being thrown.
     *
     * <p>Only the decoding step is guarded: an exception thrown while the receive
     * event is being published is not a decode failure and is therefore not reported
     * as one; it propagates to the caller like any other {@link #publish(Object)} call.
     *
     * @param connection the connection the message arrived on; must not be {@code null}
     * @param obj        the raw message
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onMessage(@NonNull Connection connection, Object obj) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        MessageReceiveEvent received;
        try {
            received = new MessageReceiveEvent(connection, connection.getMessageDecoder().decode(obj));
        } catch (Throwable th) {
            publish(new MessageDecodeErrorEvent(connection, th));
            return;
        }
        publish(received);
    }

    /**
     * Handles an error identified by connection id and type. Publishes an
     * {@link UnknownErrorEvent} when the repository has no such connection, otherwise
     * delegates to {@link #onError(Connection, Throwable)}.
     *
     * @param obj the connection id used for the repository lookup
     * @param str the connection type used for the repository lookup
     * @param th  the error
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onError(Object obj, String str, Throwable th) {
        Connection connection = this.connectionRepository.get(obj, str);
        if (connection == null) {
            publish(new UnknownErrorEvent(obj, str, th, this));
            return;
        }
        onError(connection, th);
    }

    /**
     * Publishes a {@link ConnectionErrorEvent} for the given connection.
     *
     * @param connection the failing connection; must not be {@code null}
     * @param th         the error
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void onError(@NonNull Connection connection, Throwable th) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        publish(new ConnectionErrorEvent(connection, th));
    }

    /**
     * Sends a message to the connections chosen by the first supporting selector.
     *
     * <p>When no selector supports the message, or the selector yields no connection,
     * a {@link DeadMessageEvent} is published and nothing is sent. Otherwise a
     * {@link MessagePrepareEvent} is published, the message is sent to each selected
     * connection (a failure on one connection is published as a
     * {@link MessageSendErrorEvent} and does not stop the remaining sends), and
     * finally a {@link MessageSendEvent} is published.
     *
     * @param obj the payload, or a ready-made {@link Message}
     * @throws ConnectionLoadBalanceException if no message can be created from {@code obj}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void send(Object obj) {
        Message message = createMessage(obj);
        // getConnectionSelector returns null when nothing supports the message;
        // treat that the same as an empty selection instead of dereferencing null.
        ConnectionSelector selector = getConnectionSelector(message);
        Collection<Connection> targets = selector == null
                ? null
                : selector.select(message, this.connectionRepository, this);
        if (targets == null || targets.isEmpty()) {
            publish(new DeadMessageEvent(message));
            return;
        }
        publish(new MessagePrepareEvent(message, targets));
        for (Connection target : targets) {
            try {
                target.send(message);
            } catch (Throwable th) {
                publish(new MessageSendErrorEvent(target, message, th));
            }
        }
        publish(new MessageSendEvent(message, targets));
    }

    /**
     * Sends a message after adding the given headers to it. With a {@code null}
     * header map this is equivalent to {@link #send(Object)}.
     *
     * @param obj the payload, or a ready-made {@link Message}
     * @param map headers to add to the message; may be {@code null}
     */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void send(Object obj, Map<String, String> map) {
        if (map == null) {
            send(obj);
            return;
        }
        Message message = createMessage(obj);
        message.getHeaders().putAll(map);
        send(message);
    }

    /**
     * Converts an arbitrary object into a {@link Message}. An object that already is
     * a {@code Message} is returned as is; anything else is handed to the first
     * supporting {@link MessageFactory}.
     *
     * @param obj the payload or message
     * @return the message, never {@code null}
     * @throws ConnectionLoadBalanceException if no factory supports the object or the
     *                                        factory returns {@code null}
     */
    public Message createMessage(Object obj) {
        if (obj instanceof Message) {
            return (Message) obj;
        }
        MessageFactory factory = getMessageFactory(obj);
        if (factory == null) {
            throw new ConnectionLoadBalanceException("No MessageFactory available with " + obj);
        }
        Message created = factory.create(obj);
        if (created == null) {
            throw new ConnectionLoadBalanceException("Message can not be created with " + obj);
        }
        return created;
    }

    /**
     * Finds the first configured message factory that supports the given object.
     *
     * @return the factory, or {@code null} if none supports it
     */
    public MessageFactory getMessageFactory(Object obj) {
        for (MessageFactory factory : this.messageFactories) {
            if (factory.support(obj)) {
                return factory;
            }
        }
        return null;
    }

    /**
     * Finds the first configured selector that supports the given message.
     *
     * @return the selector, or {@code null} if none supports it
     */
    public ConnectionSelector getConnectionSelector(Message message) {
        for (ConnectionSelector selector : this.connectionSelectors) {
            if (selector.support(message)) {
                return selector;
            }
        }
        return null;
    }

    /** Hands the event to the configured {@link ConnectionEventPublisher}. */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public void publish(Object obj) {
        this.eventPublisher.publish(obj);
    }

    /** Returns the repository's connection for the given id and type. */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public Connection getConnection(Object obj, String str) {
        return this.connectionRepository.get(obj, str);
    }

    /** Returns the repository's connections of the given type. */
    @Override // com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept
    public Collection<Connection> getConnections(String str) {
        return this.connectionRepository.select(str);
    }

    /** Returns the connection repository. */
    public ConnectionRepository getConnectionRepository() {
        return this.connectionRepository;
    }

    /** Returns the server manager. */
    public ConnectionServerManager getConnectionServerManager() {
        return this.connectionServerManager;
    }

    /** Returns the connection subscriber. */
    public ConnectionSubscriber getConnectionSubscriber() {
        return this.connectionSubscriber;
    }

    /** Returns the internal list of connection factories (not a copy). */
    public List<ConnectionFactory> getConnectionFactories() {
        return this.connectionFactories;
    }

    /** Returns the internal list of connection selectors (not a copy). */
    public List<ConnectionSelector> getConnectionSelectors() {
        return this.connectionSelectors;
    }

    /** Returns the internal list of message factories (not a copy). */
    public List<MessageFactory> getMessageFactories() {
        return this.messageFactories;
    }

    /** Returns the message codec adapter. */
    public MessageCodecAdapter getMessageCodecAdapter() {
        return this.messageCodecAdapter;
    }

    /** Returns the event publisher. */
    public ConnectionEventPublisher getEventPublisher() {
        return this.eventPublisher;
    }
}
