package com.github.ltsopensource.nio.processor;

import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.nio.NioException;
import com.github.ltsopensource.nio.channel.ChannelInitializer;
import com.github.ltsopensource.nio.channel.NioChannel;
import com.github.ltsopensource.nio.handler.Futures;
import com.github.ltsopensource.nio.handler.NioHandler;
import com.github.ltsopensource.nio.idle.IdleDetector;
import com.github.ltsopensource.nio.loop.NioSelectorLoop;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/github/ltsopensource/nio/processor/AbstractNioProcessor.class */
/**
 * Base implementation of {@link NioProcessor} shared by concrete processors.
 *
 * <p>It keeps one {@link WriteQueue} per registered {@link NioChannel}, encodes outgoing
 * messages into that queue, and drains the queue to the socket on a worker executor.
 * Incoming bytes are read on the calling thread and decoded/dispatched to the
 * {@link NioHandler} on the same worker executor.
 *
 * <p>Subclasses provide the actual accept/connect logic through {@link #doAccept} and
 * {@link #doConnect}.
 */
public abstract class AbstractNioProcessor implements NioProcessor {
    protected static final Logger LOGGER = LoggerFactory.getLogger(NioProcessor.class);

    /** Size in bytes of the buffer allocated for every socket read. */
    private static final int READ_BUFFER_SIZE = 65536;

    private final NioHandler eventHandler;
    protected ChannelInitializer channelInitializer;
    /** Pending outbound requests, keyed by the channel they belong to. */
    private final ConcurrentMap<NioChannel, WriteQueue> writeQueues = new ConcurrentHashMap<NioChannel, WriteQueue>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Executor executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR, new NamedThreadFactory("NioProcessorExecutor", true));
    protected NioSelectorLoop selectorLoop = new NioSelectorLoop("AcceptSelectorLoop-I/O", this);
    protected IdleDetector idleDetector = new IdleDetector();

    /**
     * Creates the processor and starts its idle detector.
     *
     * @param nioHandler         handler that receives decoded messages and exceptions
     * @param channelInitializer initializer made available to subclasses
     */
    public AbstractNioProcessor(NioHandler nioHandler, ChannelInitializer channelInitializer) {
        this.eventHandler = nioHandler;
        this.channelInitializer = channelInitializer;
        this.idleDetector.start();
    }

    /**
     * Encodes {@code obj}, queues it for {@code nioChannel} and schedules a flush.
     *
     * <p>Before queueing, the channel's selection key (if present and valid) is switched
     * to write interest.
     *
     * @param nioChannel channel to write to
     * @param obj        message to send; {@code null} yields an already-successful future
     * @return future completed once the message has been written or has failed
     * @throws NioException if encoding or queueing the message throws
     */
    @Override
    public Futures.WriteFuture writeAndFlush(NioChannel nioChannel, Object obj) {
        SelectionKey key = nioChannel.socketChannel().keyFor(this.selectorLoop.selector());
        if (key != null && key.isValid()) {
            key.interestOps(SelectionKey.OP_WRITE);
        }
        return write(nioChannel, obj, true);
    }

    /**
     * Encodes a message and appends it to the channel's write queue.
     *
     * @param nioChannel channel the message is destined for
     * @param obj        message to encode; {@code null} completes the future successfully
     *                   with the note "msg is null"
     * @param flush      whether to schedule a flush right after queueing
     * @return the future tracking this write
     * @throws NioException if anything in the encode/queue/flush-scheduling path throws
     *                      (this includes a channel that has no registered write queue)
     */
    private Futures.WriteFuture write(NioChannel nioChannel, Object obj, boolean flush) {
        Futures.WriteFuture future = Futures.newWriteFuture();
        if (obj == null) {
            future.setSuccess(true);
            future.setMsg("msg is null");
            future.notifyListeners();
            return future;
        }
        try {
            ByteBuffer encoded = nioChannel.getEncoder().encode(nioChannel, obj);
            if (encoded == null) {
                future.setSuccess(false);
                future.setMsg("encode msg error");
                future.notifyListeners();
                return future;
            }
            this.writeQueues.get(nioChannel).offer(new WriteRequest(encoded, future));
            if (flush) {
                doFlush(nioChannel);
            }
            return future;
        } catch (Exception e) {
            throw new NioException("encode msg " + obj + " error", e);
        }
    }

    /**
     * Schedules a drain of the channel's pending writes on the worker executor.
     *
     * @param nioChannel channel whose queue should be flushed
     */
    @Override
    public void flush(NioChannel nioChannel) {
        doFlush(nioChannel);
    }

    /** Submits a task to the executor that drains the channel's write queue. */
    private void doFlush(final NioChannel nioChannel) {
        executor().execute(new Runnable() {
            @Override
            public void run() {
                flushPendingWrites(nioChannel);
            }
        });
    }

    /**
     * Writes queued requests to the socket until the queue is empty or the socket stops
     * accepting bytes, then switches the selection key back to read interest.
     *
     * <p>The queue lock is acquired once with {@code tryLock()} and released exactly once
     * in a {@code finally}, regardless of how the loop ends. If the lock is not obtained
     * the method returns without doing anything.
     */
    private void flushPendingWrites(NioChannel nioChannel) {
        WriteQueue queue = this.writeQueues.get(nioChannel);
        if (queue == null) {
            // Channel was never registered through connect()/accept(); nothing to drain.
            return;
        }
        if (!queue.tryLock()) {
            // NOTE(review): presumably another task is already flushing this queue.
            return;
        }
        try {
            while (!queue.isEmpty()) {
                WriteRequest request = queue.peek();
                if (request == null) {
                    continue;
                }
                Futures.WriteFuture future = request.getWriteFuture();
                try {
                    ByteBuffer payload = request.getMessage();
                    int written = nioChannel.socketChannel().write(payload);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("wrote bytes {}", Integer.valueOf(written));
                    }
                    nioChannel.setLastWriteTime(SystemClock.now());
                    if (payload.remaining() != 0) {
                        // Partial write: keep the request at the head and stop for now.
                        break;
                    }
                    queue.poll();
                    future.setSuccess(true);
                    future.notifyListeners();
                } catch (Exception e) {
                    LOGGER.error("IOE while writing", e);
                    // Drop the failed request so it is neither retried forever nor
                    // notified twice. Identity check: only this thread polls while the
                    // lock is held, so a different head means it was already removed.
                    if (queue.peek() == request) {
                        queue.poll();
                    }
                    future.setSuccess(false);
                    future.setCause(e);
                    future.notifyListeners();
                    eventHandler().exceptionCaught(nioChannel, e);
                }
            }
        } finally {
            queue.unlock();
        }
        SelectionKey key = nioChannel.socketChannel().keyFor(this.selectorLoop.selector());
        if (key != null && key.isValid()) {
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Reads available bytes from the channel's socket and hands them off for decoding.
     *
     * <p>A negative read count closes the channel; a zero count is ignored. An
     * {@link IOException} is logged and reported to the event handler.
     *
     * @param nioChannel channel to read from
     */
    @Override
    public void read(NioChannel nioChannel) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int count = nioChannel.socketChannel().read(buffer);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("read {} bytes", Integer.valueOf(count));
            }
            if (count < 0) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("channel closed by the remote peer");
                }
                nioChannel.close();
            } else if (count > 0) {
                buffer.flip();
                // The buffer is decoded on another thread and is never reused here, so it
                // must not be cleared after the hand-off: clear() would reset position and
                // limit while the decoder may still be reading it.
                doMessageReceived(nioChannel, buffer);
            }
        } catch (IOException e) {
            LOGGER.error("IOE while reading : ", e);
            eventHandler().exceptionCaught(nioChannel, e);
        }
    }

    /**
     * Decodes {@code byteBuffer} on the worker executor and delivers every decoded
     * message to the event handler, then records the channel's last read time.
     * Any exception raised while decoding or dispatching is reported to the handler.
     */
    private void doMessageReceived(final NioChannel nioChannel, final ByteBuffer byteBuffer) {
        executor().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    List<Object> messages = nioChannel.getDecoder().decode(nioChannel, byteBuffer);
                    if (CollectionUtils.isNotEmpty(messages)) {
                        for (Object message : messages) {
                            eventHandler().messageReceived(nioChannel, message);
                        }
                    }
                    nioChannel.setLastReadTime(SystemClock.now());
                } catch (Exception e) {
                    eventHandler().exceptionCaught(nioChannel, e);
                }
            }
        });
    }

    /**
     * Initiates a connection via {@link #doConnect} and registers a write queue for the
     * resulting channel.
     *
     * @param socketAddress remote address to connect to
     * @return the connect future passed to {@link #doConnect}
     */
    @Override
    public Futures.ConnectFuture connect(SocketAddress socketAddress) {
        Futures.ConnectFuture future = Futures.newConnectFuture();
        NioChannel channel = doConnect(socketAddress, this.selectorLoop, future);
        // ConcurrentHashMap rejects null keys; skip registration if no channel was produced.
        if (channel != null) {
            this.writeQueues.putIfAbsent(channel, new WriteQueue());
        }
        return future;
    }

    /**
     * Accepts a connection via {@link #doAccept} and registers a write queue for the
     * resulting channel.
     *
     * @param selectionKey key that triggered the accept (not used by this implementation)
     */
    @Override
    public void accept(SelectionKey selectionKey) {
        NioChannel channel = doAccept(this.selectorLoop);
        // ConcurrentHashMap rejects null keys; skip registration if no channel was produced.
        if (channel != null) {
            this.writeQueues.putIfAbsent(channel, new WriteQueue());
        }
    }

    /** Starts the selector loop; calls after the first one have no effect. */
    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.selectorLoop.start();
        }
    }

    /**
     * Accepts a pending connection.
     *
     * @param nioSelectorLoop selector loop owned by this processor
     * @return the accepted channel
     */
    protected abstract NioChannel doAccept(NioSelectorLoop nioSelectorLoop);

    /**
     * Opens a connection to {@code socketAddress}.
     *
     * @param socketAddress   remote address to connect to
     * @param nioSelectorLoop selector loop owned by this processor
     * @param connectFuture   future handed back to the caller of {@link #connect}
     * @return the channel representing the connection
     */
    protected abstract NioChannel doConnect(SocketAddress socketAddress, NioSelectorLoop nioSelectorLoop, Futures.ConnectFuture connectFuture);

    /** Returns the handler that receives decoded messages and exceptions. */
    public NioHandler eventHandler() {
        return this.eventHandler;
    }

    /** Returns the executor used for flush and decode tasks. */
    protected Executor executor() {
        return this.executor;
    }
}
