package io.vlingo.wire.channel;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.common.pool.ResourcePool;
import io.vlingo.wire.message.ConsumerByteBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

/* loaded from: input_file:io/vlingo/wire/channel/SocketChannelSelectionProcessorActor.class */
/**
 * Actor that registers accepted client {@link SocketChannel}s with a
 * {@link RefreshableSelector} and, on every scheduled interval signal, probes
 * the selector to read pending requests and write queued responses.
 *
 * <p>Each registered channel is paired with a {@link Context} that carries the
 * per-connection consumer, the queue of pending response buffers and the
 * "write mode" flag (write readiness interest registered with the selector
 * because a response could not be written completely).
 */
public class SocketChannelSelectionProcessorActor extends Actor implements SocketChannelSelectionProcessor, ResponseSenderChannel, Scheduled<Object>, Stoppable {
    /** Delay handed to the scheduler before the first probe (same unit as the probe interval). */
    private static final long INITIAL_PROBE_DELAY = 100L;

    private final Cancellable cancellable;
    private int contextId;
    private final String name;
    private final long probeTimeout;
    private final RequestChannelConsumerProvider provider;
    private final ResourcePool<ConsumerByteBuffer, String> requestBufferPool;
    private final ResponseSenderChannel responder;
    private final RefreshableSelector selector;
    /** Contexts that received a response while not in write mode and await a direct write attempt. */
    private final Queue<Context> writableContexts;

    /**
     * Per-connection state attached to the channel's selection key.
     */
    private class Context implements RequestResponseContext<SocketChannel> {
        private final SocketChannel clientChannel;
        /** Set once {@link #close()} has run, making further calls no-ops. */
        private boolean closed = false;
        private Object closingData;
        private final RequestChannelConsumer consumer;
        private Object consumerData;
        private final String id;
        private final Queue<ConsumerByteBuffer> writables = new LinkedList<>();
        private boolean writeMode = false;

        /**
         * Creates the context for {@code clientChannel}, obtaining a consumer
         * from the provider and assigning the next sequential context id.
         */
        Context(SocketChannel clientChannel) {
            this.clientChannel = clientChannel;
            this.consumer = provider.requestChannelConsumer();
            this.id = "" + (++contextId);
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        @SuppressWarnings("unchecked")
        public <T> T consumerData() {
            return (T) this.consumerData;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public <T> T consumerData(T data) {
            this.consumerData = data;
            return data;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public boolean hasConsumerData() {
            return this.consumerData != null;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public String id() {
            return this.id;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public ResponseSenderChannel sender() {
            return responder;
        }

        @Override // io.vlingo.wire.channel.RequestResponseContext
        public void whenClosing(Object data) {
            this.closingData = data;
        }

        /** @return true when the underlying client channel is no longer open */
        boolean isChannelClosed() {
            return !this.clientChannel.isOpen();
        }

        /**
         * Closes this context at most once: notifies the consumer with the
         * closing data, cancels the selection key, closes the channel and
         * releases any response buffers that were still queued.
         *
         * <p>The channel close and buffer release run in {@code finally} so a
         * failure while notifying the consumer or cancelling the key cannot
         * leave the socket open or the queued buffers unreleased.
         */
        void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                consumer().closeWith(this, this.closingData);
                whenClosing(null);
                selector.keyFor(this.clientChannel).cancel();
            } catch (Exception e) {
                logger().info("Client channel didn't close normally, but is probably already closed.");
            } finally {
                try {
                    this.clientChannel.close();
                } catch (IOException e) {
                    logger().debug("Ignoring failure while closing client channel for " + name + ": " + e.getMessage());
                }
                ConsumerByteBuffer pending;
                while ((pending = this.writables.poll()) != null) {
                    releaseQuietly(pending);
                }
            }
        }

        RequestChannelConsumer consumer() {
            return this.consumer;
        }

        /**
         * Finishes the response buffer at the head of the queue: releases it,
         * re-registers the channel for read interest only, and removes it from
         * the queue. Failures in the first two steps are logged and do not
         * prevent the buffer from being dequeued.
         */
        void confirmCurrentWritable(ConsumerByteBuffer buffer) {
            releaseQuietly(buffer);
            try {
                setWriteMode(false);
            } catch (Exception e) {
                logger().debug("Ignoring failure while leaving write mode for " + name + ": " + e.getMessage());
            }
            this.writables.poll();
        }

        boolean hasNextWritable() {
            return this.writables.peek() != null;
        }

        /** @return the response buffer at the head of the queue without removing it, or null */
        ConsumerByteBuffer nextWritable() {
            return this.writables.peek();
        }

        /**
         * Queues a response buffer. When the context is not in write mode it is
         * also scheduled for a direct write on the next probe; in write mode
         * the pending write-readiness event drains the queue instead. A buffer
         * queued after the context was closed can never be sent, so it is
         * released immediately.
         */
        void queueWritable(ConsumerByteBuffer buffer) {
            if (this.closed) {
                releaseQuietly(buffer);
                return;
            }
            this.writables.add(buffer);
            if (!this.writeMode) {
                writableContexts.add(this);
            }
        }

        /** Acquires a request buffer from the pool on behalf of this context. */
        ConsumerByteBuffer requestBuffer() {
            return (ConsumerByteBuffer) requestBufferPool.acquire("SocketChannelSelectionProcessorActor#Context");
        }

        /**
         * Re-registers the channel for read interest, plus write interest when
         * {@code enabled}, and records the new mode once registration succeeded.
         *
         * @throws ClosedChannelException if the registration fails because the channel is closed
         */
        void setWriteMode(boolean enabled) throws ClosedChannelException {
            final int interest = SelectionKey.OP_READ | (enabled ? SelectionKey.OP_WRITE : 0);
            selector.registerWith(this.clientChannel, interest, this);
            this.writeMode = enabled;
        }
    }

    /**
     * Opens the selector and schedules the recurring probe.
     *
     * @param provider          supplies one consumer per accepted connection
     * @param name              name used for the selector and in log messages
     * @param requestBufferPool pool from which request buffers are acquired
     * @param probeInterval     interval passed to the scheduler between probes
     * @param probeTimeout      timeout passed to each selector probe
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public SocketChannelSelectionProcessorActor(RequestChannelConsumerProvider provider, String name, ResourcePool<ConsumerByteBuffer, String> requestBufferPool, long probeInterval, long probeTimeout) {
        logger().debug("Probe interval: " + probeInterval + " Probe timeout: " + probeTimeout);
        this.provider = provider;
        this.name = name;
        this.requestBufferPool = requestBufferPool;
        this.probeTimeout = probeTimeout;
        this.selector = RefreshableSelector.open(name);
        this.responder = (ResponseSenderChannel) selfAs(ResponseSenderChannel.class);
        this.writableContexts = new LinkedList<>();
        this.cancellable = stage().scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, INITIAL_PROBE_DELAY, probeInterval);
    }

    /** Closes the connection behind {@code context}, which must be a context created by this actor. */
    @Override // io.vlingo.wire.channel.ResponseSenderChannel
    public void abandon(RequestResponseContext<?> context) {
        ((Context) context).close();
    }

    /** Requests this actor to stop, through its {@link Stoppable} self-reference, unless already stopped. */
    @Override // io.vlingo.wire.channel.SocketChannelSelectionProcessor
    public void close() {
        if (isStopped()) {
            return;
        }
        ((Stoppable) selfAs(Stoppable.class)).stop();
    }

    /** Queues {@code buffer} as a response on {@code context}, which must be a context created by this actor. */
    @Override // io.vlingo.wire.channel.ResponseSenderChannel
    public void respondWith(RequestResponseContext<?> context, ConsumerByteBuffer buffer) {
        ((Context) context).queueWritable(buffer);
    }

    /**
     * Registers a newly accepted client channel for read interest with a fresh
     * {@link Context} as its attachment.
     *
     * @throws IllegalArgumentException if registration fails; the original failure is attached as the cause
     */
    @Override // io.vlingo.wire.channel.SocketChannelSelectionProcessor
    public void process(SocketChannel clientChannel) {
        try {
            this.selector.registerWith(clientChannel, SelectionKey.OP_READ, new Context(clientChannel));
        } catch (Exception e) {
            final String message = "Failed to accept client socket for " + this.name + " because: " + e.getMessage();
            logger().error(message, e);
            throw new IllegalArgumentException(message, e);
        }
    }

    /** Scheduler callback: runs one probe of the selector. */
    public void intervalSignal(Scheduled<Object> scheduled, Object data) {
        probeChannel();
    }

    /**
     * Cancels the scheduled probe and closes the selector, logging (not
     * propagating) any failure to close it.
     */
    // NOTE(review): this does not delegate to the superclass stop(); confirm the Actor lifecycle does not require it.
    public void stop() {
        this.cancellable.cancel();
        try {
            this.selector.close();
        } catch (Exception e) {
            logger().error("Failed to close selector for " + this.name + " while stopping because: " + e.getMessage(), e);
        }
    }

    /** Closes the context and cancels the key; the key is cancelled even if closing the context fails. */
    private void close(Context context, SelectionKey key) {
        try {
            context.close();
        } catch (Exception e) {
            logger().debug("Ignoring failure while closing context for " + this.name + ": " + e.getMessage());
        } finally {
            key.cancel();
        }
    }

    /**
     * Runs one selection pass: services every selected key, then attempts a
     * direct write for each context that queued a response since the last pass.
     */
    private void probeChannel() {
        if (isStopped()) {
            return;
        }
        try {
            final Iterator<SelectionKey> keys = this.selector.select(this.probeTimeout);
            while (keys.hasNext()) {
                final SelectionKey key = keys.next();
                keys.remove();
                if (key.isValid() && key.isReadable()) {
                    read(key);
                }
                // Re-check validity: the read may have closed the channel. Handling
                // both events in one pass keeps a pending write from being starved
                // by a client that keeps sending.
                if (key.isValid() && key.isWritable()) {
                    write(key);
                }
            }
            while (!this.writableContexts.isEmpty()) {
                write(this.writableContexts.poll());
            }
        } catch (ClosedSelectorException e) {
            logger().error("Failed client channel processing for " + this.name + " because selector is closed.");
        } catch (Exception e) {
            logger().error("Failed client channel processing for " + this.name + " because: " + e.getMessage(), e);
        }
    }

    /**
     * Reads everything currently available on the key's channel into a pooled
     * request buffer and hands it to the context's consumer. A read failure is
     * treated like end-of-stream. When nothing was read, the buffer is released
     * and the context closed.
     */
    private void read(SelectionKey key) throws IOException {
        final SocketChannel channel = (SocketChannel) key.channel();
        if (!channel.isOpen()) {
            key.cancel();
            return;
        }

        final Context context = (Context) key.attachment();
        final ConsumerByteBuffer buffer = context.requestBuffer().clear();
        final ByteBuffer readBuffer = buffer.asByteBuffer();

        int totalBytesRead = 0;
        int bytesRead;
        do {
            try {
                bytesRead = channel.read(readBuffer);
            } catch (Exception e) {
                logger().debug("Treating read failure for " + this.name + " as end of stream: " + e.getMessage());
                bytesRead = -1;
            }
            // Count only real data; the -1 end-of-stream marker must not reduce the total.
            if (bytesRead > 0) {
                totalBytesRead += bytesRead;
            }
        } while (bytesRead > 0);

        if (bytesRead == -1) {
            close(context, key);
        }

        if (totalBytesRead > 0) {
            context.consumer().consume(context, buffer.flip());
        } else {
            // Nothing to consume: give the buffer back and close (a no-op if already closed above).
            releaseQuietly(buffer);
            context.close();
        }
    }

    /**
     * Handles write readiness for a key: resumes writing the context's queued
     * responses. Unlike {@link #write(Context)} this is not skipped in write
     * mode, since write readiness is exactly how a partial write resumes.
     */
    private void write(SelectionKey key) throws Exception {
        final SocketChannel channel = (SocketChannel) key.channel();
        if (!channel.isOpen()) {
            key.cancel();
            return;
        }
        final Context context = (Context) key.attachment();
        if (context.hasNextWritable()) {
            writeWithCachedData(context, context.clientChannel);
        } else {
            // Nothing left to send; stop asking for write readiness.
            context.setWriteMode(false);
        }
    }

    /**
     * Direct write attempt for a context that queued a response. Skipped while
     * the context is in write mode (the readiness event will drain the queue)
     * or has nothing queued; closes the context if its channel is closed.
     */
    private void write(Context context) {
        if (context.isChannelClosed()) {
            context.close();
            return;
        }
        if (context.writeMode || !context.hasNextWritable()) {
            return;
        }
        writeWithCachedData(context, context.clientChannel);
    }

    /**
     * Writes queued responses in order, stopping at the first buffer that could
     * not be written completely so the actor never spins on a full socket.
     */
    private void writeWithCachedData(Context context, SocketChannel channel) {
        ConsumerByteBuffer buffer;
        while ((buffer = context.nextWritable()) != null) {
            if (!writeWithCachedData(context, channel, buffer)) {
                return;
            }
        }
    }

    /**
     * Writes one response buffer.
     *
     * @return true when the buffer was fully written and confirmed; false when
     *         the socket accepted no more bytes (write mode is then enabled so
     *         the selector reports readiness) or when the write failed (the
     *         context is then closed)
     */
    private boolean writeWithCachedData(Context context, SocketChannel channel, ConsumerByteBuffer buffer) {
        try {
            final ByteBuffer responseBuffer = buffer.asByteBuffer();
            while (responseBuffer.hasRemaining()) {
                if (channel.write(responseBuffer) < 1) {
                    context.setWriteMode(true);
                    return false;
                }
            }
            context.confirmCurrentWritable(buffer);
            return true;
        } catch (Exception e) {
            logger().error("Failed to write buffer for " + this.name + " with channel " + remoteAddressOf(channel) + " because: " + e.getMessage(), e);
            // The buffer can no longer be delivered; closing also releases everything still queued.
            context.close();
            return false;
        }
    }

    /** Releases a pooled buffer, logging instead of propagating any failure. */
    private void releaseQuietly(ConsumerByteBuffer buffer) {
        try {
            buffer.release();
        } catch (Exception e) {
            logger().debug("Ignoring failure while releasing buffer for " + this.name + ": " + e.getMessage());
        }
    }

    /** Describes the channel's remote address for logging without ever throwing. */
    private static String remoteAddressOf(SocketChannel channel) {
        try {
            return String.valueOf(channel.getRemoteAddress());
        } catch (Exception e) {
            return "unknown";
        }
    }
}
