package org.joyqueue.network.transport.support;

import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.joyqueue.network.event.TransportEvent;
import org.joyqueue.network.transport.ChannelTransport;
import org.joyqueue.network.transport.TransportAttribute;
import org.joyqueue.network.transport.TransportClient;
import org.joyqueue.network.transport.TransportState;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.config.TransportConfig;
import org.joyqueue.network.transport.exception.TransportException;
import org.joyqueue.shaded.io.netty.channel.Channel;
import org.joyqueue.toolkit.concurrent.EventBus;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/network/transport/support/FailoverGroupChannelTransport.class */
/**
 * A {@link ChannelTransport} backed by a list of candidate addresses.
 *
 * <p>Every request-style call is routed through {@link #execute(Function)}, which tries the
 * addresses one after another, starting at the index that was last used successfully, and moves
 * on to the next address whenever a {@link TransportException} is thrown.
 *
 * <p>The per-address transports are created on demand by
 * {@link #getOrCreateTransport(SocketAddress)} and cached. After a creation attempt for an address,
 * no new attempt is made for that address until {@code config.getRetryDelay()} has elapsed
 * (measured with {@link SystemClock#now()}); calls arriving earlier fail immediately with a
 * {@link TransportException.ConnectionException}.
 */
public class FailoverGroupChannelTransport implements ChannelTransport {
    protected static final Logger logger = LoggerFactory.getLogger(FailoverGroupChannelTransport.class);

    private final List<SocketAddress> addresses;
    private final long connectionTimeout;
    private final TransportClient transportClient;
    private final TransportConfig config;
    private final EventBus<TransportEvent> transportEventBus;

    /** Index into {@link #addresses} at which {@link #execute(Function)} starts its next pass. */
    private volatile int roundrobinIndex = 0;

    /** Cached per-address state; also used as the monitor that serializes transport creation. */
    private final ConcurrentMap<SocketAddress, ChannelTransportEntry> transports = new ConcurrentHashMap<>();

    /**
     * Cache record for one address: the transport (or {@code null} if none has been created
     * successfully yet) and the timestamp of the most recent creation attempt.
     */
    public class ChannelTransportEntry {
        // volatile: written under the creation lock but read without it on the fast path,
        // in state() and in stop().
        private volatile ChannelTransport transport;
        private volatile long lastConnect;

        ChannelTransportEntry() {
        }

        ChannelTransportEntry(long j) {
            this.lastConnect = j;
        }

        ChannelTransportEntry(ChannelTransport channelTransport, long j) {
            this.transport = channelTransport;
            this.lastConnect = j;
        }

        /** @return the cached transport, or {@code null} if none has been set */
        public ChannelTransport getTransport() {
            return this.transport;
        }

        public void setTransport(ChannelTransport channelTransport) {
            this.transport = channelTransport;
        }

        /** @return the timestamp recorded by the last creation attempt */
        public long getLastConnect() {
            return this.lastConnect;
        }

        public void setLastConnect(long j) {
            this.lastConnect = j;
        }
    }

    /**
     * Stores the collaborators and then calls {@link #init()}, which tries to create a transport
     * for the first reachable address.
     *
     * @param list              candidate addresses, tried in list order
     * @param j                 connection timeout handed to the transport client and to each
     *                          {@code FailoverChannelTransport}
     * @param transportClient   client used to create the underlying transports
     * @param transportConfig   configuration; {@code getRetryDelay()} throttles creation retries
     * @param eventBus          event bus handed to each {@code FailoverChannelTransport}
     * @throws TransportException.ConnectionException if no transport could be created for any address
     */
    public FailoverGroupChannelTransport(List<SocketAddress> list, long j, TransportClient transportClient, TransportConfig transportConfig, EventBus<TransportEvent> eventBus) {
        this.addresses = list;
        this.connectionTimeout = j;
        this.transportClient = transportClient;
        this.config = transportConfig;
        this.transportEventBus = eventBus;
        init();
    }

    /** Same as {@link #sync(Command, long)} with a timeout argument of {@code 0}. */
    @Override // org.joyqueue.network.transport.Transport
    public Command sync(Command command) throws TransportException {
        return sync(command, 0L);
    }

    /** Same as {@link #async(Command, long, CommandCallback)} with a timeout argument of {@code 0}. */
    @Override // org.joyqueue.network.transport.Transport
    public void async(Command command, CommandCallback commandCallback) throws TransportException {
        async(command, 0L, commandCallback);
    }

    /** Same as {@link #async(Command, long)} with a timeout argument of {@code 0}. */
    @Override // org.joyqueue.network.transport.Transport
    public CompletableFuture<?> async(Command command) throws TransportException {
        return async(command, 0L);
    }

    /** Same as {@link #oneway(Command, long)} with a timeout argument of {@code 0}. */
    @Override // org.joyqueue.network.transport.Transport
    public void oneway(Command command) throws TransportException {
        oneway(command, 0L);
    }

    /**
     * Delegates to {@link #acknowledge(Command, Command, CommandCallback)} with a {@code null}
     * callback, which is not supported by this transport.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.joyqueue.network.transport.Transport
    public void acknowledge(Command command, Command command2) throws TransportException {
        acknowledge(command, command2, null);
    }

    /** Sends {@code command} synchronously via {@link #execute(Function)}. */
    @Override // org.joyqueue.network.transport.Transport
    public Command sync(Command command, long j) throws TransportException {
        return execute(transport -> transport.sync(command, j));
    }

    /** Sends {@code command} asynchronously with a callback via {@link #execute(Function)}. */
    @Override // org.joyqueue.network.transport.Transport
    public void async(Command command, long j, CommandCallback commandCallback) throws TransportException {
        execute(transport -> {
            transport.async(command, j, commandCallback);
            return null;
        });
    }

    /** Sends {@code command} asynchronously via {@link #execute(Function)} and returns the delegate's future. */
    @Override // org.joyqueue.network.transport.Transport
    public CompletableFuture<?> async(Command command, long j) throws TransportException {
        return execute(transport -> transport.async(command, j));
    }

    /** Sends {@code command} one-way via {@link #execute(Function)}. */
    @Override // org.joyqueue.network.transport.Transport
    public void oneway(Command command, long j) throws TransportException {
        execute(transport -> {
            transport.oneway(command, j);
            return null;
        });
    }

    /**
     * Not supported by this transport.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.joyqueue.network.transport.Transport
    public void acknowledge(Command command, Command command2, CommandCallback commandCallback) throws TransportException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the remote address reported by whichever transport {@link #execute(Function)}
     * selects; like every {@code execute} call this may create a transport on demand.
     */
    @Override // org.joyqueue.network.transport.Transport
    public SocketAddress remoteAddress() {
        return execute(ChannelTransport::remoteAddress);
    }

    /** Returns the attribute of whichever transport {@link #execute(Function)} selects. */
    @Override // org.joyqueue.network.transport.Transport
    public TransportAttribute attr() {
        return execute(ChannelTransport::attr);
    }

    /** Sets the attribute on whichever transport {@link #execute(Function)} selects. */
    @Override // org.joyqueue.network.transport.Transport
    public void attr(TransportAttribute transportAttribute) {
        execute(transport -> {
            transport.attr(transportAttribute);
            return null;
        });
    }

    /**
     * @return {@link TransportState#CONNECTED} if at least one cached transport reports that
     *         state, otherwise {@link TransportState#DISCONNECTED}
     */
    @Override // org.joyqueue.network.transport.Transport
    public TransportState state() {
        for (ChannelTransportEntry entry : this.transports.values()) {
            ChannelTransport transport = entry.getTransport();
            // Constant-first comparison so a delegate returning a null state cannot cause an NPE.
            if (transport != null && TransportState.CONNECTED.equals(transport.state())) {
                return TransportState.CONNECTED;
            }
        }
        return TransportState.DISCONNECTED;
    }

    /** Stops every cached transport. The cache entries themselves are left in place. */
    @Override // org.joyqueue.network.transport.Transport
    public void stop() {
        for (ChannelTransportEntry entry : this.transports.values()) {
            ChannelTransport transport = entry.getTransport();
            if (transport != null) {
                transport.stop();
            }
        }
    }

    /** Returns the channel of whichever transport {@link #execute(Function)} selects. */
    @Override // org.joyqueue.network.transport.ChannelTransport
    public Channel getChannel() {
        return execute(ChannelTransport::getChannel);
    }

    /**
     * Walks the addresses in order and stops at the first one for which a transport can be
     * created. Each failure is logged and advances {@link #roundrobinIndex}, so after a
     * successful return the index points at the address that worked.
     *
     * @throws TransportException.ConnectionException if creation failed for every address
     */
    protected void init() {
        for (SocketAddress socketAddress : this.addresses) {
            try {
                getOrCreateTransport(socketAddress);
                return;
            } catch (TransportException e) {
                logger.warn("create transport exception, address: {}", socketAddress, e);
                // NOTE(review): non-atomic update of a volatile; harmless while init() is only
                // invoked from the constructor, as it is in this class.
                this.roundrobinIndex++;
            }
        }
        throw new TransportException.ConnectionException();
    }

    /**
     * Applies {@code function} to the transport of one address, failing over to the next address
     * on {@link TransportException}. At most one attempt is made per address; the scan starts at
     * {@link #roundrobinIndex} and wraps around. On success the index of the address that served
     * the call is remembered as the next starting point.
     *
     * @param function the operation to run against a transport
     * @param <T>      result type of the operation
     * @return whatever {@code function} returns
     * @throws TransportException.RequestErrorException if every address failed (or the address
     *         list is empty); the individual failures are attached as suppressed exceptions
     */
    protected <T> T execute(Function<ChannelTransport, T> function) throws TransportException {
        final int size = this.addresses.size();
        int index = this.roundrobinIndex;
        TransportException.RequestErrorException exhausted = null;
        for (int attempt = 0; attempt < size; attempt++) {
            if (index >= size) {
                index = 0;
            }
            SocketAddress socketAddress = this.addresses.get(index);
            try {
                T result = function.apply(getOrCreateTransport(socketAddress));
                this.roundrobinIndex = index;
                return result;
            } catch (TransportException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("execute exception, address: {}", socketAddress, e);
                }
                // Keep the per-address failure so the final exception is diagnosable.
                if (exhausted == null) {
                    exhausted = new TransportException.RequestErrorException();
                }
                exhausted.addSuppressed(e);
                index++;
            }
        }
        throw exhausted != null ? exhausted : new TransportException.RequestErrorException();
    }

    /**
     * Returns the cached transport for {@code socketAddress}, creating it if necessary.
     *
     * <p>The cache is first consulted without locking; creation itself happens while holding the
     * monitor of {@link #transports}, after re-checking the cache. Whether creation succeeds or
     * fails, the attempt time is recorded and the entry is stored, so that a failed address is
     * not retried before {@code config.getRetryDelay()} has elapsed.
     *
     * @param socketAddress the address to obtain a transport for
     * @return the cached or newly created transport
     * @throws TransportException.ConnectionException if the previous attempt for this address was
     *         less than the retry delay ago and produced no transport
     * @throws TransportException if creating the transport fails
     */
    protected ChannelTransport getOrCreateTransport(SocketAddress socketAddress) throws TransportException {
        ChannelTransportEntry known = this.transports.get(socketAddress);
        final long now = SystemClock.now();

        // Lock-free fast path.
        ChannelTransport cached = cachedOrThrottled(known, socketAddress, now);
        if (cached != null) {
            return cached;
        }

        synchronized (this.transports) {
            // Re-check under the lock: another thread may have created (or just failed) meanwhile.
            ChannelTransportEntry entry = this.transports.get(socketAddress);
            cached = cachedOrThrottled(entry, socketAddress, now);
            if (cached != null) {
                return cached;
            }
            if (entry == null) {
                entry = new ChannelTransportEntry();
            }
            try {
                ChannelTransport created = createTransport(socketAddress);
                entry.setTransport(created);
                if (logger.isDebugEnabled()) {
                    logger.debug("create transport, address: {}", socketAddress);
                }
                return created;
            } catch (TransportException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("create transport exception, address: {}", socketAddress, e);
                }
                throw e;
            } finally {
                // Record the attempt on both success and failure.
                entry.setLastConnect(now);
                this.transports.put(socketAddress, entry);
            }
        }
    }

    /**
     * Evaluates a cache entry: returns its transport if present, returns {@code null} if a new
     * creation attempt is allowed, and throws if the last attempt is still within the retry delay.
     */
    private ChannelTransport cachedOrThrottled(ChannelTransportEntry entry, SocketAddress socketAddress, long now) {
        if (entry == null) {
            return null;
        }
        ChannelTransport cached = entry.getTransport();
        if (cached != null) {
            return cached;
        }
        if (now - entry.getLastConnect() < this.config.getRetryDelay()) {
            throw new TransportException.ConnectionException(socketAddress.toString());
        }
        return null;
    }

    /**
     * Creates a transport to {@code socketAddress} through the transport client and wraps it in a
     * {@code FailoverChannelTransport}.
     *
     * @throws TransportException if the client cannot create the transport
     */
    protected ChannelTransport createTransport(SocketAddress socketAddress) throws TransportException {
        ChannelTransport delegate = (ChannelTransport) this.transportClient.createTransport(socketAddress, this.connectionTimeout);
        return new FailoverChannelTransport(delegate, socketAddress, this.connectionTimeout, this.transportClient, this.config, this.transportEventBus);
    }
}
