package io.bigio.core.member;

import io.bigio.Parameters;
import io.bigio.core.Envelope;
import io.bigio.core.GossipMessage;
import io.bigio.core.codec.EnvelopeEncoder;
import io.bigio.core.codec.GossipEncoder;
import io.bigio.util.NetworkUtil;
import io.bigio.util.RunningStatistics;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/bigio/core/member/RemoteMemberUDP.class */
public class RemoteMemberUDP extends RemoteMember {
    private static final String MAX_RETRY_COUNT_PROPERTY = "io.bigio.remote.maxRetry";
    private static final String RETRY_INTERVAL_PROPERTY = "io.bigio.remote.retryInterval";
    private static final String CONNECTION_TIMEOUT_PROPERTY = "io.bigio.remote.connectionTimeout";
    private static final String DEFAULT_MAX_RETRY_COUNT = "3";
    private static final String DEFAULT_RETRY_INTERVAL = "2000";
    private static final String DEFAULT_CONNECTION_TIMEOUT = "5000";
    private static final int CLIENT_THREAD_POOL_SIZE = 2;
    private static final int GOSSIP_WORKER_THREADS = 2;
    private static final int DATA_WORKER_THREADS = 2;
    private static final Logger LOG = LoggerFactory.getLogger(RemoteMemberUDP.class);
    private int maxRetry;
    private long retryInterval;
    private int timeout;
    private final AtomicInteger gossipRetryCount;
    private final AtomicInteger dataRetryCount;
    private final ScheduledExecutorService retryExecutor;
    private final ExecutorService serverExecutor;
    private Channel gossipChannel;
    private DatagramChannel dataChannel;
    private EventLoopGroup gossipWorkerGroup;
    private EventLoopGroup dataWorkerGroup;
    private final RunningStatistics gossipSizeStat;
    private final RunningStatistics dataSizeStat;
    private InetSocketAddress address;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/bigio/core/member/RemoteMemberUDP$DataExceptionHandler.class */
    public class DataExceptionHandler extends ChannelInboundHandlerAdapter {
        private DataExceptionHandler() {
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            RemoteMemberUDP.this.retryGossipConnection();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/bigio/core/member/RemoteMemberUDP$GossipExceptionHandler.class */
    public class GossipExceptionHandler extends ChannelInboundHandlerAdapter {
        private GossipExceptionHandler() {
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            RemoteMemberUDP.LOG.trace("Member left");
            RemoteMemberUDP.this.setStatus(MemberStatus.Left);
            RemoteMemberUDP.this.updateMember();
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            RemoteMemberUDP.this.retryGossipConnection();
        }
    }

    public RemoteMemberUDP(MemberHolder memberHolder) {
        super(memberHolder);
        this.gossipRetryCount = new AtomicInteger(0);
        this.dataRetryCount = new AtomicInteger(0);
        this.retryExecutor = Executors.newScheduledThreadPool(2);
        this.serverExecutor = Executors.newFixedThreadPool(2);
        this.gossipChannel = null;
        this.dataChannel = null;
        this.gossipWorkerGroup = null;
        this.dataWorkerGroup = null;
        this.gossipSizeStat = new RunningStatistics();
        this.dataSizeStat = new RunningStatistics();
    }

    public RemoteMemberUDP(String str, int i, int i2, MemberHolder memberHolder) {
        super(str, i, i2, memberHolder);
        this.gossipRetryCount = new AtomicInteger(0);
        this.dataRetryCount = new AtomicInteger(0);
        this.retryExecutor = Executors.newScheduledThreadPool(2);
        this.serverExecutor = Executors.newFixedThreadPool(2);
        this.gossipChannel = null;
        this.dataChannel = null;
        this.gossipWorkerGroup = null;
        this.dataWorkerGroup = null;
        this.gossipSizeStat = new RunningStatistics();
        this.dataSizeStat = new RunningStatistics();
    }

    @Override // io.bigio.core.member.AbstractMember
    public void initialize() {
        this.maxRetry = Integer.parseInt(Parameters.INSTANCE.getProperty(MAX_RETRY_COUNT_PROPERTY, DEFAULT_MAX_RETRY_COUNT));
        this.retryInterval = Long.parseLong(Parameters.INSTANCE.getProperty(RETRY_INTERVAL_PROPERTY, DEFAULT_RETRY_INTERVAL));
        this.timeout = Integer.parseInt(Parameters.INSTANCE.getProperty(CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT));
        try {
            if (NetworkUtil.getNetworkInterface() == null || !NetworkUtil.getNetworkInterface().isUp()) {
                LOG.error("Cannot start networking. Interface is down.");
                return;
            }
            this.address = new InetSocketAddress(getIp(), getDataPort());
            this.serverExecutor.submit(new Runnable() { // from class: io.bigio.core.member.RemoteMemberUDP.1
                @Override // java.lang.Runnable
                public void run() {
                    RemoteMemberUDP.this.initializeGossipClient();
                }
            });
            this.serverExecutor.submit(new Runnable() { // from class: io.bigio.core.member.RemoteMemberUDP.2
                @Override // java.lang.Runnable
                public void run() {
                    RemoteMemberUDP.this.initializeDataClient();
                }
            });
        } catch (SocketException e) {
            LOG.error("Cannot start networking.", e);
        }
    }

    @Override // io.bigio.core.member.Member
    public void send(Envelope envelope) throws IOException {
        byte[] encode = EnvelopeEncoder.encode(envelope);
        if (LOG.isTraceEnabled()) {
            this.dataSizeStat.push(encode.length);
        }
        if (this.dataChannel != null) {
            this.dataChannel.writeAndFlush(new DatagramPacket(Unpooled.wrappedBuffer(encode), this.address));
        }
    }

    @Override // io.bigio.core.member.RemoteMember
    public void gossip(GossipMessage gossipMessage) throws IOException {
        byte[] encode = GossipEncoder.encode(gossipMessage);
        if (LOG.isTraceEnabled()) {
            this.gossipSizeStat.push(encode.length);
        }
        if (this.gossipChannel != null) {
            this.gossipChannel.writeAndFlush(encode);
        }
    }

    @Override // io.bigio.core.member.AbstractMember
    public void shutdown() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Closing connections to " + getIp() + ":" + getGossipPort() + ":" + getDataPort());
        }
        if (this.gossipWorkerGroup != null) {
            this.gossipWorkerGroup.shutdownGracefully();
        }
        if (this.dataWorkerGroup != null) {
            this.dataWorkerGroup.shutdownGracefully();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Mean sent gossip message size: " + this.gossipSizeStat.mean() + " over " + this.gossipSizeStat.numSamples() + " samples");
            LOG.trace("Mean sent data message size: " + this.dataSizeStat.mean() + " over " + this.dataSizeStat.numSamples() + " samples");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateMember() {
        this.memberHolder.updateMemberStatus(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializeGossipClient() {
        LOG.trace("Initializing gossip client");
        this.gossipWorkerGroup = new NioEventLoopGroup(2);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.gossipWorkerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.timeout));
        bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: io.bigio.core.member.RemoteMemberUDP.3
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.config().setAllocator(UnpooledByteBufAllocator.DEFAULT);
                socketChannel.pipeline().addLast("encoder", new ByteArrayEncoder());
                socketChannel.pipeline().addLast("decoder", new ByteArrayDecoder());
                socketChannel.pipeline().addLast(new ChannelHandler[]{new GossipExceptionHandler()});
                if (RemoteMemberUDP.LOG.isTraceEnabled()) {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new LoggingHandler(LogLevel.TRACE)});
                }
            }

            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
                RemoteMemberUDP.LOG.error("Cannot initialize gossip client.", th);
                channelHandlerContext.close();
            }
        });
        ChannelFuture awaitUninterruptibly = bootstrap.connect(getIp(), getGossipPort()).awaitUninterruptibly();
        if (awaitUninterruptibly.isCancelled()) {
            this.gossipChannel = null;
            return;
        }
        if (!awaitUninterruptibly.isSuccess()) {
            this.gossipChannel = null;
            retryGossipConnection();
        } else {
            this.gossipChannel = awaitUninterruptibly.channel();
            setStatus(MemberStatus.Alive);
            updateMember();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializeDataClient() {
        LOG.trace("Initializing data client");
        this.dataWorkerGroup = new NioEventLoopGroup(2);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.dataWorkerGroup).channelFactory(new ChannelFactory<Channel>() { // from class: io.bigio.core.member.RemoteMemberUDP.5
            public Channel newChannel() {
                return new NioDatagramChannel(InternetProtocolFamily.IPv4);
            }

            public String toString() {
                return NioDatagramChannel.class.getSimpleName() + ".class";
            }
        }).handler(new ChannelInitializer<DatagramChannel>() { // from class: io.bigio.core.member.RemoteMemberUDP.4
            public void initChannel(DatagramChannel datagramChannel) throws Exception {
                datagramChannel.config().setAllocator(UnpooledByteBufAllocator.DEFAULT);
                datagramChannel.pipeline().addLast("encoder", new ByteArrayEncoder());
                datagramChannel.pipeline().addLast("decoder", new ByteArrayDecoder());
                datagramChannel.pipeline().addLast(new ChannelHandler[]{new DataExceptionHandler()});
                if (RemoteMemberUDP.LOG.isTraceEnabled()) {
                    datagramChannel.pipeline().addLast(new ChannelHandler[]{new LoggingHandler(LogLevel.TRACE)});
                }
            }

            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
                RemoteMemberUDP.LOG.error("Cannot initialize data client.", th);
                channelHandlerContext.close();
            }
        });
        ChannelFuture awaitUninterruptibly = bootstrap.connect(getIp(), getDataPort()).awaitUninterruptibly();
        if (awaitUninterruptibly.isCancelled()) {
            this.dataChannel = null;
            return;
        }
        if (!awaitUninterruptibly.isSuccess()) {
            this.dataChannel = null;
            retryDataConnection();
            return;
        }
        this.dataChannel = awaitUninterruptibly.channel();
        try {
            this.dataChannel.closeFuture().sync();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted waiting for client to shutdown.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void retryGossipConnection() {
        if (this.gossipRetryCount.getAndIncrement() < this.maxRetry) {
            this.retryExecutor.schedule(new Runnable() { // from class: io.bigio.core.member.RemoteMemberUDP.6
                @Override // java.lang.Runnable
                public void run() {
                    RemoteMemberUDP.this.initializeGossipClient();
                }
            }, this.retryInterval, TimeUnit.MILLISECONDS);
        } else {
            LOG.warn("Could not connect to gossip server after max retries.");
        }
    }

    private void retryDataConnection() {
        if (this.dataRetryCount.getAndIncrement() < this.maxRetry) {
            this.retryExecutor.schedule(new Runnable() { // from class: io.bigio.core.member.RemoteMemberUDP.7
                @Override // java.lang.Runnable
                public void run() {
                    RemoteMemberUDP.this.initializeDataClient();
                }
            }, this.retryInterval, TimeUnit.MILLISECONDS);
        } else {
            LOG.warn("Could not connect to data server after max retries.");
        }
    }
}
