package org.cacheonix.impl.net.tcp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import org.cacheonix.impl.clock.Clock;
import org.cacheonix.impl.config.SystemProperty;
import org.cacheonix.impl.net.processor.Frame;
import org.cacheonix.impl.net.processor.Message;
import org.cacheonix.impl.net.processor.ReceiverAddress;
import org.cacheonix.impl.net.processor.Request;
import org.cacheonix.impl.net.processor.Response;
import org.cacheonix.impl.net.processor.Router;
import org.cacheonix.impl.net.serializer.Serializer;
import org.cacheonix.impl.net.serializer.SerializerFactory;
import org.cacheonix.impl.util.IOUtils;
import org.cacheonix.impl.util.exception.ExceptionUtils;
import org.cacheonix.impl.util.logging.Logger;

/* loaded from: input_file:org/cacheonix/impl/net/tcp/SenderKeyHandler.class */
final class SenderKeyHandler extends KeyHandler {
    private static final Logger LOG = Logger.getLogger(SenderKeyHandler.class);
    private static final int INIT = 0;
    private static final int CONNECTING = 1;
    private static final int WRITING = 2;
    private final Serializer serializer;
    private final LinkedList<InetAddress> addressesToTry;
    private final LinkedList<Message> messages;
    private final ReceiverAddress receiverAddress;
    private final Router router;
    private int state;
    private ByteBuffer leftover;

    public SenderKeyHandler(Selector selector, ReceiverAddress receiverAddress, Router router, long j, Clock clock) {
        super(selector, j, clock);
        this.serializer = SerializerFactory.getInstance().getSerializer((byte) 1);
        this.addressesToTry = new LinkedList<>();
        this.messages = new LinkedList<>();
        this.state = 0;
        this.receiverAddress = receiverAddress;
        this.router = router;
    }

    private void handleFinishConnect(SelectionKey selectionKey) throws InterruptedException {
        SocketChannel socketChannel = socketChannel(selectionKey);
        switch (this.state) {
            case 1:
                try {
                    socketChannel.finishConnect();
                    socketChannel.socket().setSendBufferSize(SystemProperty.BUFFER_SIZE);
                    socketChannel.socket().setReceiveBufferSize(SystemProperty.BUFFER_SIZE);
                    selectionKey.interestOps(1);
                    this.state = 2;
                    handleWrite(selectionKey);
                    return;
                } catch (IOException e) {
                    beginConnecting(false);
                    return;
                }
            default:
                throw new IllegalStateException("handleFinishConnect() can only be called in CONNECTING state: " + this.state);
        }
    }

    public void handleWrite(SelectionKey selectionKey) throws InterruptedException {
        switch (this.state) {
            case 2:
                try {
                    if (this.leftover == null) {
                        writeMessages(selectionKey);
                    } else {
                        writeLeftover(selectionKey);
                    }
                    return;
                } catch (IOException e) {
                    IOUtils.closeHard(socketChannel(selectionKey));
                    this.leftover = null;
                    beginConnecting(true);
                    return;
                }
            default:
                throw new IllegalStateException("Illegal state: " + this.state);
        }
    }

    @Override // org.cacheonix.impl.net.tcp.KeyHandler
    public void handleIdle(SelectionKey selectionKey) throws InterruptedException {
        switch (this.state) {
            case 2:
                if (this.messages.isEmpty()) {
                    return;
                }
                handleWrite(selectionKey);
                return;
            default:
                return;
        }
    }

    @Override // org.cacheonix.impl.net.tcp.KeyHandler
    protected void handleTimeout(SelectionKey selectionKey) {
        IOUtils.closeHard(selectionKey);
        if (this.leftover != null) {
            this.leftover = null;
            respondWithFailure(this.messages.removeFirst(), "Operation timed out after " + getNetworkTimeoutMillis());
            beginConnecting(true);
        } else if (this.messages.isEmpty()) {
            this.state = 0;
        } else {
            beginConnecting(true);
        }
    }

    public void handleRead(SelectionKey selectionKey) {
        switch (this.state) {
            case 2:
                String str = null;
                try {
                    if (socketChannel(selectionKey).read(ByteBuffer.allocate(1)) == -1) {
                        str = "Connection was broken";
                    }
                } catch (IOException e) {
                    str = e.toString();
                }
                if (str != null) {
                    IOUtils.closeHard(socketChannel(selectionKey));
                    this.leftover = null;
                    this.state = 0;
                    respondToAllWithFailure(str);
                    return;
                }
                return;
            default:
                throw new IllegalStateException("Read readiness should not be received in this state " + this.state);
        }
    }

    public void enqueue(Message message) {
        this.messages.add(message);
        if (this.state == 0) {
            beginConnecting(true);
        }
    }

    @Override // org.cacheonix.impl.net.tcp.KeyHandler
    public void handleKey(SelectionKey selectionKey) throws InterruptedException {
        if (selectionKey.isConnectable()) {
            handleFinishConnect(selectionKey);
        } else if (selectionKey.isWritable()) {
            handleWrite(selectionKey);
        } else {
            if (!selectionKey.isReadable()) {
                throw new IllegalArgumentException("Key is not supported: " + selectionKey);
            }
            handleRead(selectionKey);
        }
    }

    private void writeLeftover(SelectionKey selectionKey) throws IOException, InterruptedException {
        SocketChannel socketChannel = socketChannel(selectionKey);
        if (write(socketChannel, this.leftover) == this.leftover.remaining()) {
            this.messages.removeFirst();
            this.leftover = null;
            selectionKey.interestOps(1);
            handleWrite(selectionKey);
        }
    }

    private void writeMessages(SelectionKey selectionKey) throws IOException {
        long j = 0;
        SocketChannel socketChannel = socketChannel(selectionKey);
        Iterator<Message> it = this.messages.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Message next = it.next();
            next.setTimestamp(this.clock.currentTime());
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(512);
            new Frame(Integer.MAX_VALUE, this.serializer, (byte) 1, 0L, next).write(byteArrayOutputStream);
            byteArrayOutputStream.flush();
            ByteBuffer wrap = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
            int remaining = wrap.remaining();
            int write = write(socketChannel, wrap);
            j += write;
            if (write < remaining) {
                selectionKey.interestOps(5);
                this.leftover = wrap;
                break;
            }
            it.remove();
        }
        if (j == 0) {
            registerInactivity(selectionKey);
        } else {
            registerActivity();
        }
    }

    private void respondToAllWithFailure(String str) {
        Iterator<Message> it = this.messages.iterator();
        while (it.hasNext()) {
            respondWithFailure(it.next(), str);
        }
        this.messages.clear();
    }

    private void respondWithFailure(Message message, String str) {
        Request request = Request.toRequest(message);
        if (request != null) {
            Response createResponse = request.createResponse(2, str);
            createResponse.setClusterUUID(request.getClusterUUID());
            this.router.route(createResponse);
        }
    }

    private void beginConnecting(boolean z) {
        if (z) {
            this.addressesToTry.clear();
            this.addressesToTry.addAll(Arrays.asList(this.receiverAddress.getAddresses()));
        }
        while (!this.addressesToTry.isEmpty()) {
            SocketChannel socketChannel = null;
            try {
                InetSocketAddress inetSocketAddress = new InetSocketAddress(this.addressesToTry.removeFirst(), this.receiverAddress.getTcpPort());
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector(), 8, this);
                socketChannel.connect(inetSocketAddress);
                registerActivity();
                this.state = 1;
                return;
            } catch (IOException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("e: " + e, e);
                }
                IOUtils.closeHard(socketChannel);
            }
        }
        this.state = 0;
        respondToAllWithFailure("Cannot connect to receiver at " + this.receiverAddress);
    }

    private static int write(SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException {
        try {
            return socketChannel.write(byteBuffer);
        } catch (IOException e) {
            throw ExceptionUtils.enhanceExceptionWithAddress(socketChannel, e);
        }
    }

    @Override // org.cacheonix.impl.net.tcp.KeyHandler
    public String toString() {
        return "SenderKeyHandler{receiverNodeAddress=" + this.receiverAddress + ", messages=" + this.messages.size() + ", addressesToTry=" + this.addressesToTry + ", state=" + this.state + ", leftover=" + this.leftover + '}';
    }
}
