package org.apache.dolphinscheduler.remote;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/remote/NettyRemotingClient.class */
public class NettyRemotingClient {
    private final EventLoopGroup workerGroup;
    private final NettyClientConfig clientConfig;
    private final ExecutorService callbackExecutor;
    private final NettyClientHandler clientHandler;
    private final ScheduledExecutorService responseFutureExecutor;
    private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
    private final Bootstrap bootstrap = new Bootstrap();
    private final NettyEncoder encoder = new NettyEncoder();
    private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final Semaphore asyncSemaphore = new Semaphore(200, true);

    public NettyRemotingClient(NettyClientConfig nettyClientConfig) {
        this.clientConfig = nettyClientConfig;
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(nettyClientConfig.getWorkerThreads(), new ThreadFactory() { // from class: org.apache.dolphinscheduler.remote.NettyRemotingClient.1
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, String.format("NettyClient_%d", Integer.valueOf(this.threadIndex.incrementAndGet())));
                }
            });
        } else {
            this.workerGroup = new NioEventLoopGroup(nettyClientConfig.getWorkerThreads(), new ThreadFactory() { // from class: org.apache.dolphinscheduler.remote.NettyRemotingClient.2
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, String.format("NettyClient_%d", Integer.valueOf(this.threadIndex.incrementAndGet())));
                }
            });
        }
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, this.callbackExecutor);
        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
        start();
    }

    private void start() {
        this.bootstrap.group(this.workerGroup).channel(NettyUtils.getSocketChannelClass()).option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.clientConfig.isSoKeepalive())).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.clientConfig.isTcpNoDelay())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(this.clientConfig.getSendBufferSize())).option(ChannelOption.SO_RCVBUF, Integer.valueOf(this.clientConfig.getReceiveBufferSize())).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.clientConfig.getConnectTimeoutMillis())).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.dolphinscheduler.remote.NettyRemotingClient.3
            public void initChannel(SocketChannel socketChannel) {
                socketChannel.pipeline().addLast("client-idle-handler", new IdleStateHandler(6000L, 0L, 0L, TimeUnit.MILLISECONDS)).addLast(new ChannelHandler[]{new NettyDecoder(), NettyRemotingClient.this.clientHandler, NettyRemotingClient.this.encoder});
            }
        });
        this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000L, 1000L, TimeUnit.MILLISECONDS);
        this.isStarted.compareAndSet(false, true);
    }

    public void sendAsync(Host host, Command command, long j, InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException("network error");
        }
        long opaque = command.getOpaque();
        if (!this.asyncSemaphore.tryAcquire(j, TimeUnit.MILLISECONDS)) {
            throw new RemotingTooMuchRequestException(String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", Long.valueOf(j), Integer.valueOf(this.asyncSemaphore.getQueueLength()), Integer.valueOf(this.asyncSemaphore.availablePermits())));
        }
        ResponseFuture responseFuture = new ResponseFuture(opaque, j, invokeCallback, new ReleaseSemaphore(this.asyncSemaphore));
        try {
            channel.writeAndFlush(command).addListener(future -> {
                if (future.isSuccess()) {
                    responseFuture.setSendOk(true);
                    return;
                }
                responseFuture.setSendOk(false);
                responseFuture.setCause(future.cause());
                responseFuture.putResponse(null);
                try {
                    try {
                        responseFuture.executeInvokeCallback();
                        responseFuture.release();
                    } catch (Exception e) {
                        this.logger.error("execute callback error", e);
                        responseFuture.release();
                    }
                } catch (Throwable th) {
                    responseFuture.release();
                    throw th;
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            throw new RemotingException(String.format("send command to host: %s failed", host), e);
        }
    }

    public Command sendSync(Host host, Command command, long j) throws InterruptedException, RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        ResponseFuture responseFuture = new ResponseFuture(command.getOpaque(), j, null, null);
        channel.writeAndFlush(command).addListener(future -> {
            if (future.isSuccess()) {
                responseFuture.setSendOk(true);
                return;
            }
            responseFuture.setSendOk(false);
            responseFuture.setCause(future.cause());
            responseFuture.putResponse(null);
            this.logger.error("send command {} to host {} failed", command, host);
        });
        Command waitResponse = responseFuture.waitResponse();
        if (waitResponse != null) {
            return waitResponse;
        }
        if (responseFuture.isSendOK()) {
            throw new RemotingTimeoutException(host.toString(), j, responseFuture.getCause());
        }
        throw new RemotingException(host.toString(), responseFuture.getCause());
    }

    public void send(Host host, Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture await = channel.writeAndFlush(command).await();
            if (await.isSuccess()) {
                this.logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {
                String format = String.format("send command : %s , to :%s failed", command, host.getAddress());
                this.logger.error(format, await.cause());
                throw new RemotingException(format);
            }
        } catch (Exception e) {
            this.logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor nettyRequestProcessor) {
        registerProcessor(commandType, nettyRequestProcessor, null);
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor nettyRequestProcessor, ExecutorService executorService) {
        this.clientHandler.registerProcessor(commandType, nettyRequestProcessor, executorService);
    }

    public Channel getChannel(Host host) {
        Channel channel = this.channels.get(host);
        return (channel == null || !channel.isActive()) ? createChannel(host, true) : channel;
    }

    public Channel createChannel(Host host, boolean z) {
        ChannelFuture connect;
        try {
            synchronized (this.bootstrap) {
                connect = this.bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (z) {
                connect.sync();
            }
            if (!connect.isSuccess()) {
                return null;
            }
            Channel channel = connect.channel();
            this.channels.put(host, channel);
            return channel;
        } catch (Exception e) {
            this.logger.warn(String.format("connect to %s error", host), e);
            return null;
        }
    }

    public void close() {
        if (this.isStarted.compareAndSet(true, false)) {
            try {
                closeChannels();
                if (this.workerGroup != null) {
                    this.workerGroup.shutdownGracefully();
                }
                if (this.callbackExecutor != null) {
                    this.callbackExecutor.shutdownNow();
                }
                if (this.responseFutureExecutor != null) {
                    this.responseFutureExecutor.shutdownNow();
                }
            } catch (Exception e) {
                this.logger.error("netty client close exception", e);
            }
            this.logger.info("netty client closed");
        }
    }

    private void closeChannels() {
        Iterator<Channel> it = this.channels.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.channels.clear();
    }

    public void closeChannel(Host host) {
        Channel remove = this.channels.remove(host);
        if (remove != null) {
            remove.close();
        }
    }
}
