package com.github.linyuzai.connection.loadbalance.core.concept;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionFactory;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionCloseEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEstablishEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEventListener;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEventPublisher;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionEventPublisherFactory;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionLoadBalanceConceptDestroyEvent;
import com.github.linyuzai.connection.loadbalance.core.event.ConnectionLoadBalanceConceptInitializeEvent;
import com.github.linyuzai.connection.loadbalance.core.event.UnknownCloseEvent;
import com.github.linyuzai.connection.loadbalance.core.event.UnknownErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.exception.ConnectionLoadBalanceException;
import com.github.linyuzai.connection.loadbalance.core.executor.ScheduledExecutor;
import com.github.linyuzai.connection.loadbalance.core.executor.ScheduledExecutorFactory;
import com.github.linyuzai.connection.loadbalance.core.logger.ConnectionLogger;
import com.github.linyuzai.connection.loadbalance.core.logger.ConnectionLoggerFactory;
import com.github.linyuzai.connection.loadbalance.core.message.BaseMessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.core.message.DeadMessageEvent;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.MessageCodecAdapter;
import com.github.linyuzai.connection.loadbalance.core.message.MessageCodecAdapterChain;
import com.github.linyuzai.connection.loadbalance.core.message.MessageDiscardEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageFactory;
import com.github.linyuzai.connection.loadbalance.core.message.MessageForwardHandler;
import com.github.linyuzai.connection.loadbalance.core.message.MessagePrepareEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageReceiveEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageReceivePredicateErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendEvent;
import com.github.linyuzai.connection.loadbalance.core.message.ObjectMessageFactory;
import com.github.linyuzai.connection.loadbalance.core.message.UnknownMessageEvent;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecodeErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder;
import com.github.linyuzai.connection.loadbalance.core.message.idempotent.MessageIdempotentVerifier;
import com.github.linyuzai.connection.loadbalance.core.message.idempotent.MessageIdempotentVerifierFactory;
import com.github.linyuzai.connection.loadbalance.core.message.retry.MessageRetryStrategy;
import com.github.linyuzai.connection.loadbalance.core.message.retry.MessageRetryStrategyAdapter;
import com.github.linyuzai.connection.loadbalance.core.repository.ConnectionRepository;
import com.github.linyuzai.connection.loadbalance.core.repository.ConnectionRepositoryFactory;
import com.github.linyuzai.connection.loadbalance.core.scope.Scoped;
import com.github.linyuzai.connection.loadbalance.core.scope.ScopedFactory;
import com.github.linyuzai.connection.loadbalance.core.select.AllSelector;
import com.github.linyuzai.connection.loadbalance.core.select.ConnectionSelector;
import com.github.linyuzai.connection.loadbalance.core.select.filter.FilterConnectionSelector;
import com.github.linyuzai.connection.loadbalance.core.select.filter.FilterConnectionSelectorChain;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServerManager;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServerManagerFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.MasterFixedConnectionSubscriber;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.MasterSlaveAutoSwitcher;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.MasterSlaveConnectionSubscriberFactory;
import com.github.linyuzai.connection.loadbalance.core.subscribe.masterslave.MasterSlaveSwitchableConnectionSubscriber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import lombok.NonNull;

/* loaded from: input_file:com/github/linyuzai/connection/loadbalance/core/concept/AbstractConnectionLoadBalanceConcept.class */
public abstract class AbstractConnectionLoadBalanceConcept implements ConnectionLoadBalanceConcept {
    /** Message encoders keyed by connection type; filled on demand in {@link #onEstablish(Connection)}. */
    protected final Map<String, MessageEncoder> messageEncoderMap = new ConcurrentHashMap<>();
    /** Message decoders keyed by connection type; filled on demand in {@link #onEstablish(Connection)}. */
    protected final Map<String, MessageDecoder> messageDecoderMap = new ConcurrentHashMap<>();
    /** Retry strategies keyed by connection type; filled on demand in {@link #onEstablish(Connection)}. */
    protected final Map<String, MessageRetryStrategy> messageRetryStrategyMap = new ConcurrentHashMap<>();
    /** Store that connections are added to, looked up in and removed from. */
    protected ConnectionRepository connectionRepository;
    /** Source of the local server used to fill the {@code from} attribute of outgoing messages. */
    protected ConnectionServerManager connectionServerManager;
    /** Subscriber whose {@code subscribe()} is called during {@link #initialize()}. */
    protected ConnectionSubscriber connectionSubscriber;
    /** Factories consulted in order by {@link #getConnectionFactory(Object, Map)}. */
    protected List<ConnectionFactory> connectionFactories;
    /** Selectors consulted in order by {@link #getConnectionSelector(Message)}. */
    protected List<ConnectionSelector> connectionSelectors;
    /** Factories consulted in order by {@link #getMessageFactory(Object)}. */
    protected List<MessageFactory> messageFactories;
    /** Supplies the encoder and decoder for each connection type. */
    protected MessageCodecAdapter messageCodecAdapter;
    /** Supplies the retry strategy for each connection type. */
    protected MessageRetryStrategyAdapter messageRetryStrategyAdapter;
    /** Generates the id assigned to every outgoing message. */
    protected MessageIdempotentVerifier messageIdempotentVerifier;
    /** Executor that is shut down in {@link #destroy()}. */
    protected ScheduledExecutor scheduledExecutor;
    /** Logger exposed through {@link #getLogger()}. */
    protected ConnectionLogger logger;
    /** Publisher that receives every lifecycle, connection and message event raised by this class. */
    protected ConnectionEventPublisher eventPublisher;
    /** Set by {@link #initialize()}; makes repeated initialization a no-op. */
    private boolean initialized;
    /** Set by {@link #destroy()}; makes repeated destruction a no-op. */
    private boolean destroyed;

    /**
     * Base class for fluent builders that assemble an {@link AbstractConnectionLoadBalanceConcept}.
     * <p>
     * Components are collected through the {@code add*} methods and wired into the concept by
     * {@link #build()}. Every collected component is resolved against {@link #getScope()} through
     * {@code Scoped.filter} or {@code ScopedFactory.create}.
     *
     * @param <B> the concrete builder type, returned by the {@code add*} methods for chaining
     * @param <T> the concept type produced by {@link #build()}
     */
    public static abstract class AbstractBuilder<B extends AbstractBuilder<B, T>, T extends AbstractConnectionLoadBalanceConcept> {
        protected List<ConnectionRepositoryFactory> connectionRepositoryFactories = new ArrayList<>();
        protected List<ConnectionServerManagerFactory> connectionServerManagerFactories = new ArrayList<>();
        protected List<ConnectionSubscriberFactory> connectionSubscriberFactories = new ArrayList<>();
        protected List<ConnectionFactory> connectionFactories = new ArrayList<>();
        protected List<ConnectionSelector> connectionSelectors = new ArrayList<>();
        protected List<MessageFactory> messageFactories = new ArrayList<>();
        protected List<MessageCodecAdapter> messageCodecAdapters = new ArrayList<>();
        protected List<MessageRetryStrategyAdapter> messageRetryStrategyAdapters = new ArrayList<>();
        protected List<MessageIdempotentVerifierFactory> messageIdempotentVerifierFactories = new ArrayList<>();
        protected List<ScheduledExecutorFactory> scheduledExecutorFactories = new ArrayList<>();
        protected List<ConnectionLoggerFactory> loggerFactories = new ArrayList<>();
        protected List<ConnectionEventPublisherFactory> eventPublisherFactories = new ArrayList<>();
        protected List<ConnectionEventListener> eventListeners = new ArrayList<>();

        /**
         * Returns this builder typed as {@code B}.
         * <p>
         * The cast is safe as long as subclasses follow the self-type convention
         * {@code class X extends AbstractBuilder<X, ...>}.
         */
        @SuppressWarnings("unchecked")
        private B self() {
            return (B) this;
        }

        /** Adds connection repository factories. @return this builder */
        public B addConnectionRepositoryFactories(Collection<? extends ConnectionRepositoryFactory> collection) {
            this.connectionRepositoryFactories.addAll(collection);
            return self();
        }

        /** Adds connection server manager factories. @return this builder */
        public B addConnectionServerManagerFactories(Collection<? extends ConnectionServerManagerFactory> collection) {
            this.connectionServerManagerFactories.addAll(collection);
            return self();
        }

        /** Adds connection subscriber factories. @return this builder */
        public B addConnectionSubscriberFactories(Collection<? extends ConnectionSubscriberFactory> collection) {
            this.connectionSubscriberFactories.addAll(collection);
            return self();
        }

        /** Adds connection factories. @return this builder */
        public B addConnectionFactories(Collection<? extends ConnectionFactory> collection) {
            this.connectionFactories.addAll(collection);
            return self();
        }

        /** Adds connection selectors. @return this builder */
        public B addConnectionSelectors(Collection<? extends ConnectionSelector> collection) {
            this.connectionSelectors.addAll(collection);
            return self();
        }

        /** Adds message factories. @return this builder */
        public B addMessageFactories(Collection<? extends MessageFactory> collection) {
            this.messageFactories.addAll(collection);
            return self();
        }

        /** Adds message codec adapters. @return this builder */
        public B addMessageCodecAdapters(Collection<? extends MessageCodecAdapter> collection) {
            this.messageCodecAdapters.addAll(collection);
            return self();
        }

        /** Adds message retry strategy adapters. @return this builder */
        public B addMessageRetryStrategyAdapters(Collection<? extends MessageRetryStrategyAdapter> collection) {
            this.messageRetryStrategyAdapters.addAll(collection);
            return self();
        }

        /** Adds message idempotent verifier factories. @return this builder */
        public B addMessageIdempotentVerifierFactories(Collection<? extends MessageIdempotentVerifierFactory> collection) {
            this.messageIdempotentVerifierFactories.addAll(collection);
            return self();
        }

        /** Adds scheduled executor factories. @return this builder */
        public B addScheduledExecutorFactories(Collection<ScheduledExecutorFactory> collection) {
            this.scheduledExecutorFactories.addAll(collection);
            return self();
        }

        /** Adds logger factories. @return this builder */
        public B addLoggerFactories(Collection<ConnectionLoggerFactory> collection) {
            this.loggerFactories.addAll(collection);
            return self();
        }

        /** Adds event publisher factories. @return this builder */
        public B addEventPublisherFactories(Collection<? extends ConnectionEventPublisherFactory> collection) {
            this.eventPublisherFactories.addAll(collection);
            return self();
        }

        /** Adds a single event listener. @return this builder */
        public B addEventListener(ConnectionEventListener connectionEventListener) {
            this.eventListeners.add(connectionEventListener);
            return self();
        }

        /** Adds event listeners. @return this builder */
        public B addEventListeners(Collection<ConnectionEventListener> collection) {
            this.eventListeners.addAll(collection);
            return self();
        }

        /**
         * Creates the concept via {@link #create()} and wires every collected component into it.
         * <p>
         * Before wiring, built-in defaults are registered on this builder: an {@link AllSelector},
         * an {@link ObjectMessageFactory} and a {@link BaseMessageCodecAdapter} are appended after
         * the user-supplied entries, and a {@link MessageForwardHandler} is placed at the front of
         * the listener list. Because these defaults are added to the builder's own lists, each
         * call to this method registers them again.
         *
         * @return the fully wired concept
         * @throws IllegalArgumentException if more than one master subscriber factory was added
         */
        public T build() {
            this.connectionSelectors.add((ConnectionSelector) new AllSelector().addScopes(getScope()));
            this.messageFactories.add(new ObjectMessageFactory());
            this.messageCodecAdapters.add((MessageCodecAdapter) new BaseMessageCodecAdapter().addScopes(getScope()));
            // Index 0 is valid for any list size. A fixed index of 1 throws
            // IndexOutOfBoundsException when no listener has been registered yet.
            this.eventListeners.add(0, new MessageForwardHandler());
            T create = create();
            create.setConnectionRepository(ConnectionRepository.Delegate.delegate(create, (ConnectionRepository) withScopeFactory(ConnectionRepository.class, this.connectionRepositoryFactories)));
            create.setConnectionServerManager(ConnectionServerManager.Delegate.delegate(create, (ConnectionServerManager) withScopeFactory(ConnectionServerManager.class, this.connectionServerManagerFactories)));
            create.setConnectionSubscriber(ConnectionSubscriber.Delegate.delegate(create, withConnectionSubscriberMasterSlave(withScope(this.connectionSubscriberFactories))));
            create.setConnectionFactories(ConnectionFactory.Delegate.delegate(create, withScope(this.connectionFactories)));
            create.setConnectionSelectors(ConnectionSelector.Delegate.delegate(create, withConnectionSelectorFilterChain(withScope(this.connectionSelectors))));
            create.setMessageFactories(MessageFactory.Delegate.delegate(create, withScope(this.messageFactories)));
            create.setMessageCodecAdapter(withMessageCodecAdapterChain(create, withScope(this.messageCodecAdapters)));
            create.setMessageRetryStrategyAdapter((MessageRetryStrategyAdapter) withScope(MessageRetryStrategyAdapter.class, this.messageRetryStrategyAdapters));
            create.setMessageIdempotentVerifier(MessageIdempotentVerifier.Delegate.delegate(create, (MessageIdempotentVerifier) withScopeFactory(MessageIdempotentVerifier.class, this.messageIdempotentVerifierFactories)));
            create.setScheduledExecutor(ScheduledExecutor.Delegate.delegate(create, (ScheduledExecutor) withScopeFactory(ScheduledExecutor.class, this.scheduledExecutorFactories)));
            create.setLogger(ConnectionLogger.Delegate.delegate(create, (ConnectionLogger) withScopeFactory(ConnectionLogger.class, this.loggerFactories)));
            // Listeners are registered last so that any listener added while resolving the
            // subscriber above (the master/slave auto switcher) is included.
            ConnectionEventPublisher delegate = ConnectionEventPublisher.Delegate.delegate(create, (ConnectionEventPublisher) withScopeFactory(ConnectionEventPublisher.class, this.eventPublisherFactories));
            delegate.register(withScope(this.eventListeners));
            create.setEventPublisher(delegate);
            return create;
        }

        /** Instantiates the bare concept that {@link #build()} then populates. */
        protected abstract T create();

        /** Returns the scope name used to resolve every component of this builder. */
        protected abstract String getScope();

        /** Passes {@code collection} through {@code Scoped.filter} for {@link #getScope()}. */
        protected <S extends Scoped> List<S> withScope(Collection<S> collection) {
            return Scoped.filter(getScope(), collection);
        }

        /** Resolves a single component of type {@code cls} through {@code Scoped.filter} for {@link #getScope()}. */
        protected <S extends Scoped> S withScope(Class<S> cls, Collection<S> collection) {
            return (S) Scoped.filter(getScope(), cls, collection);
        }

        /** Resolves a component of type {@code cls} through {@code ScopedFactory.create} for {@link #getScope()}. */
        protected <C, F extends ScopedFactory<C>> C withScopeFactory(Class<C> cls, Collection<F> collection) {
            return (C) ScopedFactory.create(getScope(), cls, collection);
        }

        /**
         * Resolves the subscriber, honouring master/slave factories.
         * <ul>
         * <li>No master factory: the subscriber is created from the plain (non master/slave) factories.</li>
         * <li>One master, no slave: the master is wrapped in a {@link MasterFixedConnectionSubscriber}.</li>
         * <li>One master plus slaves: both are wrapped in a {@link MasterSlaveSwitchableConnectionSubscriber}
         * and a {@link MasterSlaveAutoSwitcher} listener is registered on this builder.</li>
         * </ul>
         *
         * @param list the candidate subscriber factories
         * @return the resolved subscriber
         * @throws IllegalArgumentException if more than one master factory is present
         */
        protected ConnectionSubscriber withConnectionSubscriberMasterSlave(List<ConnectionSubscriberFactory> list) {
            List<ConnectionSubscriberFactory> plainFactories = new ArrayList<>();
            List<ConnectionSubscriberFactory> masterFactories = new ArrayList<>();
            List<ConnectionSubscriberFactory> slaveFactories = new ArrayList<>();
            for (ConnectionSubscriberFactory connectionSubscriberFactory : list) {
                if (connectionSubscriberFactory instanceof MasterSlaveConnectionSubscriberFactory) {
                    switch (((MasterSlaveConnectionSubscriberFactory) connectionSubscriberFactory).getMasterSlave()) {
                        case MASTER:
                            masterFactories.add(connectionSubscriberFactory);
                            break;
                        case SLAVE:
                            slaveFactories.add(connectionSubscriberFactory);
                            break;
                        default:
                            // Any other role is ignored, as before.
                            break;
                    }
                } else {
                    plainFactories.add(connectionSubscriberFactory);
                }
            }
            if (masterFactories.isEmpty()) {
                return (ConnectionSubscriber) withScopeFactory(ConnectionSubscriber.class, plainFactories);
            }
            if (masterFactories.size() != 1) {
                throw new IllegalArgumentException("Master can only be one");
            }
            ConnectionSubscriber master = (ConnectionSubscriber) withScopeFactory(ConnectionSubscriber.class, masterFactories);
            if (slaveFactories.isEmpty()) {
                return new MasterFixedConnectionSubscriber(master);
            }
            ConnectionSubscriber slave = (ConnectionSubscriber) withScopeFactory(ConnectionSubscriber.class, slaveFactories);
            // Slot 1 sits directly behind the handler that build() places at index 0. The index is
            // clamped so the insertion stays valid if this method runs against an empty listener list.
            int switcherIndex = Math.min(1, this.eventListeners.size());
            this.eventListeners.add(switcherIndex, (ConnectionEventListener) new MasterSlaveAutoSwitcher().addScopes(getScope()));
            return new MasterSlaveSwitchableConnectionSubscriber(master, slave);
        }

        /**
         * Wraps the adapters in a {@link MessageCodecAdapterChain}, in reverse order of {@code list}.
         * <p>
         * The reversal is done on a copy so the caller's list is left untouched.
         */
        protected MessageCodecAdapter withMessageCodecAdapterChain(ConnectionLoadBalanceConcept connectionLoadBalanceConcept, List<MessageCodecAdapter> list) {
            List<MessageCodecAdapter> reversed = new ArrayList<>(list);
            Collections.reverse(reversed);
            return (MessageCodecAdapter) new MessageCodecAdapterChain(connectionLoadBalanceConcept, reversed).addScopes(getScope());
        }

        /**
         * Splits out selectors that act as filters and appends one {@link FilterConnectionSelectorChain}
         * holding them after the remaining selectors.
         *
         * @param list the candidate selectors
         * @return the non-filter selectors in their original order, followed by the filter chain
         */
        protected List<ConnectionSelector> withConnectionSelectorFilterChain(List<ConnectionSelector> list) {
            List<ConnectionSelector> selectors = new ArrayList<>();
            List<FilterConnectionSelector> filters = new ArrayList<>();
            for (ConnectionSelector connectionSelector : list) {
                if ((connectionSelector instanceof FilterConnectionSelector) && ((FilterConnectionSelector) connectionSelector).asFilter()) {
                    filters.add((FilterConnectionSelector) connectionSelector);
                } else {
                    selectors.add(connectionSelector);
                }
            }
            selectors.add((ConnectionSelector) new FilterConnectionSelectorChain(filters).addScopes(getScope()));
            return selectors;
        }
    }

    /**
     * Initializes this concept once; later calls do nothing.
     * <p>
     * Runs {@link #onInitialize()}, asks the subscriber to subscribe and then publishes a
     * {@link ConnectionLoadBalanceConceptInitializeEvent}.
     */
    @Override
    public synchronized void initialize() {
        if (!this.initialized) {
            // The flag is raised before any work starts, so a failure below does not re-arm it.
            this.initialized = true;
            onInitialize();
            this.connectionSubscriber.subscribe();
            this.eventPublisher.publish(new ConnectionLoadBalanceConceptInitializeEvent(this));
        }
    }

    /**
     * Extension hook called by {@link #initialize()} after the initialized flag is set and before
     * the subscriber is asked to subscribe. Does nothing by default.
     */
    protected void onInitialize() {
    }

    /**
     * Destroys this concept once; later calls do nothing.
     * <p>
     * Runs {@link #onDestroy()}, shuts the scheduled executor down, closes every connection of
     * every type held by the repository with {@code Connection.Close.SERVER_STOP}, and finally
     * publishes a {@link ConnectionLoadBalanceConceptDestroyEvent}.
     */
    @Override
    public synchronized void destroy() {
        if (!this.destroyed) {
            this.destroyed = true;
            onDestroy();
            this.scheduledExecutor.shutdown();
            for (Iterator<String> types = this.connectionRepository.types().iterator(); types.hasNext(); ) {
                String type = types.next();
                for (Iterator<Connection> connections = this.connectionRepository.select(type).iterator(); connections.hasNext(); ) {
                    connections.next().close(Connection.Close.SERVER_STOP);
                }
            }
            this.eventPublisher.publish(new ConnectionLoadBalanceConceptDestroyEvent(this));
        }
    }

    /**
     * Extension hook called by {@link #destroy()} after the destroyed flag is set and before the
     * executor is shut down and connections are closed. Does nothing by default.
     */
    protected void onDestroy() {
    }

    /**
     * Creates a connection for {@code obj} using the first factory that supports it.
     *
     * @param obj the raw connection object
     * @param map metadata passed to the factory
     * @return the created connection, never {@code null}
     * @throws ConnectionLoadBalanceException if no factory supports {@code obj} or the chosen
     *                                        factory returns {@code null}
     */
    @Override
    public Connection createConnection(Object obj, Map<Object, Object> map) {
        final ConnectionFactory factory = getConnectionFactory(obj, map);
        if (factory != null) {
            final Connection created = factory.create(obj, map);
            if (created != null) {
                return created;
            }
            throw new ConnectionLoadBalanceException("Connection can not be created with " + obj);
        }
        throw new ConnectionLoadBalanceException("No ConnectionFactory available with " + obj);
    }

    /**
     * Finds the first registered factory that supports the given object and metadata.
     *
     * @param obj the raw connection object
     * @param map metadata passed to {@code support}
     * @return the matching factory, or {@code null} if none supports the input
     */
    public ConnectionFactory getConnectionFactory(Object obj, Map<Object, Object> map) {
        ConnectionFactory matched = null;
        for (ConnectionFactory candidate : this.connectionFactories) {
            if (candidate.support(obj, map)) {
                matched = candidate;
                break;
            }
        }
        return matched;
    }

    /**
     * Creates a connection from the raw object and runs the establish flow on it.
     *
     * @param obj the raw connection object
     * @param map metadata passed to the connection factory
     * @return the established connection
     */
    @Override
    public Connection onEstablish(Object obj, Map<Object, Object> map) {
        final Connection established = createConnection(obj, map);
        onEstablish(established);
        return established;
    }

    /**
     * Prepares a connection and registers it.
     * <p>
     * The connection is bound to this concept, given the encoder, decoder and retry strategy
     * cached for its type (each created on first use), initialized, added to the repository and
     * announced with a {@link ConnectionEstablishEvent}.
     *
     * @param connection the connection to establish
     */
    @Override
    public void onEstablish(Connection connection) {
        final String connectionType = connection.getType();
        connection.setConcept(this);
        // All three lookups complete before anything is assigned to the connection.
        final MessageEncoder encoder = this.messageEncoderMap.computeIfAbsent(connectionType,
                key -> this.messageCodecAdapter.getMessageEncoder(key, null));
        final MessageDecoder decoder = this.messageDecoderMap.computeIfAbsent(connectionType,
                key -> this.messageCodecAdapter.getMessageDecoder(key, null));
        final MessageRetryStrategy retryStrategy = this.messageRetryStrategyMap.computeIfAbsent(connectionType,
                key -> MessageRetryStrategy.Delegate.delegate(this, this.messageRetryStrategyAdapter.getMessageRetryStrategy(key)));
        connection.setMessageEncoder(encoder);
        connection.setMessageDecoder(decoder);
        connection.setMessageRetryStrategy(retryStrategy);
        connection.initialize();
        this.connectionRepository.add(connection);
        this.eventPublisher.publish(new ConnectionEstablishEvent(connection));
    }

    /**
     * Handles a close notification identified by id and type.
     * <p>
     * If the repository knows the connection, the close is delegated to
     * {@link #onClose(Connection, Object)}; otherwise an {@link UnknownCloseEvent} is published.
     *
     * @param obj  the connection id
     * @param str  the connection type
     * @param obj2 the close reason
     */
    @Override
    public void onClose(Object obj, String str, Object obj2) {
        final Connection known = this.connectionRepository.get(obj, str);
        if (known != null) {
            onClose(known, obj2);
            return;
        }
        this.eventPublisher.publish(new UnknownCloseEvent(obj, str, obj2, this));
    }

    /**
     * Removes the connection from the repository and, if something was removed, publishes a
     * {@link ConnectionCloseEvent} for the removed instance.
     *
     * @param connection the connection that closed
     * @param obj        the close reason
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    @Override
    public void onClose(@NonNull Connection connection, Object obj) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        final Connection removed = this.connectionRepository.remove(connection);
        if (removed == null) {
            // Nothing was registered, so there is nothing to announce.
            return;
        }
        this.eventPublisher.publish(new ConnectionCloseEvent(removed, obj));
    }

    /**
     * Handles an incoming message identified by connection id and type.
     * <p>
     * If the repository knows the connection, handling is delegated to
     * {@link #onMessage(Connection, Object)}; otherwise an {@link UnknownMessageEvent} is published.
     *
     * @param obj  the connection id
     * @param str  the connection type
     * @param obj2 the raw message
     */
    @Override
    public void onMessage(Object obj, String str, Object obj2) {
        final Connection known = this.connectionRepository.get(obj, str);
        if (known != null) {
            onMessage(known, obj2);
            return;
        }
        this.eventPublisher.publish(new UnknownMessageEvent(obj, str, obj2, this));
    }

    /**
     * Handles an incoming message on a known connection, accepting every decoded message.
     *
     * @param connection the receiving connection
     * @param obj        the raw message
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    @Override
    public void onMessage(@NonNull Connection connection, Object obj) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        onMessage(connection, obj, accepted -> true);
    }

    /**
     * Decodes an incoming message and publishes the outcome as an event.
     * <ul>
     * <li>Predicate accepts the decoded message: {@link MessageReceiveEvent}.</li>
     * <li>Predicate rejects it: {@link MessageDiscardEvent}.</li>
     * <li>Anything thrown while testing the predicate or publishing either of the two events
     * above: {@link MessageReceivePredicateErrorEvent}.</li>
     * <li>Anything thrown while decoding, or while publishing the predicate error event:
     * {@link MessageDecodeErrorEvent}.</li>
     * </ul>
     *
     * @param connection the receiving connection
     * @param obj        the raw message
     * @param predicate  decides whether the decoded message is received or discarded
     */
    @Override
    public void onMessage(Connection connection, Object obj, Predicate<Message> predicate) {
        try {
            final MessageDecoder decoder = connection.getMessageDecoder();
            final Message decoded = decoder.decode(obj, connection);
            try {
                if (!predicate.test(decoded)) {
                    this.eventPublisher.publish(new MessageDiscardEvent(connection, decoded));
                } else {
                    this.eventPublisher.publish(new MessageReceiveEvent(connection, decoded));
                }
            } catch (Throwable predicateFailure) {
                this.eventPublisher.publish(new MessageReceivePredicateErrorEvent(connection, decoded, predicateFailure));
            }
        } catch (Throwable decodeFailure) {
            this.eventPublisher.publish(new MessageDecodeErrorEvent(connection, decodeFailure));
        }
    }

    /**
     * Handles an error identified by connection id and type.
     * <p>
     * If the repository knows the connection, handling is delegated to
     * {@link #onError(Connection, Throwable)}; otherwise an {@link UnknownErrorEvent} is published.
     *
     * @param obj the connection id
     * @param str the connection type
     * @param th  the error
     */
    @Override
    public void onError(Object obj, String str, Throwable th) {
        final Connection known = this.connectionRepository.get(obj, str);
        if (known != null) {
            onError(known, th);
            return;
        }
        this.eventPublisher.publish(new UnknownErrorEvent(obj, str, th, this));
    }

    /**
     * Publishes a {@link ConnectionErrorEvent} for an error on a known connection.
     *
     * @param connection the connection the error belongs to
     * @param th         the error
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    @Override
    public void onError(@NonNull Connection connection, Throwable th) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        final ConnectionErrorEvent errorEvent = new ConnectionErrorEvent(connection, th);
        this.eventPublisher.publish(errorEvent);
    }

    /**
     * Turns an arbitrary payload into a {@link Message}.
     * <p>
     * A payload that already is a {@link Message} is returned unchanged; anything else is handed
     * to the first message factory that supports it.
     *
     * @param obj the payload
     * @return the message, never {@code null}
     * @throws ConnectionLoadBalanceException if no factory supports the payload or the chosen
     *                                        factory returns {@code null}
     */
    @Override
    public Message createMessage(Object obj) {
        if (obj instanceof Message) {
            return (Message) obj;
        }
        final MessageFactory factory = getMessageFactory(obj);
        if (factory != null) {
            final Message created = factory.create(obj);
            if (created != null) {
                return created;
            }
            throw new ConnectionLoadBalanceException("Message can not be created with " + obj);
        }
        throw new ConnectionLoadBalanceException("No MessageFactory available with " + obj);
    }

    /**
     * Creates a message for the payload and copies the given headers into it.
     *
     * @param obj the payload
     * @param map headers to add; {@code null} adds nothing
     * @return the message
     */
    @Override
    public Message createMessage(Object obj, Map<String, String> map) {
        final Message created = createMessage(obj);
        if (map == null) {
            return created;
        }
        created.getHeaders().putAll(map);
        return created;
    }

    /**
     * Sends a payload using the first registered selector that supports the created message.
     *
     * @param obj the payload
     */
    @Override
    public void send(Object obj) {
        final Message outgoing = createMessage(obj);
        final ConnectionSelector selector = getConnectionSelector(outgoing);
        doSend(outgoing, selector);
    }

    /**
     * Sends a payload with headers using the first registered selector that supports the
     * created message.
     *
     * @param obj the payload
     * @param map headers to add to the message
     */
    @Override
    public void send(Object obj, Map<String, String> map) {
        final Message outgoing = createMessage(obj, map);
        final ConnectionSelector selector = getConnectionSelector(outgoing);
        doSend(outgoing, selector);
    }

    /**
     * Sends a payload to the connections chosen by the given selector.
     *
     * @param obj                the payload
     * @param connectionSelector the selector that picks the target connections
     */
    @Override
    public void send(Object obj, ConnectionSelector connectionSelector) {
        final Message outgoing = createMessage(obj);
        doSend(outgoing, connectionSelector);
    }

    /**
     * Sends a payload with headers to the connections chosen by the given selector.
     *
     * @param obj                the payload
     * @param map                headers to add to the message
     * @param connectionSelector the selector that picks the target connections
     */
    @Override
    public void send(Object obj, Map<String, String> map, ConnectionSelector connectionSelector) {
        final Message outgoing = createMessage(obj, map);
        doSend(outgoing, connectionSelector);
    }

    /**
     * Delivers a message to every connection the selector picks.
     * <p>
     * The message is initialized via {@link #initMessage(Message)}, the selector is queried and
     * the forward flag is cleared. With no targets a {@link DeadMessageEvent} is published.
     * Otherwise a {@link MessagePrepareEvent} is published, the message is sent to each target
     * (a failing send is reported as a {@link MessageSendErrorEvent} and does not stop the loop),
     * and a {@link MessageSendEvent} is published afterwards.
     *
     * @param message            the message to send
     * @param connectionSelector the selector that picks the target connections
     * @throws IllegalArgumentException if {@code connectionSelector} is {@code null}
     */
    protected void doSend(Message message, ConnectionSelector connectionSelector) {
        if (connectionSelector == null) {
            throw new IllegalArgumentException("No connection selector adapted for message: " + message);
        }
        initMessage(message);
        final Collection<Connection> targets = connectionSelector.select(message);
        message.setForward(false);
        final boolean hasTargets = targets != null && !targets.isEmpty();
        if (!hasTargets) {
            this.eventPublisher.publish(new DeadMessageEvent(message));
        } else {
            this.eventPublisher.publish(new MessagePrepareEvent(message, targets));
            for (Connection target : targets) {
                try {
                    target.send(message);
                } catch (Throwable sendFailure) {
                    this.eventPublisher.publish(new MessageSendErrorEvent(target, message, sendFailure));
                }
            }
            this.eventPublisher.publish(new MessageSendEvent(message, targets));
        }
    }

    /**
     * Stamps an outgoing message with an id from the idempotent verifier and with a sender
     * derived from the local server via {@code ConnectionServer.url}.
     *
     * @param message the message about to be sent
     */
    protected void initMessage(Message message) {
        message.setId(
                messageIdempotentVerifier.generateMessageId(message));
        message.setFrom(
                ConnectionServer.url(connectionServerManager.getLocal()));
    }

    /**
     * Finds the first registered message factory that supports the payload.
     *
     * @param obj the payload
     * @return the matching factory, or {@code null} if none supports the payload
     */
    public MessageFactory getMessageFactory(Object obj) {
        MessageFactory matched = null;
        for (MessageFactory candidate : this.messageFactories) {
            if (candidate.support(obj)) {
                matched = candidate;
                break;
            }
        }
        return matched;
    }

    /**
     * Finds the first registered selector that supports the message.
     *
     * @param message the message to route
     * @return the matching selector, or {@code null} if none supports the message
     */
    public ConnectionSelector getConnectionSelector(Message message) {
        ConnectionSelector matched = null;
        for (ConnectionSelector candidate : this.connectionSelectors) {
            if (candidate.support(message)) {
                matched = candidate;
                break;
            }
        }
        return matched;
    }

    /** @return the live map of encoders keyed by connection type */
    public Map<String, MessageEncoder> getMessageEncoderMap() {
        return messageEncoderMap;
    }

    /** @return the live map of decoders keyed by connection type */
    public Map<String, MessageDecoder> getMessageDecoderMap() {
        return messageDecoderMap;
    }

    /** @return the live map of retry strategies keyed by connection type */
    public Map<String, MessageRetryStrategy> getMessageRetryStrategyMap() {
        return messageRetryStrategyMap;
    }

    /** @return the connection repository */
    @Override
    public ConnectionRepository getConnectionRepository() {
        return connectionRepository;
    }

    /** @return the connection server manager */
    @Override
    public ConnectionServerManager getConnectionServerManager() {
        return connectionServerManager;
    }

    /** @return the connection subscriber */
    @Override
    public ConnectionSubscriber getConnectionSubscriber() {
        return connectionSubscriber;
    }

    /** @return the registered connection factories */
    @Override
    public List<ConnectionFactory> getConnectionFactories() {
        return connectionFactories;
    }

    /** @return the registered connection selectors */
    @Override
    public List<ConnectionSelector> getConnectionSelectors() {
        return connectionSelectors;
    }

    /** @return the registered message factories */
    @Override
    public List<MessageFactory> getMessageFactories() {
        return messageFactories;
    }

    /** @return the message codec adapter */
    @Override
    public MessageCodecAdapter getMessageCodecAdapter() {
        return messageCodecAdapter;
    }

    /** @return the message retry strategy adapter */
    @Override
    public MessageRetryStrategyAdapter getMessageRetryStrategyAdapter() {
        return messageRetryStrategyAdapter;
    }

    /** @return the message idempotent verifier */
    @Override
    public MessageIdempotentVerifier getMessageIdempotentVerifier() {
        return messageIdempotentVerifier;
    }

    /** @return the scheduled executor */
    @Override
    public ScheduledExecutor getScheduledExecutor() {
        return scheduledExecutor;
    }

    /** @return the logger */
    @Override
    public ConnectionLogger getLogger() {
        return logger;
    }

    /** @return the event publisher */
    @Override
    public ConnectionEventPublisher getEventPublisher() {
        return eventPublisher;
    }

    /** @return the current value of the initialized flag */
    public boolean isInitialized() {
        return initialized;
    }

    /** @return the current value of the destroyed flag */
    public boolean isDestroyed() {
        return destroyed;
    }

    /** Replaces the connection repository. */
    public void setConnectionRepository(ConnectionRepository connectionRepository) {
        this.connectionRepository = connectionRepository;
    }

    /** Replaces the connection server manager. */
    public void setConnectionServerManager(ConnectionServerManager connectionServerManager) {
        this.connectionServerManager = connectionServerManager;
    }

    /** Replaces the connection subscriber. */
    public void setConnectionSubscriber(ConnectionSubscriber connectionSubscriber) {
        this.connectionSubscriber = connectionSubscriber;
    }

    /** Replaces the connection factory list; the given list is stored as-is, not copied. */
    public void setConnectionFactories(List<ConnectionFactory> list) {
        this.connectionFactories = list;
    }

    /** Replaces the connection selector list; the given list is stored as-is, not copied. */
    public void setConnectionSelectors(List<ConnectionSelector> list) {
        this.connectionSelectors = list;
    }

    /** Replaces the message factory list; the given list is stored as-is, not copied. */
    public void setMessageFactories(List<MessageFactory> list) {
        this.messageFactories = list;
    }

    /** Replaces the message codec adapter. */
    public void setMessageCodecAdapter(MessageCodecAdapter messageCodecAdapter) {
        this.messageCodecAdapter = messageCodecAdapter;
    }

    /** Replaces the message retry strategy adapter. */
    public void setMessageRetryStrategyAdapter(MessageRetryStrategyAdapter messageRetryStrategyAdapter) {
        this.messageRetryStrategyAdapter = messageRetryStrategyAdapter;
    }

    /** Replaces the message idempotent verifier. */
    public void setMessageIdempotentVerifier(MessageIdempotentVerifier messageIdempotentVerifier) {
        this.messageIdempotentVerifier = messageIdempotentVerifier;
    }

    /** Replaces the scheduled executor. */
    public void setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
        this.scheduledExecutor = scheduledExecutor;
    }

    /** Replaces the logger. */
    public void setLogger(ConnectionLogger connectionLogger) {
        this.logger = connectionLogger;
    }

    /** Replaces the event publisher. */
    public void setEventPublisher(ConnectionEventPublisher connectionEventPublisher) {
        this.eventPublisher = connectionEventPublisher;
    }

    /**
     * Overwrites the initialized flag directly.
     * NOTE(review): unlike initialize(), this write is not synchronized — confirm callers expect that.
     */
    public void setInitialized(boolean z) {
        this.initialized = z;
    }

    /**
     * Overwrites the destroyed flag directly.
     * NOTE(review): unlike destroy(), this write is not synchronized — confirm callers expect that.
     */
    public void setDestroyed(boolean z) {
        this.destroyed = z;
    }
}
