package com.github.linyuzai.connection.loadbalance.core.concept;

import com.github.linyuzai.connection.loadbalance.core.event.ConnectionCloseErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.message.Message;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendErrorEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendInterceptor;
import com.github.linyuzai.connection.loadbalance.core.message.MessageSendSuccessEvent;
import com.github.linyuzai.connection.loadbalance.core.message.MessageTransportException;
import com.github.linyuzai.connection.loadbalance.core.message.PingMessage;
import com.github.linyuzai.connection.loadbalance.core.message.PongMessage;
import com.github.linyuzai.connection.loadbalance.core.message.decode.MessageDecoder;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncodeException;
import com.github.linyuzai.connection.loadbalance.core.message.encode.MessageEncoder;
import com.github.linyuzai.connection.loadbalance.core.message.retry.MessageRetryStrategy;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import lombok.NonNull;

/* loaded from: input_file:com/github/linyuzai/connection/loadbalance/core/concept/AbstractConnection.class */
public abstract class AbstractConnection implements Connection {

    @NonNull
    protected String type;

    @NonNull
    protected MessageEncoder messageEncoder;

    @NonNull
    protected MessageDecoder messageDecoder;

    @NonNull
    protected MessageRetryStrategy messageRetryStrategy;

    @NonNull
    protected ConnectionLoadBalanceConcept concept;
    protected volatile boolean closed;

    @NonNull
    protected Map<Object, Object> metadata = new ConcurrentHashMap();

    @NonNull
    protected List<MessageSendInterceptor> messageSendInterceptors = new CopyOnWriteArrayList();
    protected volatile boolean alive = true;
    protected volatile long lastHeartbeat = System.currentTimeMillis();

    public void addMetadata(Map<?, ?> map) {
        if (map == null) {
            return;
        }
        this.metadata.putAll(map);
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void send(@NonNull Message message) {
        if (message == null) {
            throw new NullPointerException("message is marked non-null but is null");
        }
        send(message, () -> {
            this.concept.getEventPublisher().publish(new MessageSendSuccessEvent(this, message));
        }, th -> {
            this.concept.getEventPublisher().publish(new MessageSendErrorEvent(this, message, th));
        }, () -> {
        });
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void send(@NonNull Message message, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        if (message == null) {
            throw new NullPointerException("message is marked non-null but is null");
        }
        if (isClosed()) {
            consumer.accept(new IllegalStateException("Connection is closed"));
            runnable2.run();
            return;
        }
        if (message instanceof PingMessage) {
            ping((PingMessage) message, runnable, consumer, runnable2);
            return;
        }
        if (message instanceof PongMessage) {
            pong((PongMessage) message, runnable, consumer, runnable2);
            return;
        }
        if (!this.messageSendInterceptors.isEmpty()) {
            try {
                Iterator<MessageSendInterceptor> it = this.messageSendInterceptors.iterator();
                while (it.hasNext()) {
                    if (it.next().intercept(message, this)) {
                        return;
                    }
                }
            } catch (Throwable th) {
                consumer.accept(th);
                runnable2.run();
                return;
            }
        }
        try {
            Object encode = getMessageEncoder().encode(message);
            Consumer consumer2 = consumer3 -> {
                doSend(encode, runnable, consumer3, runnable2);
            };
            consumer2.accept(th2 -> {
                if (th2 instanceof MessageTransportException) {
                    this.messageRetryStrategy.retry(th2, consumer2, consumer);
                } else {
                    consumer.accept(th2);
                }
            });
        } catch (Throwable th3) {
            consumer.accept(new MessageEncodeException(message, th3));
            runnable2.run();
        }
    }

    public void ping(PingMessage pingMessage) {
        ping(pingMessage, onMessageSuccess(pingMessage), onMessageError(pingMessage), onMessageComplete());
    }

    public void ping(PingMessage pingMessage, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        doPing(pingMessage, runnable, consumer, runnable2);
    }

    public void pong(PongMessage pongMessage) {
        pong(pongMessage, onMessageSuccess(pongMessage), onMessageError(pongMessage), onMessageComplete());
    }

    public void pong(PongMessage pongMessage, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        doPong(pongMessage, runnable, consumer, runnable2);
    }

    protected Runnable onMessageSuccess(Message message) {
        return () -> {
            this.concept.getEventPublisher().publish(new MessageSendSuccessEvent(this, message));
        };
    }

    protected Consumer<Throwable> onMessageError(Message message) {
        return th -> {
            this.concept.getEventPublisher().publish(new MessageSendErrorEvent(this, message, th));
        };
    }

    protected Runnable onMessageComplete() {
        return () -> {
        };
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void close() {
        close(null);
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void close(Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        close(null, runnable, consumer, runnable2);
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void close(Object obj) {
        close(obj, () -> {
        }, th -> {
            this.concept.getEventPublisher().publish(new ConnectionCloseErrorEvent(this, obj, th));
        }, () -> {
        });
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void close(Object obj, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2) {
        if (this.closed) {
            return;
        }
        this.closed = true;
        doClose(obj, runnable, consumer, runnable2);
        this.concept.onClose(this, obj);
    }

    public abstract void doSend(Object obj, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2);

    public abstract void doPing(PingMessage pingMessage, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2);

    public abstract void doPong(PongMessage pongMessage, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2);

    public abstract void doClose(Object obj, Runnable runnable, Consumer<Throwable> consumer, Runnable runnable2);

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public String getType() {
        return this.type;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public Map<Object, Object> getMetadata() {
        return this.metadata;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public MessageEncoder getMessageEncoder() {
        return this.messageEncoder;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public MessageDecoder getMessageDecoder() {
        return this.messageDecoder;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public MessageRetryStrategy getMessageRetryStrategy() {
        return this.messageRetryStrategy;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public List<MessageSendInterceptor> getMessageSendInterceptors() {
        return this.messageSendInterceptors;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    @NonNull
    public ConnectionLoadBalanceConcept getConcept() {
        return this.concept;
    }

    public boolean isClosed() {
        return this.closed;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public boolean isAlive() {
        return this.alive;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public long getLastHeartbeat() {
        return this.lastHeartbeat;
    }

    public void setType(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("type is marked non-null but is null");
        }
        this.type = str;
    }

    public void setMetadata(@NonNull Map<Object, Object> map) {
        if (map == null) {
            throw new NullPointerException("metadata is marked non-null but is null");
        }
        this.metadata = map;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void setMessageEncoder(@NonNull MessageEncoder messageEncoder) {
        if (messageEncoder == null) {
            throw new NullPointerException("messageEncoder is marked non-null but is null");
        }
        this.messageEncoder = messageEncoder;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void setMessageDecoder(@NonNull MessageDecoder messageDecoder) {
        if (messageDecoder == null) {
            throw new NullPointerException("messageDecoder is marked non-null but is null");
        }
        this.messageDecoder = messageDecoder;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void setMessageRetryStrategy(@NonNull MessageRetryStrategy messageRetryStrategy) {
        if (messageRetryStrategy == null) {
            throw new NullPointerException("messageRetryStrategy is marked non-null but is null");
        }
        this.messageRetryStrategy = messageRetryStrategy;
    }

    public void setMessageSendInterceptors(@NonNull List<MessageSendInterceptor> list) {
        if (list == null) {
            throw new NullPointerException("messageSendInterceptors is marked non-null but is null");
        }
        this.messageSendInterceptors = list;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void setConcept(@NonNull ConnectionLoadBalanceConcept connectionLoadBalanceConcept) {
        if (connectionLoadBalanceConcept == null) {
            throw new NullPointerException("concept is marked non-null but is null");
        }
        this.concept = connectionLoadBalanceConcept;
    }

    public void setClosed(boolean z) {
        this.closed = z;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void setAlive(boolean z) {
        this.alive = z;
    }

    @Override // com.github.linyuzai.connection.loadbalance.core.concept.Connection
    public void setLastHeartbeat(long j) {
        this.lastHeartbeat = j;
    }
}
