package io.rsocket.aeron.server;

import io.rsocket.Closeable;
import io.rsocket.aeron.AeronDuplexConnection;
import io.rsocket.aeron.internal.AeronWrapper;
import io.rsocket.aeron.internal.EventLoop;
import io.rsocket.aeron.internal.reactivestreams.AeronChannelServer;
import io.rsocket.aeron.internal.reactivestreams.AeronSocketAddress;
import io.rsocket.transport.ServerTransport;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/aeron/server/AeronServerTransport.class */
/**
 * {@link ServerTransport} implementation that accepts connections through an
 * {@link AeronChannelServer}.
 *
 * <p>Every channel handed to the server's callback is wrapped in an
 * {@link AeronDuplexConnection} named {@code "server"} and passed to the
 * {@link ServerTransport.ConnectionAcceptor} supplied to {@link #start}.
 *
 * <p>An instance can be started at most once; the check-and-create step in
 * {@link #start} is guarded by this object's monitor.
 */
public class AeronServerTransport implements ServerTransport<Closeable> {
    private final AeronWrapper aeronWrapper;
    private final AeronSocketAddress managementSubscriptionSocket;
    private final EventLoop eventLoop;

    /** Created by the first call to {@link #start}; {@code null} until then. */
    private AeronChannelServer aeronChannelServer;

    /**
     * Stores the collaborators that are later passed to
     * {@link AeronChannelServer#create}. Nothing is created or started here.
     *
     * @param aeronWrapper       Aeron wrapper forwarded to the channel server
     * @param aeronSocketAddress address used as the management subscription socket
     * @param eventLoop          event loop forwarded to the channel server
     */
    public AeronServerTransport(AeronWrapper aeronWrapper, AeronSocketAddress aeronSocketAddress, EventLoop eventLoop) {
        this.aeronWrapper = aeronWrapper;
        this.managementSubscriptionSocket = aeronSocketAddress;
        this.eventLoop = eventLoop;
    }

    /**
     * Creates the underlying {@link AeronChannelServer} and starts it.
     *
     * <p>NOTE: the server is created and started eagerly, while this method runs,
     * not when the returned {@code Mono} is subscribed to. The {@code Mono} only
     * carries the handle returned by {@code AeronChannelServer.start()}.
     *
     * @param connectionAcceptor receives an {@link AeronDuplexConnection} for each
     *                           channel reported by the server; the publisher it
     *                           returns is subscribed to immediately
     * @return a {@code Mono} emitting the handle returned by the started server
     * @throws IllegalStateException if this transport has already been started
     */
    public Mono<Closeable> start(ServerTransport.ConnectionAcceptor connectionAcceptor) {
        final AeronChannelServer server;
        synchronized (this) {
            if (this.aeronChannelServer != null) {
                throw new IllegalStateException("server already started");
            }
            server = AeronChannelServer.create(
                    aeronChannel ->
                            connectionAcceptor
                                    .apply(new AeronDuplexConnection("server", aeronChannel))
                                    .subscribe(),
                    this.aeronWrapper,
                    this.managementSubscriptionSocket,
                    this.eventLoop);
            this.aeronChannelServer = server;
        }
        // Use the local reference so the mutable field is never read outside the lock.
        return Mono.just(server.start());
    }
}
