package com.github.linyuzai.connection.loadbalance.core.subscribe;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.message.idempotent.MessageIdempotentVerifier;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/* loaded from: input_file:com/github/linyuzai/connection/loadbalance/core/subscribe/AbstractConnectionSubscriber.class */
public abstract class AbstractConnectionSubscriber implements ConnectionSubscriber {
    public static final String DELIMITER = "_";
    public static final String PREFIX = "LBConnection";

    @Override // com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber
    public void subscribe(Consumer<Connection> consumer, Consumer<Throwable> consumer2, Runnable runnable, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        if (connectionLoadBalanceConcept.getConnectionServerManager().getLocal() == null) {
            runnable.run();
            return;
        }
        ConnectionServer subscribeServer = getSubscribeServer();
        try {
            try {
                String topic = getTopic(connectionLoadBalanceConcept);
                String id = getId(topic, getFrom(connectionLoadBalanceConcept), subscribeServer);
                Connection connection = connectionLoadBalanceConcept.getConnectionRepository().get(id, Connection.Type.SUBSCRIBER);
                if (connection != null) {
                    if (connection.isAlive()) {
                        runnable.run();
                        return;
                    }
                    connection.close(Connection.Close.NOT_ALIVE);
                }
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                Connection createSubscriber = createSubscriber(id, topic, linkedHashMap, connectionLoadBalanceConcept);
                if (createSubscriber != null) {
                    createSubscriber.getMetadata().put(ConnectionServer.class, subscribeServer);
                    consumer.accept(createSubscriber);
                }
                Connection connection2 = connectionLoadBalanceConcept.getConnectionRepository().get(id, Connection.Type.OBSERVABLE);
                if (connection2 != null) {
                    if (connection2.isAlive()) {
                        runnable.run();
                        return;
                    }
                    connection2.close(Connection.Close.NOT_ALIVE);
                }
                Connection createObservable = createObservable(id, topic, linkedHashMap, connectionLoadBalanceConcept);
                if (createObservable != null) {
                    consumer.accept(createObservable);
                }
                runnable.run();
            } catch (Throwable th) {
                consumer2.accept(new ConnectionServerSubscribeException(subscribeServer, th.getMessage(), th));
                runnable.run();
            }
        } catch (Throwable th2) {
            runnable.run();
            throw th2;
        }
    }

    protected String getId(String str, String str2, ConnectionServer connectionServer) {
        return str + DELIMITER + str2 + DELIMITER + connectionServer.getServiceId();
    }

    protected String getFrom(ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        return ConnectionServer.url(connectionLoadBalanceConcept.getConnectionServerManager().getLocal());
    }

    protected String getTopic(ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        return "LBConnection_" + connectionLoadBalanceConcept.getId() + DELIMITER + connectionLoadBalanceConcept.getConnectionServerManager().getLocal().getServiceId();
    }

    protected void onMessageReceived(Connection connection, Object obj) {
        connection.getConcept().onMessage(connection, obj, message -> {
            ConnectionLoadBalanceConcept concept = connection.getConcept();
            return !Objects.equals(getFrom(concept), message.getFrom()) && getMessageIdempotentVerifier(concept).verify(message);
        });
    }

    protected MessageIdempotentVerifier getMessageIdempotentVerifier(ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        return connectionLoadBalanceConcept.getMessageIdempotentVerifier();
    }

    protected abstract Connection createSubscriber(String str, String str2, Map<Object, Object> map, ConnectionLoadBalanceConcept connectionLoadBalanceConcept);

    protected abstract Connection createObservable(String str, String str2, Map<Object, Object> map, ConnectionLoadBalanceConcept connectionLoadBalanceConcept);

    protected abstract ConnectionServer getSubscribeServer();
}
