package io.reactivesocket.aeron.internal.reactivestreams;

import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.reactivesocket.aeron.internal.AeronWrapper;
import io.reactivesocket.aeron.internal.EventLoop;
import io.reactivesocket.aeron.internal.NotConnectedException;
import io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote;
import io.reactivesocket.aeron.internal.reactivestreams.messages.AckConnectDecoder;
import io.reactivesocket.aeron.internal.reactivestreams.messages.ConnectEncoder;
import io.reactivesocket.aeron.internal.reactivestreams.messages.MessageHeaderDecoder;
import io.reactivesocket.aeron.internal.reactivestreams.messages.MessageHeaderEncoder;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSource;
import reactor.core.publisher.Operators;

/* loaded from: input_file:io/reactivesocket/aeron/internal/reactivestreams/AeronClientChannelConnector.class */
public class AeronClientChannelConnector implements ReactiveStreamsRemote.ClientChannelConnector<AeronClientConfig, AeronChannel>, AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(AeronClientChannelConnector.class);
    private static final AtomicLong CHANNEL_ID_COUNTER = new AtomicLong();
    private final AeronWrapper aeronWrapper;
    private final ConcurrentHashMap<AeronSocketAddress, Subscription> clientSubscriptions;
    private final ConcurrentHashMap<Long, IntConsumer> serverSessionIdConsumerMap;
    private final Subscription managementSubscription;
    private final EventLoop eventLoop;
    private volatile boolean running = true;
    private final FragmentAssembler fragmentAssembler = new FragmentAssembler(new FragmentHandler() { // from class: io.reactivesocket.aeron.internal.reactivestreams.AeronClientChannelConnector.1
        private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
        private final AckConnectDecoder ackConnectDecoder = new AckConnectDecoder();

        public void onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
            this.messageHeaderDecoder.wrap(directBuffer, i);
            int blockLength = this.messageHeaderDecoder.blockLength();
            int templateId = this.messageHeaderDecoder.templateId();
            this.messageHeaderDecoder.schemaId();
            int version = this.messageHeaderDecoder.version();
            if (templateId != 2) {
                throw new IllegalStateException("received unknown template id " + templateId);
            }
            AeronClientChannelConnector.logger.debug("client received an ack message on session id {}", Integer.valueOf(header.sessionId()));
            this.ackConnectDecoder.wrap(directBuffer, i + this.messageHeaderDecoder.encodedLength(), blockLength, version);
            long channelId = this.ackConnectDecoder.channelId();
            int serverSessionId = this.ackConnectDecoder.serverSessionId();
            AeronClientChannelConnector.logger.debug("client received ack message for channel id {} and server session id {}", Long.valueOf(channelId), Integer.valueOf(serverSessionId));
            IntConsumer intConsumer = (IntConsumer) AeronClientChannelConnector.this.serverSessionIdConsumerMap.remove(Long.valueOf(channelId));
            if (intConsumer == null) {
                throw new IllegalStateException("no channel found for channel id " + channelId);
            }
            intConsumer.accept(serverSessionId);
        }
    });

    /* loaded from: input_file:io/reactivesocket/aeron/internal/reactivestreams/AeronClientChannelConnector$AeronClientConfig.class */
    public static class AeronClientConfig implements ReactiveStreamsRemote.ClientChannelConfig {
        private final AeronSocketAddress receiveSocketAddress;
        private final AeronSocketAddress sendSocketAddress;
        private final int receiveStreamId;
        private final int sendStreamId;
        private final EventLoop eventLoop;

        private AeronClientConfig(AeronSocketAddress aeronSocketAddress, AeronSocketAddress aeronSocketAddress2, int i, int i2, EventLoop eventLoop) {
            this.receiveSocketAddress = aeronSocketAddress;
            this.sendSocketAddress = aeronSocketAddress2;
            this.receiveStreamId = i;
            this.sendStreamId = i2;
            this.eventLoop = eventLoop;
        }

        public static AeronClientConfig create(AeronSocketAddress aeronSocketAddress, AeronSocketAddress aeronSocketAddress2, int i, int i2, EventLoop eventLoop) {
            return new AeronClientConfig(aeronSocketAddress, aeronSocketAddress2, i, i2, eventLoop);
        }

        public String toString() {
            return "AeronClientConfig{receiveSocketAddress=" + this.receiveSocketAddress + ", sendSocketAddress=" + this.sendSocketAddress + ", receiveStreamId=" + this.receiveStreamId + ", sendStreamId=" + this.sendStreamId + ", eventLoop=" + this.eventLoop + '}';
        }
    }

    private AeronClientChannelConnector(AeronWrapper aeronWrapper, AeronSocketAddress aeronSocketAddress, EventLoop eventLoop) {
        this.aeronWrapper = aeronWrapper;
        logger.debug("client creating a management subscription on channel {}, stream id {}", aeronSocketAddress.getChannel(), 11);
        this.managementSubscription = aeronWrapper.addSubscription(aeronSocketAddress.getChannel(), 11);
        this.eventLoop = eventLoop;
        this.clientSubscriptions = new ConcurrentHashMap<>();
        this.serverSessionIdConsumerMap = new ConcurrentHashMap<>();
        poll();
    }

    public static AeronClientChannelConnector create(AeronWrapper aeronWrapper, AeronSocketAddress aeronSocketAddress, EventLoop eventLoop) {
        return new AeronClientChannelConnector(aeronWrapper, aeronSocketAddress, eventLoop);
    }

    private int poll() {
        try {
            int poll = this.managementSubscription.poll(this.fragmentAssembler, 4096);
            if (!this.running || this.eventLoop.execute(this::poll)) {
                return poll;
            }
            this.running = false;
            throw new IllegalStateException("unable to keep polling, eventLoop rejection");
        } catch (Throwable th) {
            if (!this.running || this.eventLoop.execute(this::poll)) {
                throw th;
            }
            this.running = false;
            throw new IllegalStateException("unable to keep polling, eventLoop rejection");
        }
    }

    @Override // java.util.function.Function
    public Mono<AeronChannel> apply(AeronClientConfig aeronClientConfig) {
        return MonoSource.wrap(subscriber -> {
            long offer;
            subscriber.onSubscribe(Operators.emptySubscription());
            long j = CHANNEL_ID_COUNTER.get();
            try {
                logger.debug("Creating new client channel with id {}", Long.valueOf(j));
                Publication addPublication = this.aeronWrapper.addPublication(aeronClientConfig.sendSocketAddress.getChannel(), aeronClientConfig.sendStreamId);
                int streamId = addPublication.streamId();
                logger.debug("Client created publication to {}, on stream id {}, and session id {}", new Object[]{aeronClientConfig.sendSocketAddress, Integer.valueOf(aeronClientConfig.sendStreamId), Integer.valueOf(addPublication.sessionId())});
                Subscription computeIfAbsent = this.clientSubscriptions.computeIfAbsent(aeronClientConfig.receiveSocketAddress, aeronSocketAddress -> {
                    Subscription addSubscription = this.aeronWrapper.addSubscription(aeronClientConfig.receiveSocketAddress.getChannel(), aeronClientConfig.receiveStreamId);
                    logger.debug("Client created subscription to {}, on stream id {}", aeronClientConfig.receiveSocketAddress, Integer.valueOf(aeronClientConfig.receiveStreamId));
                    return addSubscription;
                });
                this.serverSessionIdConsumerMap.putIfAbsent(Long.valueOf(j), i -> {
                    try {
                        AeronChannel aeronChannel = new AeronChannel("client", addPublication, computeIfAbsent, aeronClientConfig.eventLoop, i);
                        logger.debug("created client AeronChannel for destination {}, source {}, destination stream id {}, source stream id {}, client session id, and server session id {}", new Object[]{aeronClientConfig.sendSocketAddress, aeronClientConfig.receiveSocketAddress, Integer.valueOf(addPublication.streamId()), Integer.valueOf(computeIfAbsent.streamId()), Integer.valueOf(addPublication.sessionId()), Integer.valueOf(i)});
                        subscriber.onNext(aeronChannel);
                        subscriber.onComplete();
                    } catch (Throwable th) {
                        subscriber.onError(th);
                    }
                });
                this.aeronWrapper.unavailableImageHandlers(image -> {
                    if (streamId != image.sessionId()) {
                        return false;
                    }
                    this.clientSubscriptions.remove(Long.valueOf(j));
                    return true;
                });
                Publication addPublication2 = this.aeronWrapper.addPublication(aeronClientConfig.sendSocketAddress.getChannel(), 10);
                logger.debug("Client created management publication to channel {}, stream id {}", addPublication2.channel(), Integer.valueOf(addPublication2.streamId()));
                DirectBuffer encodeConnectMessage = encodeConnectMessage(j, aeronClientConfig, addPublication.sessionId());
                do {
                    offer = addPublication2.offer(encodeConnectMessage);
                    if (offer == -4) {
                        subscriber.onError(new NotConnectedException());
                    }
                } while (offer < 0);
                logger.debug("Client sent create message to {}", addPublication2.channel());
            } catch (Throwable th) {
                logger.error("Error creating a channel to {}", aeronClientConfig);
                this.clientSubscriptions.remove(Long.valueOf(j));
                subscriber.onError(th);
            }
        });
    }

    public DirectBuffer encodeConnectMessage(long j, AeronClientConfig aeronClientConfig, int i) {
        MutableDirectBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(4096));
        MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
        messageHeaderEncoder.wrap(unsafeBuffer, 0).blockLength(20).templateId(1).schemaId(1).version(0);
        new ConnectEncoder().wrap(unsafeBuffer, 0 + messageHeaderEncoder.encodedLength()).channelId(j).receivingChannel(aeronClientConfig.receiveSocketAddress.getChannel()).receivingStreamId(aeronClientConfig.receiveStreamId).sendingChannel(aeronClientConfig.sendSocketAddress.getChannel()).sendingStreamId(aeronClientConfig.sendStreamId).clientSessionId(i).clientManagementChannel(this.managementSubscription.channel());
        return unsafeBuffer;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.running = false;
    }
}
