package io.reactivesocket.aeron.internal.reactivestreams;

import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.reactivesocket.aeron.internal.AeronWrapper;
import io.reactivesocket.aeron.internal.EventLoop;
import io.reactivesocket.aeron.internal.NotConnectedException;
import io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote;
import io.reactivesocket.aeron.internal.reactivestreams.messages.AckConnectEncoder;
import io.reactivesocket.aeron.internal.reactivestreams.messages.ConnectDecoder;
import io.reactivesocket.aeron.internal.reactivestreams.messages.MessageHeaderDecoder;
import io.reactivesocket.aeron.internal.reactivestreams.messages.MessageHeaderEncoder;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reactivesocket/aeron/internal/reactivestreams/AeronChannelServer.class */
public class AeronChannelServer extends ReactiveStreamsRemote.ChannelServer<AeronChannelConsumer> {
    private static final Logger logger = LoggerFactory.getLogger(AeronChannelServer.class);
    private final AeronWrapper aeronWrapper;
    private final AeronSocketAddress managementSubscriptionSocket;
    private final AtomicBoolean started;
    private final ConcurrentHashMap<String, Subscription> serverSubscriptions;
    private volatile boolean running;
    private final EventLoop eventLoop;
    private Subscription managementSubscription;
    private AeronChannelStartedServer startServer;
    private final FragmentAssembler fragmentAssembler;

    /* loaded from: input_file:io/reactivesocket/aeron/internal/reactivestreams/AeronChannelServer$AeronChannelConsumer.class */
    public interface AeronChannelConsumer extends ReactiveStreamsRemote.ChannelConsumer<AeronChannel> {
    }

    /* loaded from: input_file:io/reactivesocket/aeron/internal/reactivestreams/AeronChannelServer$AeronChannelStartedServer.class */
    public class AeronChannelStartedServer implements ReactiveStreamsRemote.StartedServer {
        private CountDownLatch latch = new CountDownLatch(1);

        public AeronChannelStartedServer() {
        }

        public AeronWrapper getAeronWrapper() {
            return AeronChannelServer.this.aeronWrapper;
        }

        public EventLoop getEventLoop() {
            return AeronChannelServer.this.eventLoop;
        }

        @Override // io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote.StartedServer
        public SocketAddress getServerAddress() {
            return AeronChannelServer.this.managementSubscriptionSocket;
        }

        @Override // io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote.StartedServer
        public int getServerPort() {
            return AeronChannelServer.this.managementSubscriptionSocket.getPort();
        }

        @Override // io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote.StartedServer
        public void awaitShutdown(long j, TimeUnit timeUnit) {
            try {
                this.latch.await(j, timeUnit);
            } catch (InterruptedException e) {
                LangUtil.rethrowUnchecked(e);
            }
        }

        @Override // io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote.StartedServer
        public void awaitShutdown() {
            try {
                this.latch.await();
            } catch (InterruptedException e) {
                LangUtil.rethrowUnchecked(e);
            }
        }

        @Override // io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote.StartedServer
        public void shutdown() {
            AeronChannelServer.this.running = false;
            this.latch.countDown();
            AeronChannelServer.this.managementSubscription.close();
        }
    }

    private AeronChannelServer(AeronChannelConsumer aeronChannelConsumer, AeronWrapper aeronWrapper, AeronSocketAddress aeronSocketAddress, EventLoop eventLoop) {
        super(aeronChannelConsumer);
        this.started = new AtomicBoolean(false);
        this.running = true;
        this.fragmentAssembler = new FragmentAssembler(new FragmentHandler() { // from class: io.reactivesocket.aeron.internal.reactivestreams.AeronChannelServer.1
            private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
            private final ConnectDecoder connectDecoder = new ConnectDecoder();
            private final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
            private final AckConnectEncoder ackConnectEncoder = new AckConnectEncoder();

            public void onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
                long offer;
                this.messageHeaderDecoder.wrap(directBuffer, i);
                int blockLength = this.messageHeaderDecoder.blockLength();
                int templateId = this.messageHeaderDecoder.templateId();
                this.messageHeaderDecoder.schemaId();
                int version = this.messageHeaderDecoder.version();
                if (templateId == 1) {
                    this.connectDecoder.wrap(directBuffer, i + this.messageHeaderDecoder.encodedLength(), blockLength, version);
                    long channelId = this.connectDecoder.channelId();
                    String receivingChannel = this.connectDecoder.receivingChannel();
                    int receivingStreamId = this.connectDecoder.receivingStreamId();
                    String sendingChannel = this.connectDecoder.sendingChannel();
                    int sendingStreamId = this.connectDecoder.sendingStreamId();
                    int clientSessionId = this.connectDecoder.clientSessionId();
                    String clientManagementChannel = this.connectDecoder.clientManagementChannel();
                    AeronChannelServer.logger.debug("server creating a AeronChannel with channel id {} receiving on receivingChannel {}, receivingStreamId {}, sendingChannel {}, sendingStreamId {}", new Object[]{Long.valueOf(channelId), receivingChannel, Integer.valueOf(receivingStreamId), sendingChannel, Integer.valueOf(sendingStreamId)});
                    Publication addPublication = AeronChannelServer.this.aeronWrapper.addPublication(receivingChannel, receivingStreamId);
                    AeronChannelServer.logger.debug("server created publication to channel {}, stream id {}, and session id {}", new Object[]{receivingChannel, Integer.valueOf(receivingStreamId), Integer.valueOf(addPublication.sessionId())});
                    Subscription subscription = (Subscription) AeronChannelServer.this.serverSubscriptions.computeIfAbsent(sendingChannel, str -> {
                        return AeronChannelServer.this.aeronWrapper.addSubscription(sendingChannel, sendingStreamId);
                    });
                    AeronChannelServer.logger.debug("server created subscription to channel {}, stream id {}", sendingChannel, Integer.valueOf(sendingStreamId));
                    AeronChannel aeronChannel = new AeronChannel("server", addPublication, subscription, AeronChannelServer.this.eventLoop, clientSessionId);
                    AeronChannelServer.logger.debug("server create AeronChannel with destination channel {}, source channel {}, and clientSesseionId {}");
                    ((AeronChannelConsumer) AeronChannelServer.this.channelConsumer).accept(aeronChannel);
                    Publication addPublication2 = AeronChannelServer.this.aeronWrapper.addPublication(clientManagementChannel, 11);
                    AeronChannelServer.logger.debug("server created management publication to channel {}", clientManagementChannel);
                    MutableDirectBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(4096));
                    this.messageHeaderEncoder.wrap(unsafeBuffer, 0).blockLength(12).templateId(2).schemaId(1).version(0);
                    this.ackConnectEncoder.wrap(unsafeBuffer, 0 + this.messageHeaderEncoder.encodedLength()).channelId(channelId).serverSessionId(addPublication.sessionId());
                    AeronChannelServer.logger.debug("server sending AckConnect message to channel {}", clientManagementChannel);
                    do {
                        offer = addPublication2.offer(unsafeBuffer);
                        if (offer == -4) {
                            throw new NotConnectedException();
                        }
                    } while (offer < 0);
                }
            }
        });
        this.aeronWrapper = aeronWrapper;
        this.managementSubscriptionSocket = aeronSocketAddress;
        this.eventLoop = eventLoop;
        this.serverSubscriptions = new ConcurrentHashMap<>();
    }

    public static AeronChannelServer create(AeronChannelConsumer aeronChannelConsumer, AeronWrapper aeronWrapper, AeronSocketAddress aeronSocketAddress, EventLoop eventLoop) {
        return new AeronChannelServer(aeronChannelConsumer, aeronWrapper, aeronSocketAddress, eventLoop);
    }

    @Override // io.reactivesocket.aeron.internal.reactivestreams.ReactiveStreamsRemote.ChannelServer
    public AeronChannelStartedServer start() {
        if (!this.started.compareAndSet(false, true)) {
            throw new IllegalStateException("server already started");
        }
        logger.debug("management server starting on {}, stream id {}", this.managementSubscriptionSocket.getChannel(), 10);
        this.managementSubscription = this.aeronWrapper.addSubscription(this.managementSubscriptionSocket.getChannel(), 10);
        this.startServer = new AeronChannelStartedServer();
        poll();
        return this.startServer;
    }

    private int poll() {
        try {
            int poll = this.managementSubscription.poll(this.fragmentAssembler, 4096);
            if (!this.running || this.eventLoop.execute(this::poll)) {
                return poll;
            }
            this.running = false;
            throw new IllegalStateException("unable to keep polling, eventLoop rejection");
        } catch (Throwable th) {
            if (!this.running || this.eventLoop.execute(this::poll)) {
                throw th;
            }
            this.running = false;
            throw new IllegalStateException("unable to keep polling, eventLoop rejection");
        }
    }
}
