package org.reaktivity.nukleus.tcp.internal.connector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.agrona.LangUtil;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.nio.TransportPoller;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.Reaktive;
import org.reaktivity.nukleus.tcp.internal.Context;
import org.reaktivity.nukleus.tcp.internal.router.Router;

/**
 * Drives outbound socket connects and reports their outcome to the {@link Router}.
 *
 * <p>{@link #doConnect} starts a connect on the supplied channel. If the connect does not
 * complete immediately, the channel is registered with the selector inherited from
 * {@link TransportPoller} and completed later from {@link #process()}.
 *
 * <p>A router must be supplied through {@link #setRouter(Router)} before {@link #doConnect}
 * or {@link #process()} is used, because both success and failure are reported through it.
 */
@Reaktive
public final class Connector extends TransportPoller implements Nukleus {
    private final Context context;
    private Router router;

    /**
     * @param context source of the counters used when a connect succeeds
     */
    public Connector(Context context) {
        this.context = context;
    }

    /**
     * Sets the router that is notified of connect success and failure.
     *
     * @param router the router to notify
     */
    public void setRouter(Router router) {
        this.router = router;
    }

    /**
     * Polls the selector without blocking and handles every selected key.
     *
     * @return the value produced by {@code selectedKeySet.forEach(...)}; presumably the sum of
     *         the per-key results of {@link #processConnect(SelectionKey)} (confirm against
     *         Agrona's selected key set)
     */
    public int process() {
        selectNow();
        return this.selectedKeySet.forEach(this::processConnect);
    }

    /**
     * @return the fixed name {@code "connector"}
     */
    public String name() {
        return "connector";
    }

    /**
     * Starts connecting {@code socketChannel} to {@code inetSocketAddress}.
     *
     * <p>If the connect completes immediately the router is notified right away; otherwise the
     * channel is registered for {@link SelectionKey#OP_CONNECT} and finished from
     * {@link #process()}. If connecting or registering fails with an {@link IOException}, the
     * router is notified of the failure and the exception is rethrown unchecked.
     *
     * <p>NOTE(review): the first six parameter names are decompiler-generated. They are handed
     * to the {@code Request} constructor in the order {@code (str, j, j2, str2, j4, j3)}; judging
     * by the accessors read back in {@code handleConnected} they presumably carry the source
     * name/ref/id, the target name/ref and a correlation id. Confirm the exact mapping against
     * the {@code Request} constructor.
     *
     * @param str               forwarded to {@code Request} (1st argument)
     * @param j                 forwarded to {@code Request} (2nd argument)
     * @param j2                forwarded to {@code Request} (3rd argument)
     * @param j3                forwarded to {@code Request} (6th argument)
     * @param str2              forwarded to {@code Request} (4th argument)
     * @param j4                forwarded to {@code Request} (5th argument)
     * @param socketChannel     channel to connect; it has to be in non-blocking mode for the
     *                          deferred path, since it is registered with a selector
     * @param inetSocketAddress remote address to connect to
     */
    public void doConnect(String str, long j, long j2, long j3, String str2, long j4, SocketChannel socketChannel, InetSocketAddress inetSocketAddress) {
        final Request request = new Request(str, j, j2, str2, j4, j3, socketChannel, inetSocketAddress);
        try {
            if (socketChannel.connect(inetSocketAddress)) {
                handleConnected(request);
            } else {
                // Connect is still in flight: wait for it on the selector, carrying the request along.
                socketChannel.register(this.selector, SelectionKey.OP_CONNECT, request);
            }
        } catch (IOException e) {
            handleConnectFailed(request);
            LangUtil.rethrowUnchecked(e);
        }
    }

    /** Runs a non-blocking select, rethrowing any {@link IOException} unchecked. */
    private void selectNow() {
        try {
            this.selector.selectNow();
        } catch (IOException e) {
            LangUtil.rethrowUnchecked(e);
        }
    }

    /**
     * Completes the pending connect attached to {@code selectionKey}.
     *
     * <p>On success the router is told the stream is connected. On any exception the router is
     * told the connect failed and the exception is rethrown unchecked. In both cases the key is
     * cancelled, so neither it nor its {@code Request} attachment stays registered with the
     * selector once the attempt is resolved.
     *
     * @param selectionKey a selected key whose attachment is the {@code Request} registered by
     *                     {@link #doConnect}
     * @return 1 when the connect attempt was resolved, 0 when it is still pending
     */
    private int processConnect(SelectionKey selectionKey) {
        final Request request = (Request) selectionKey.attachment();
        try {
            if (!request.channel().finishConnect()) {
                // A non-blocking finishConnect() may report "not yet complete"; keep the key
                // registered and try again on a later poll instead of announcing a connection.
                return 0;
            }
            // The connect is done; this selector has no further interest in the channel.
            selectionKey.cancel();
            handleConnected(request);
        } catch (Exception e) {
            selectionKey.cancel();
            handleConnectFailed(request);
            LangUtil.rethrowUnchecked(e);
        }
        return 1;
    }

    /**
     * Notifies the router that the request's channel is connected.
     *
     * <p>The {@code streamsSourced} counter is incremented as part of the notification and the
     * value returned by {@code increment()} is passed to the router as its fifth argument.
     */
    private void handleConnected(Request request) {
        final AtomicCounter streamsSourced = this.context.counters().streamsSourced();
        final String sourceName = request.sourceName();
        final long sourceRef = request.sourceRef();
        final long sourceId = request.sourceId();
        final String targetName = request.targetName();
        final long targetRef = request.targetRef();
        this.router.onConnected(
            sourceName,
            sourceRef,
            sourceId,
            targetName,
            streamsSourced.increment(),
            targetRef,
            request.correlationId(),
            request.channel(),
            request.address());
    }

    /** Notifies the router that the connect for this request failed. */
    private void handleConnectFailed(Request request) {
        this.router.onConnectFailed(request.sourceName(), request.sourceId());
    }
}
