package io.vlingo.wire.fdx.bidirectional;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Cancellable;
import io.vlingo.actors.Scheduled;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestResponseContext;
import io.vlingo.wire.channel.ResponseSenderChannel;
import io.vlingo.wire.message.ByteBufferPool;
import io.vlingo.wire.message.ConsumerByteBuffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

/* loaded from: input_file:io/vlingo/wire/fdx/bidirectional/ServerRequestResponseChannelActor.class */
public class ServerRequestResponseChannelActor extends Actor implements ServerRequestResponseChannel, Scheduled {
    /** Next numeric id handed to an accepted connection; read and incremented in accept(). Static, so shared by all instances of this class. */
    private static long NextContextId = 1;
    /** Handle for the periodic probe scheduled in the constructor; cancelled in stop(). */
    private final Cancellable cancellable;
    /** Set by close(); once true, probeChannel() returns without touching the selector. */
    private boolean closed;
    /** Receives every non-empty request buffer read from a client (see read()). */
    private RequestChannelConsumer consumer;
    /** First argument passed to each per-connection ByteBufferPool. */
    private final int maxBufferPoolSize;
    /** Second argument passed to each per-connection ByteBufferPool. */
    private final int maxMessageSize;
    /** Label that identifies this channel in log messages. */
    private final String name;
    /** Timeout handed to Selector.select(long) on every probe. */
    private final long probeTimeout;
    /** This actor viewed as a ResponseSenderChannel; given to every Context as its sender. */
    private final ResponseSenderChannel self = (ResponseSenderChannel) selfAs(ResponseSenderChannel.class);
    /** Listening server socket; bound and registered with the selector in the constructor. */
    private final ServerSocketChannel channel = ServerSocketChannel.open();
    /** Selector with which the server channel and every accepted client channel are registered. */
    private final Selector selector = Selector.open();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vlingo/wire/fdx/bidirectional/ServerRequestResponseChannelActor$Context.class */
    /**
     * Per-connection state. One instance is created for each accepted client
     * channel and attached to that channel's selection key, so the read and
     * write handlers can recover it from the key.
     */
    public class Context implements RequestResponseContext<SocketChannel> {
        /** Pool from which request buffers for this connection are taken. */
        private final ByteBufferPool bufferPool;
        private final SocketChannel clientChannel;
        private final String id;
        private final ResponseSenderChannel responder;
        /** Opaque data stored by the consumer; null until consumerData(T) is called. */
        private Object consumerData = null;
        /** Responses queued by queueWritable() and drained in FIFO order by nextWritable(). */
        private final Queue<ConsumerByteBuffer> writables = new LinkedList<>();

        /**
         * Returns the consumer data cast to the caller's expected type.
         * The cast is unchecked: the caller must ask for the type it stored.
         */
        @Override // io.vlingo.wire.channel.RequestResponseContext
        @SuppressWarnings("unchecked")
        public <T> T consumerData() {
            return (T) this.consumerData;
        }

        /** Stores {@code t} as the consumer data and returns it. */
        @Override // io.vlingo.wire.channel.RequestResponseContext
        public <T> T consumerData(T t) {
            this.consumerData = t;
            return t;
        }

        /** Answers whether non-null consumer data has been stored. */
        @Override // io.vlingo.wire.channel.RequestResponseContext
        public boolean hasConsumerData() {
            return this.consumerData != null;
        }

        /** Returns the decimal string form of the id given at construction. */
        @Override // io.vlingo.wire.channel.RequestResponseContext
        public String id() {
            return this.id;
        }

        /** Returns the client channel this context belongs to. */
        @Override // io.vlingo.wire.channel.RequestResponseContext
        public SocketChannel reference() {
            return this.clientChannel;
        }

        /** Returns the channel through which responses for this context are sent. */
        @Override // io.vlingo.wire.channel.RequestResponseContext
        public ResponseSenderChannel sender() {
            return this.responder;
        }

        /**
         * @param contextId numeric id; exposed through id() as a decimal string
         * @param responder channel returned by sender()
         * @param clientChannel the accepted client connection
         */
        Context(long contextId, ResponseSenderChannel responder, SocketChannel clientChannel) {
            this.responder = responder;
            this.clientChannel = clientChannel;
            this.bufferPool = new ByteBufferPool(ServerRequestResponseChannelActor.this.maxBufferPoolSize, ServerRequestResponseChannelActor.this.maxMessageSize);
            this.id = String.valueOf(contextId);
        }

        /** Closes the client channel; a failure is logged and never propagated. */
        void close() {
            try {
                this.clientChannel.close();
            } catch (Exception e) {
                // The logger receives the exception itself, so no separate stack-trace dump is needed.
                ServerRequestResponseChannelActor.this.logger().log("Failed to close client channel because: " + e.getMessage(), e);
            }
        }

        /** Answers whether at least one response is waiting to be written. */
        boolean hasNextWritable() {
            return this.writables.peek() != null;
        }

        /** Removes and returns the oldest queued response, or null when none is queued. */
        ConsumerByteBuffer nextWritable() {
            return this.writables.poll();
        }

        /** Appends a response to the tail of the write queue. */
        void queueWritable(ConsumerByteBuffer consumerByteBuffer) {
            this.writables.add(consumerByteBuffer);
        }

        /**
         * Obtains a buffer from this connection's pool for reading a request.
         * NOTE(review): the second argument (25) is presumably a retry limit - confirm against ByteBufferPool.
         */
        ConsumerByteBuffer requestBuffer() {
            return this.bufferPool.accessFor("request", 25);
        }
    }

    /**
     * Binds the server socket to a port, registers it with the selector for
     * accept events and schedules the periodic probe.
     *
     * @param requestChannelConsumer receiver of the request buffers read from clients
     * @param i port the server socket is bound to
     * @param str name used to identify this channel in log messages
     * @param i2 first argument for each per-connection ByteBufferPool
     * @param i3 second argument for each per-connection ByteBufferPool
     * @param j timeout handed to Selector.select(long) on every probe
     * @param j2 last argument of the scheduler call (presumably the probe interval - confirm against Scheduler)
     * @throws Exception if the socket cannot be bound, configured or registered; in that case
     *         the selector and server channel opened by the field initializers are closed first
     */
    public ServerRequestResponseChannelActor(RequestChannelConsumer requestChannelConsumer, int i, String str, int i2, int i3, long j, long j2) throws Exception {
        this.consumer = requestChannelConsumer;
        this.name = str;
        this.maxBufferPoolSize = i2;
        this.maxMessageSize = i3;
        this.probeTimeout = j;
        try {
            this.channel.socket().bind(new InetSocketAddress(i));
            this.channel.configureBlocking(false);
            this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            // The selector and channel were opened by the field initializers; without this
            // they would stay open when, for example, the port is already in use.
            this.closed = true;
            try {
                this.selector.close();
            } catch (Exception suppressed) {
                e.addSuppressed(suppressed);
            }
            try {
                this.channel.close();
            } catch (Exception suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        this.cancellable = stage().scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, 100L, j2);
    }

    /**
     * Timer callback: runs one probe of the selector.
     * Both arguments are ignored.
     */
    public void intervalSignal(Scheduled scheduled, Object obj) {
        this.probeChannel();
    }

    /**
     * Shuts this actor down: cancels the scheduled probe first, then closes
     * the selector and server channel, and finally runs the superclass stop.
     */
    public void stop() {
        this.cancellable.cancel();
        this.close();
        super.stop();
    }

    /**
     * Gives up on a connection by closing the client channel behind the context.
     * The context must be a {@link Context} created by this actor; anything
     * else fails the cast.
     */
    @Override // io.vlingo.wire.channel.ResponseSenderChannel
    public void abandon(RequestResponseContext<?> requestResponseContext) {
        final Context abandoned = (Context) requestResponseContext;
        abandoned.close();
    }

    /**
     * Closes every accepted client channel, then the selector, then the
     * server channel. Only the first call does anything; each failure is
     * logged and does not stop the remaining resources from being closed.
     */
    @Override // io.vlingo.wire.fdx.bidirectional.ServerRequestResponseChannel
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        // Closing a Selector only deregisters its keys; the client channels behind
        // those keys stay open unless they are closed explicitly here.
        try {
            // Iterate over a snapshot so key cancellation triggered by closing cannot affect the loop.
            for (SelectionKey key : this.selector.keys().toArray(new SelectionKey[0])) {
                Object attachment = key.attachment();
                // The server channel's key has no attachment; only client keys carry a Context.
                if (attachment instanceof Context) {
                    ((Context) attachment).close();
                }
            }
        } catch (Exception e) {
            logger().log("Failed to close client channels for: '" + this.name + "'", e);
        }
        try {
            this.selector.close();
        } catch (Exception e) {
            logger().log("Failed to close selctor for: '" + this.name + "'", e);
        }
        try {
            this.channel.close();
        } catch (Exception e2) {
            logger().log("Failed to close channel for: '" + this.name + "'", e2);
        }
    }

    /**
     * Queues a response buffer on the given connection. Nothing is written
     * here; the buffer is sent when the write handler next drains the queue.
     * The context must be a {@link Context} created by this actor.
     */
    @Override // io.vlingo.wire.channel.ResponseSenderChannel
    public void respondWith(RequestResponseContext<?> requestResponseContext, ConsumerByteBuffer consumerByteBuffer) {
        final Context target = (Context) requestResponseContext;
        target.queueWritable(consumerByteBuffer);
    }

    /**
     * Accepts one pending connection on the server channel behind the key,
     * switches it to non-blocking mode and registers it with the selector for
     * read and write events, with a fresh {@link Context} attached.
     *
     * @param selectionKey the acceptable key of the server channel
     * @throws IOException if accepting, configuring or registering the client fails;
     *         a client channel that was already accepted is closed before rethrowing
     */
    private void accept(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
        if (!serverChannel.isOpen()) {
            return;
        }
        SocketChannel clientChannel = serverChannel.accept();
        if (clientChannel == null) {
            // A non-blocking accept returns null when no connection is pending any more.
            return;
        }
        try {
            clientChannel.configureBlocking(false);
            long contextId = NextContextId;
            NextContextId = contextId + 1;
            clientChannel.register(this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new Context(contextId, this.self, clientChannel));
        } catch (IOException | RuntimeException e) {
            // The channel never reached the selector, so nothing else would ever close it.
            try {
                clientChannel.close();
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        logger().log("Accepted new connection for '" + this.name + "' from: " + clientChannel.getRemoteAddress());
    }

    /**
     * Runs one selection round: waits up to {@code probeTimeout} for ready keys
     * and dispatches each one to accept, read or write. Does nothing once the
     * channel has been closed. Failures are logged, never propagated.
     */
    private void probeChannel() {
        if (this.closed) {
            return;
        }
        try {
            if (this.selector.select(this.probeTimeout) > 0) {
                Iterator<SelectionKey> ready = this.selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    ready.remove();
                    // Each key is handled in isolation so one failing connection cannot abort
                    // the round and leave the other ready keys stuck in the selected set.
                    processKey(key);
                }
            }
        } catch (Exception e) {
            logger().log("Failed to accept/read/write/close client channel for '" + this.name + "' because: " + e.getMessage(), e);
        }
    }

    /**
     * Dispatches a single ready key. At most one operation is performed per
     * key per round, in the priority order accept, read, write. An I/O failure
     * drops the affected client connection; any other failure is only logged.
     */
    private void processKey(SelectionKey key) {
        try {
            if (!key.isValid()) {
                return;
            }
            if (key.isAcceptable()) {
                accept(key);
            } else if (key.isReadable()) {
                read(key);
            } else if (key.isWritable()) {
                write(key);
            }
        } catch (IOException e) {
            logger().log("Failed to accept/read/write/close client channel for '" + this.name + "' because: " + e.getMessage(), e);
            // A connection that failed with an I/O error would fail again on every probe; drop it.
            Object attachment = key.attachment();
            if (attachment instanceof Context) {
                ((Context) attachment).close();
                key.cancel();
            }
        } catch (Exception e) {
            logger().log("Failed to accept/read/write/close client channel for '" + this.name + "' because: " + e.getMessage(), e);
        }
    }

    /**
     * Reads everything currently available from the client channel into a
     * pooled request buffer and passes it to the consumer. At end-of-stream the
     * connection is closed and its key cancelled; bytes received before the
     * end-of-stream are still delivered.
     *
     * @param selectionKey readable key of a client channel, carrying its {@link Context}
     * @throws IOException if reading from the channel fails; the request buffer is released first
     */
    private void read(SelectionKey selectionKey) throws IOException {
        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
        Context context = (Context) selectionKey.attachment();
        ConsumerByteBuffer requestBuffer = context.requestBuffer().clear();
        ByteBuffer target = requestBuffer.asByteBuffer();
        int totalBytesRead = 0;
        int bytesRead;
        try {
            // Only positive counts are accumulated: the -1 end-of-stream marker must not be
            // subtracted from the total, or a 1-byte message followed by EOF would be dropped.
            while ((bytesRead = clientChannel.read(target)) > 0) {
                totalBytesRead += bytesRead;
            }
        } catch (IOException | RuntimeException e) {
            // Release the buffer so a failed read does not leave it checked out of the pool.
            requestBuffer.release();
            throw e;
        }
        if (bytesRead == -1) {
            context.close();
            selectionKey.cancel();
        }
        if (totalBytesRead > 0) {
            this.consumer.consume(context, requestBuffer.flip());
        } else {
            requestBuffer.release();
        }
    }

    /**
     * Drains the context's write queue, sending each queued response to the
     * client channel in FIFO order until the queue is empty.
     */
    private void respondWithCachedData(Context context) throws Exception {
        final SocketChannel clientChannel = context.reference();
        for (ConsumerByteBuffer pending = context.nextWritable(); pending != null; pending = context.nextWritable()) {
            respondWithCachedData(context, clientChannel, pending);
        }
    }

    /**
     * Writes one response buffer completely to the client channel and then
     * releases it. A write failure is logged and not propagated; the buffer is
     * released whether or not the write succeeded.
     *
     * @param context connection the response belongs to (not used by the write itself)
     * @param socketChannel client channel to write to
     * @param consumerByteBuffer response to send; always released before returning
     */
    private void respondWithCachedData(Context context, SocketChannel socketChannel, ConsumerByteBuffer consumerByteBuffer) throws Exception {
        try {
            ByteBuffer pending = consumerByteBuffer.asByteBuffer();
            while (pending.hasRemaining()) {
                socketChannel.write(pending);
            }
        } catch (Exception e) {
            // The address lookup is guarded: on a closed channel it throws as well and
            // would otherwise replace the write failure being reported here.
            logger().log("Failed to write buffer for channel " + remoteAddressOf(socketChannel) + " because: " + e.getMessage(), e);
        } finally {
            consumerByteBuffer.release();
        }
    }

    /** Describes the channel's remote address for logging, without ever throwing. */
    private static String remoteAddressOf(SocketChannel socketChannel) {
        try {
            return String.valueOf(socketChannel.getRemoteAddress());
        } catch (Exception e) {
            return "(unavailable)";
        }
    }

    /**
     * Write handler: sends all queued responses for the connection attached
     * to the key. Does nothing when the queue is empty.
     */
    private void write(SelectionKey selectionKey) throws Exception {
        final Context context = (Context) selectionKey.attachment();
        if (!context.hasNextWritable()) {
            return;
        }
        respondWithCachedData(context);
    }
}
