package influent.forward;

import influent.internal.msgpack.MsgpackStreamUnpacker;
import influent.internal.nio.NioAttachment;
import influent.internal.nio.NioEventLoop;
import influent.internal.nio.NioTcpChannel;
import influent.internal.util.ThreadSafeQueue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:influent/forward/NioForwardConnection.class */
/**
 * A single forward connection attached to a {@link NioEventLoop}.
 *
 * <p>On read events it feeds bytes from the channel into a {@link MsgpackStreamUnpacker}, decodes
 * every complete value into a forward request and passes the request's stream to the
 * {@link ForwardCallback}. When the callback's returned stage completes and the request option
 * carries a chunk, a msgpack map {@code {"ack": chunk}} is queued and write interest is enabled so
 * that the response is flushed on the next write event.
 *
 * <p>NOTE(review): responses are kept in a {@code ThreadSafeQueue}, which suggests that the
 * callback may complete on a thread other than the event loop thread — confirm against
 * {@code ForwardCallback} implementations.
 */
final class NioForwardConnection implements NioAttachment {
    private static final Logger logger = LoggerFactory.getLogger(NioForwardConnection.class);
    private static final String ACK_KEY = "ack";

    private final NioTcpChannel channel;
    private final NioEventLoop eventLoop;
    private final ForwardCallback callback;
    private final MsgpackStreamUnpacker unpacker;
    private final MsgpackForwardRequestDecoder decoder;

    /** Encoded ack responses waiting to be written to the channel, in completion order. */
    final ThreadSafeQueue<ByteBuffer> responses = new ThreadSafeQueue<>();

    /**
     * Creates a connection from fully constructed collaborators. Does not register the channel
     * with the event loop.
     *
     * @param nioTcpChannel the channel to read requests from and write acks to
     * @param nioEventLoop the event loop whose interest sets are toggled for this connection
     * @param forwardCallback the consumer of decoded event streams
     * @param msgpackStreamUnpacker the unpacker that splits the byte stream into msgpack values
     * @param msgpackForwardRequestDecoder the decoder turning msgpack values into forward requests
     */
    NioForwardConnection(NioTcpChannel nioTcpChannel, NioEventLoop nioEventLoop, ForwardCallback forwardCallback, MsgpackStreamUnpacker msgpackStreamUnpacker, MsgpackForwardRequestDecoder msgpackForwardRequestDecoder) {
        this.channel = nioTcpChannel;
        this.eventLoop = nioEventLoop;
        this.callback = forwardCallback;
        this.unpacker = msgpackStreamUnpacker;
        this.decoder = msgpackForwardRequestDecoder;
    }

    /**
     * Creates a connection with a fresh unpacker and decoder. Does not register the channel with
     * the event loop.
     *
     * @param nioTcpChannel the channel to read requests from and write acks to
     * @param nioEventLoop the event loop whose interest sets are toggled for this connection
     * @param forwardCallback the consumer of decoded event streams
     * @param j the value passed to the {@link MsgpackStreamUnpacker} constructor
     *     (presumably a size limit — TODO confirm)
     */
    NioForwardConnection(NioTcpChannel nioTcpChannel, NioEventLoop nioEventLoop, ForwardCallback forwardCallback, long j) {
        this(nioTcpChannel, nioEventLoop, forwardCallback, new MsgpackStreamUnpacker(j), new MsgpackForwardRequestDecoder());
    }

    /**
     * Wraps the given socket channel and registers this connection with the event loop for read
     * events.
     *
     * @param socketChannel the accepted socket channel
     * @param nioEventLoop the event loop to register with
     * @param forwardCallback the consumer of decoded event streams
     * @param j the value passed to the {@link MsgpackStreamUnpacker} constructor
     * @param i forwarded as the second argument of the {@link NioTcpChannel} constructor
     *     (meaning not visible here — TODO confirm)
     * @param z forwarded as the third argument of the {@link NioTcpChannel} constructor
     * @param z2 forwarded as the fourth argument of the {@link NioTcpChannel} constructor
     */
    public NioForwardConnection(SocketChannel socketChannel, NioEventLoop nioEventLoop, ForwardCallback forwardCallback, long j, int i, boolean z, boolean z2) {
        this(new NioTcpChannel(socketChannel, i, z, z2), nioEventLoop, forwardCallback, j);
        this.channel.register(nioEventLoop, SelectionKey.OP_READ, this);
    }

    /**
     * Flushes queued responses and turns write interest off once the queue has been drained.
     *
     * @param selectionKey the key whose write interest is toggled
     */
    @Override // influent.internal.nio.NioAttachment
    public void onWritable(SelectionKey selectionKey) {
        if (!sendResponses()) {
            // The channel could not take everything; stay interested in OP_WRITE.
            return;
        }
        this.eventLoop.disableInterestSet(selectionKey, SelectionKey.OP_WRITE);
        // A completion callback may have enqueued a response (and enabled OP_WRITE) between the
        // drain above and the disable request. Re-check so that such a response is not left
        // waiting in the queue with write interest switched off.
        if (this.responses.nonEmpty()) {
            this.eventLoop.enableInterestSet(selectionKey, SelectionKey.OP_WRITE);
        }
    }

    /**
     * Writes queued responses to the channel in order.
     *
     * @return {@code true} if the queue was drained, {@code false} if the head response still has
     *     unwritten bytes after a write attempt
     */
    private boolean sendResponses() {
        while (this.responses.nonEmpty()) {
            // Peek rather than dequeue: a partially written buffer must stay at the head.
            ByteBuffer head = this.responses.peek();
            this.channel.write(head);
            if (head.hasRemaining()) {
                return false;
            }
            this.responses.dequeue();
        }
        return true;
    }

    /**
     * Reads and dispatches pending requests, then closes this connection if the channel is no
     * longer open.
     *
     * @param selectionKey the key of this connection, used later to enable write interest
     */
    @Override // influent.internal.nio.NioAttachment
    public void onReadable(SelectionKey selectionKey) {
        receiveRequests(selectionKey);
        if (!this.channel.isOpen()) {
            close();
        }
    }

    /**
     * Feeds the unpacker from the channel and hands every decoded request to the callback.
     *
     * @param selectionKey the key passed on to {@link #completeTask} when a request completes
     */
    private void receiveRequests(SelectionKey selectionKey) {
        // The supplier hands out up to 1024 bytes per call and returns null once the channel
        // read yields no bytes.
        this.unpacker.feed(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            if (this.channel.read(buffer) <= 0) {
                return null;
            }
            buffer.flip();
            return buffer;
        }, this.channel);

        while (this.unpacker.hasNext()) {
            try {
                this.decoder.decode(this.unpacker.next()).ifPresent(forwardRequest -> {
                    logger.debug("Received a forward request from {}. chunk_id = {}", this.channel.getRemoteAddress(), forwardRequest.getOption());
                    this.callback.consume(forwardRequest.getStream()).thenRun(() -> {
                        // Only requests that carry a chunk get an ack response.
                        forwardRequest.getOption().getChunk().ifPresent(chunk -> completeTask(selectionKey, chunk));
                        logger.debug("Completed the task. chunk_id = {}.", forwardRequest.getOption());
                    });
                });
            } catch (IllegalArgumentException e) {
                // An invalid message is logged and skipped; the remaining values are still processed.
                logger.error("Received an invalid message. remote address = " + this.channel.getRemoteAddress(), e);
            }
        }
    }

    /**
     * Queues the response {@code {"ack": str}} and enables write interest so it gets flushed.
     *
     * @param selectionKey the key whose write interest is enabled
     * @param str the chunk value to acknowledge
     */
    private void completeTask(SelectionKey selectionKey, String str) {
        try {
            MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
            packer.packMapHeader(1);
            packer.packString(ACK_KEY);
            packer.packString(str);
            ByteBuffer response = packer.toMessageBuffer().sliceAsByteBuffer();
            // Enqueue before enabling OP_WRITE so the write handler always finds the response.
            this.responses.enqueue(response);
            this.eventLoop.enableInterestSet(selectionKey, SelectionKey.OP_WRITE);
        } catch (IOException e) {
            logger.error("Failed packing. chunk = " + str, e);
        }
    }

    /** Closes the underlying channel and logs the remote address it was bound to. */
    @Override // influent.internal.nio.NioAttachment, java.lang.AutoCloseable
    public void close() {
        this.channel.close();
        logger.debug("NioForwardConnection bound with {} closed.", this.channel.getRemoteAddress());
    }

    /** Returns {@code "NioForwardConnection(<remote address>)"}. */
    @Override
    public String toString() {
        return "NioForwardConnection(" + this.channel.getRemoteAddress() + ")";
    }
}
