package io.vlingo.wire.channel;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.wire.message.BasicConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

/* loaded from: input_file:io/vlingo/wire/channel/SocketChannelSelectionProcessorActor.class */
public class SocketChannelSelectionProcessorActor extends Actor implements SocketChannelSelectionProcessor, ResponseSenderChannel, Scheduled<Object>, Stoppable {
    private int bufferId;
    private final Cancellable cancellable;
    private int contextId;
    private final int messageBufferSize;
    private final String name;
    private final RequestChannelConsumerProvider provider;
    private final Selector selector = open();
    private final ResponseSenderChannel responder = (ResponseSenderChannel) selfAs(ResponseSenderChannel.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vlingo/wire/channel/SocketChannelSelectionProcessorActor$Context.class */
    public class Context implements RequestResponseContext<SocketChannel> {
        private final ConsumerByteBuffer buffer;
        private final SocketChannel clientChannel;
        private Object closingData;
        private final RequestChannelConsumer consumer;
        private Object consumerData;
        private final String id;
        private final Queue<ConsumerByteBuffer> writables = new LinkedList();

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public <T> T consumerData() {
            return (T) this.consumerData;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public <T> T consumerData(T t) {
            this.consumerData = t;
            return t;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public boolean hasConsumerData() {
            return this.consumerData != null;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public String id() {
            return this.id;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public ResponseSenderChannel sender() {
            return SocketChannelSelectionProcessorActor.this.responder;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public void whenClosing(Object obj) {
            this.closingData = obj;
        }

        Context(SocketChannel socketChannel) {
            this.clientChannel = socketChannel;
            this.consumer = SocketChannelSelectionProcessorActor.this.provider.requestChannelConsumer();
            this.buffer = BasicConsumerByteBuffer.allocate(SocketChannelSelectionProcessorActor.access$204(SocketChannelSelectionProcessorActor.this), SocketChannelSelectionProcessorActor.this.messageBufferSize);
            this.id = "" + SocketChannelSelectionProcessorActor.access$404(SocketChannelSelectionProcessorActor.this);
        }

        void close() {
            if (this.clientChannel.isOpen()) {
                try {
                    consumer().closeWith(this, this.closingData);
                    this.clientChannel.close();
                } catch (Exception e) {
                    SocketChannelSelectionProcessorActor.this.logger().error("Failed to close client channel for " + SocketChannelSelectionProcessorActor.this.name + " because: " + e.getMessage(), e);
                }
            }
        }

        RequestChannelConsumer consumer() {
            return this.consumer;
        }

        boolean hasNextWritable() {
            return this.writables.peek() != null;
        }

        ConsumerByteBuffer nextWritable() {
            return this.writables.poll();
        }

        void queueWritable(ConsumerByteBuffer consumerByteBuffer) {
            this.writables.add(consumerByteBuffer);
        }

        ConsumerByteBuffer requestBuffer() {
            return this.buffer;
        }
    }

    public SocketChannelSelectionProcessorActor(RequestChannelConsumerProvider requestChannelConsumerProvider, String str, int i, int i2, long j) {
        this.provider = requestChannelConsumerProvider;
        this.name = str;
        this.messageBufferSize = i2;
        this.cancellable = stage().scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, 100L, j);
    }

    @Override // io.vlingo.wire.channel.ResponseSenderChannel
    public void abandon(RequestResponseContext<?> requestResponseContext) {
        ((Context) requestResponseContext).close();
    }

    @Override // io.vlingo.wire.channel.SocketChannelSelectionProcessor
    public void close() {
        if (isStopped()) {
            return;
        }
        ((Stoppable) selfAs(Stoppable.class)).stop();
    }

    @Override // io.vlingo.wire.channel.ResponseSenderChannel
    public void respondWith(RequestResponseContext<?> requestResponseContext, ConsumerByteBuffer consumerByteBuffer) {
        ((Context) requestResponseContext).queueWritable(consumerByteBuffer);
    }

    @Override // io.vlingo.wire.channel.SocketChannelSelectionProcessor
    public void process(SelectionKey selectionKey) {
        SocketChannel accept;
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            if (serverSocketChannel.isOpen() && (accept = serverSocketChannel.accept()) != null) {
                accept.configureBlocking(false);
                accept.register(this.selector, 5, new Context(accept));
            }
        } catch (Exception e) {
            String str = "Failed to accept client socket for " + this.name + " because: " + e.getMessage();
            logger().error(str, e);
            throw new IllegalArgumentException(str);
        }
    }

    public void intervalSignal(Scheduled<Object> scheduled, Object obj) {
        probeChannel();
    }

    public void stop() {
        this.cancellable.cancel();
        try {
            this.selector.close();
        } catch (Exception e) {
            logger().error("Failed to close selctor for " + this.name + " while stopping because: " + e.getMessage(), e);
        }
    }

    private void close(SocketChannel socketChannel, SelectionKey selectionKey) {
        try {
            socketChannel.close();
        } catch (Exception e) {
        }
        try {
            selectionKey.cancel();
        } catch (Exception e2) {
        }
    }

    private Selector open() {
        try {
            return Selector.open();
        } catch (Exception e) {
            String str = "Failed to open selector for " + this.name + " because: " + e.getMessage();
            logger().error(str, e);
            throw new IllegalArgumentException(str);
        }
    }

    private void probeChannel() {
        if (isStopped()) {
            return;
        }
        try {
            if (this.selector.selectNow() > 0) {
                Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey next = it.next();
                    it.remove();
                    if (next.isValid()) {
                        if (next.isReadable()) {
                            read(next);
                        } else if (next.isWritable()) {
                            write(next);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger().error("Failed client channel processing for " + this.name + " because: " + e.getMessage(), e);
        }
    }

    private void read(SelectionKey selectionKey) throws IOException {
        int i;
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (!socketChannel.isOpen()) {
            selectionKey.cancel();
            return;
        }
        Context context = (Context) selectionKey.attachment();
        ConsumerByteBuffer clear = context.requestBuffer().clear();
        ByteBuffer asByteBuffer = clear.asByteBuffer();
        int i2 = 0;
        do {
            try {
                i = socketChannel.read(asByteBuffer);
                i2 += i;
            } catch (Exception e) {
                i = -1;
            }
        } while (i > 0);
        if (i == -1) {
            close(socketChannel, selectionKey);
        }
        if (i2 > 0) {
            context.consumer().consume(context, clear.flip());
        } else {
            clear.release();
        }
    }

    private void write(SelectionKey selectionKey) throws Exception {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (!socketChannel.isOpen()) {
            selectionKey.cancel();
            return;
        }
        Context context = (Context) selectionKey.attachment();
        if (context.hasNextWritable()) {
            writeWithCachedData(context, socketChannel);
        }
    }

    private void writeWithCachedData(Context context, SocketChannel socketChannel) throws Exception {
        ConsumerByteBuffer nextWritable = context.nextWritable();
        while (true) {
            ConsumerByteBuffer consumerByteBuffer = nextWritable;
            if (consumerByteBuffer == null) {
                return;
            }
            writeWithCachedData(context, socketChannel, consumerByteBuffer);
            nextWritable = context.nextWritable();
        }
    }

    private void writeWithCachedData(Context context, SocketChannel socketChannel, ConsumerByteBuffer consumerByteBuffer) throws Exception {
        try {
            try {
                ByteBuffer asByteBuffer = consumerByteBuffer.asByteBuffer();
                while (asByteBuffer.hasRemaining()) {
                    socketChannel.write(asByteBuffer);
                }
                consumerByteBuffer.release();
            } catch (Exception e) {
                logger().error("Failed to write buffer for " + this.name + " with channel " + socketChannel.getRemoteAddress() + " because: " + e.getMessage(), e);
                consumerByteBuffer.release();
            }
        } catch (Throwable th) {
            consumerByteBuffer.release();
            throw th;
        }
    }

    static /* synthetic */ int access$204(SocketChannelSelectionProcessorActor socketChannelSelectionProcessorActor) {
        int i = socketChannelSelectionProcessorActor.bufferId + 1;
        socketChannelSelectionProcessorActor.bufferId = i;
        return i;
    }

    static /* synthetic */ int access$404(SocketChannelSelectionProcessorActor socketChannelSelectionProcessorActor) {
        int i = socketChannelSelectionProcessorActor.contextId + 1;
        socketChannelSelectionProcessorActor.contextId = i;
        return i;
    }
}
