package com.github.linyuzai.connection.loadbalance.core.subscribe;

import com.github.linyuzai.connection.loadbalance.core.concept.Connection;
import com.github.linyuzai.connection.loadbalance.core.concept.ConnectionLoadBalanceConcept;
import com.github.linyuzai.connection.loadbalance.core.server.ConnectionServer;
import java.net.URI;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/* loaded from: input_file:com/github/linyuzai/connection/loadbalance/core/subscribe/ServerInstanceConnectionSubscriber.class */
public abstract class ServerInstanceConnectionSubscriber<T extends Connection> implements ConnectionSubscriber {
    @Override // com.github.linyuzai.connection.loadbalance.core.subscribe.ConnectionSubscriber
    public void subscribe(Consumer<Connection> consumer, Consumer<Throwable> consumer2, Runnable runnable, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        subscribe(0, connectionLoadBalanceConcept.getConnectionServerManager().getConnectionServers(), consumer, consumer2, runnable, connectionLoadBalanceConcept);
    }

    protected void subscribe(int i, List<ConnectionServer> list, Consumer<Connection> consumer, Consumer<Throwable> consumer2, Runnable runnable, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        if (i >= list.size()) {
            runnable.run();
        } else {
            subscribe(consumer, consumer2, () -> {
                subscribe(i + 1, list, consumer, consumer2, runnable, connectionLoadBalanceConcept);
            }, list.get(i), connectionLoadBalanceConcept);
        }
    }

    public void subscribe(ConnectionServer connectionServer, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        subscribe(ConnectionSubscriber.onSubscribeSuccess(connectionLoadBalanceConcept), ConnectionSubscriber.onSubscribeError(connectionLoadBalanceConcept), () -> {
        }, connectionServer, connectionLoadBalanceConcept);
    }

    public void subscribe(Consumer<Connection> consumer, Consumer<Throwable> consumer2, Runnable runnable, ConnectionServer connectionServer, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        Connection subscriberConnection = getSubscriberConnection(connectionServer, connectionLoadBalanceConcept);
        if (subscriberConnection != null) {
            if (subscriberConnection.isAlive()) {
                runnable.run();
                return;
            }
            subscriberConnection.close(Connection.Close.NOT_ALIVE);
        }
        if (interceptSubscribe(connectionServer, connectionLoadBalanceConcept)) {
            runnable.run();
        } else {
            doSubscribe(connectionServer, connectionLoadBalanceConcept, connection -> {
                consumer.accept(connection);
                ConnectionServer local = connectionLoadBalanceConcept.getConnectionServerManager().getLocal();
                if (local != null) {
                    connection.send(connectionLoadBalanceConcept.createMessage(local));
                }
            }, th -> {
                consumer2.accept(new ConnectionServerSubscribeException(connectionServer, th.getMessage(), th));
            }, runnable);
        }
    }

    public boolean interceptSubscribe(ConnectionServer connectionServer, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        return false;
    }

    public Connection getSubscriberConnection(ConnectionServer connectionServer, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        if (connectionServer == null) {
            return null;
        }
        for (Connection connection : connectionLoadBalanceConcept.getConnectionRepository().select(Connection.Type.SUBSCRIBER)) {
            if (connectionLoadBalanceConcept.getConnectionServerManager().isEqual(connectionServer, (ConnectionServer) connection.getMetadata().get(ConnectionServer.class))) {
                return connection;
            }
        }
        return null;
    }

    public void doSubscribe(ConnectionServer connectionServer, ConnectionLoadBalanceConcept connectionLoadBalanceConcept, Consumer<T> consumer, Consumer<Throwable> consumer2, Runnable runnable) {
        doSubscribe(getUri(connectionServer, connectionLoadBalanceConcept), connectionLoadBalanceConcept, connection -> {
            connection.getMetadata().put(ConnectionServer.class, connectionServer);
            consumer.accept(connection);
        }, consumer2, runnable);
    }

    public abstract void doSubscribe(URI uri, ConnectionLoadBalanceConcept connectionLoadBalanceConcept, Consumer<T> consumer, Consumer<Throwable> consumer2, Runnable runnable);

    public URI getUri(ConnectionServer connectionServer, ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        Map<String, String> params = getParams(connectionLoadBalanceConcept);
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            arrayList.add(URLEncoder.encode(entry.getKey(), "UTF-8") + "=" + URLEncoder.encode(entry.getValue(), "UTF-8"));
        }
        return URI.create(getUri(connectionServer).toString() + (arrayList.isEmpty() ? "" : "?" + String.join("&", arrayList)));
    }

    public URI getUri(ConnectionServer connectionServer) {
        return URI.create(getProtocol() + "://" + getHost(connectionServer) + ":" + getPort(connectionServer) + getEndpoint());
    }

    public Map<String, String> getParams(ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        return Collections.emptyMap();
    }

    public String getHost(ConnectionServer connectionServer) {
        String str = connectionServer.getMetadata().get(getHostKey());
        return (str == null || str.isEmpty()) ? connectionServer.getHost() : str;
    }

    public int getPort(ConnectionServer connectionServer) {
        String str = connectionServer.getMetadata().get(getPortKey());
        return (str == null || str.isEmpty()) ? connectionServer.getPort() : Integer.parseInt(str);
    }

    public String getHostKey() {
        return "concept-connection-host";
    }

    public String getPortKey() {
        return "concept-connection-port";
    }

    public abstract String getProtocol();

    public abstract String getEndpoint();
}
