package reactor.aeron.server;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronUtils;
import reactor.aeron.Connection;
import reactor.aeron.ControlMessageSubscriber;
import reactor.aeron.DefaultAeronInbound;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.MessagePublication;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.aeron.Protocol;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/server/AeronServerHandler.class */
public final class AeronServerHandler implements ControlMessageSubscriber, OnDisposable {
    private static final Logger logger = Loggers.getLogger(AeronServerHandler.class);
    private static final AtomicInteger streamIdCounter = new AtomicInteger(1000);
    private final String category;
    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super Connection, ? extends Publisher<Void>> handler;
    private final String serverChannel;
    private final AtomicLong nextSessionId = new AtomicLong(0);
    private final List<SessionHandler> handlers = new CopyOnWriteArrayList();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/aeron/server/AeronServerHandler$SessionHandler.class */
    public class SessionHandler implements Connection {
        private final Logger logger;
        private final DefaultAeronOutbound outbound;
        private final DefaultAeronInbound inbound;
        private final String clientChannel;
        private final int clientSessionStreamId;
        private final int serverSessionStreamId;
        private final UUID connectRequestId;
        private final long sessionId;
        private final Mono<MessagePublication> controlPublication;
        private final MonoProcessor<Void> dispose;
        private final MonoProcessor<Void> onDispose;

        private SessionHandler(String str, int i, int i2, UUID uuid, long j, int i3) {
            this.logger = Loggers.getLogger(SessionHandler.class);
            this.dispose = MonoProcessor.create();
            this.onDispose = MonoProcessor.create();
            this.clientSessionStreamId = i;
            this.clientChannel = str;
            this.outbound = new DefaultAeronOutbound(AeronServerHandler.this.category, str, AeronServerHandler.this.resources, AeronServerHandler.this.options);
            this.connectRequestId = uuid;
            this.sessionId = j;
            this.serverSessionStreamId = i3;
            this.inbound = new DefaultAeronInbound(AeronServerHandler.this.category, AeronServerHandler.this.resources);
            this.controlPublication = Mono.defer(() -> {
                return newControlPublication(str, i2);
            }).cache();
            this.dispose.then(doDispose()).doFinally(signalType -> {
                this.onDispose.onComplete();
            }).subscribe((Consumer) null, th -> {
                this.logger.warn("SessionHandler disposed with error: {}", th);
            });
        }

        private Mono<MessagePublication> newControlPublication(String str, int i) {
            return AeronServerHandler.this.resources.messagePublication(AeronServerHandler.this.category, str, i, AeronServerHandler.this.options, AeronServerHandler.this.resources.nextEventLoop());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<? extends Connection> start() {
            return connect().then(this.outbound.start(this.sessionId, this.clientSessionStreamId)).then(this.inbound.start(AeronServerHandler.this.serverChannel, this.serverSessionStreamId, this.sessionId, this::dispose)).thenReturn(this).doOnSuccess(sessionHandler -> {
                AeronServerHandler.this.handlers.add(this);
                this.logger.debug("[{}] Client with connectRequestId: {} successfully connected, sessionId: {}", new Object[]{AeronServerHandler.this.category, this.connectRequestId, Long.valueOf(this.sessionId)});
            }).doOnError(th -> {
                this.logger.debug("[{}] Failed to connect to the client for sessionId: {}", new Object[]{AeronServerHandler.this.category, Long.valueOf(this.sessionId), th});
                dispose();
            });
        }

        @Override // reactor.aeron.Connection
        public AeronInbound inbound() {
            return this.inbound;
        }

        @Override // reactor.aeron.Connection
        public AeronOutbound outbound() {
            return this.outbound;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("ServerSession{");
            sb.append("sessionId=").append(this.sessionId);
            sb.append(", clientChannel=").append(this.clientChannel);
            sb.append(", clientSessionStreamId=").append(this.clientSessionStreamId);
            sb.append(", serverSessionStreamId=").append(this.serverSessionStreamId);
            sb.append(", connectRequestId=").append(this.connectRequestId);
            sb.append('}');
            return sb.toString();
        }

        public void dispose() {
            this.dispose.onComplete();
        }

        public boolean isDisposed() {
            return this.onDispose.isDisposed();
        }

        @Override // reactor.aeron.OnDisposable
        public Mono<Void> onDispose() {
            return this.onDispose;
        }

        private Mono<Void> doDispose() {
            return Mono.defer(() -> {
                this.logger.debug("[{}] About to close session with sessionId: {}", new Object[]{AeronServerHandler.this.category, Long.valueOf(this.sessionId)});
                AeronServerHandler.this.handlers.remove(this);
                Optional.ofNullable(this.outbound).ifPresent((v0) -> {
                    v0.dispose();
                });
                Optional.ofNullable(this.inbound).ifPresent((v0) -> {
                    v0.dispose();
                });
                return Mono.whenDelayError(new Publisher[]{(Publisher) Optional.ofNullable(this.outbound).map((v0) -> {
                    return v0.onDispose();
                }).orElse(Mono.empty()), (Publisher) Optional.ofNullable(this.inbound).map((v0) -> {
                    return v0.onDispose();
                }).orElse(Mono.empty())}).doFinally(signalType -> {
                    this.logger.debug("[{}] Closed session with sessionId: {}", new Object[]{AeronServerHandler.this.category, Long.valueOf(this.sessionId)});
                });
            });
        }

        private Mono<Void> connect() {
            return Mono.defer(() -> {
                Duration ofMillis = Duration.ofMillis(100L);
                Duration plus = AeronServerHandler.this.options.connectTimeout().plus(AeronServerHandler.this.options.backpressureTimeout());
                long millis = plus.toMillis() / ofMillis.toMillis();
                return this.controlPublication.flatMap(messagePublication -> {
                    return sendConnectAck(messagePublication).retryBackoff(millis, ofMillis, ofMillis).timeout(plus).then().doOnSuccess(r9 -> {
                        this.logger.debug("[{}] Sent {} to {}", new Object[]{AeronServerHandler.this.category, MessageType.CONNECT_ACK, messagePublication});
                    }).onErrorResume(th -> {
                        return Mono.error(new RuntimeException(String.format("Failed to send %s, publication %s is not connected", MessageType.CONNECT_ACK, messagePublication), th));
                    });
                });
            });
        }

        private Mono<Void> sendConnectAck(MessagePublication messagePublication) {
            return messagePublication.enqueue(MessageType.CONNECT_ACK, Protocol.createConnectAckBody(this.connectRequestId, this.serverSessionStreamId), this.sessionId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronServerHandler(AeronServerSettings aeronServerSettings) {
        this.category = aeronServerSettings.name();
        this.options = aeronServerSettings.options();
        this.resources = aeronServerSettings.aeronResources();
        this.handler = aeronServerSettings.handler();
        this.serverChannel = this.options.serverChannel();
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe((Consumer) null, th -> {
            logger.warn("AeronServerHandler disposed with error: {}", th);
        });
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onSubscription(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onConnect(UUID uuid, String str, int i, int i2) {
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Received {} for connectRequestId: {}, channel={}, clientControlStreamId={}, clientSessionStreamId={}", new Object[]{this.category, MessageType.CONNECT, uuid, AeronUtils.minifyChannel(str), Integer.valueOf(i), Integer.valueOf(i2)});
        }
        int incrementAndGet = streamIdCounter.incrementAndGet();
        long incrementAndGet2 = this.nextSessionId.incrementAndGet();
        new SessionHandler(str, i2, i, uuid, incrementAndGet2, incrementAndGet).start().subscribeOn(Schedulers.single()).subscribe(connection -> {
            this.handler.apply(connection).subscribe(connection.disposeSubscriber());
        }, th -> {
            logger.error("[{}] Occurred exception on connect to {}, sessionId: {}, connectRequestId: {}, clientSessionStreamId: {}, clientControlStreamId: {}, serverSessionStreamId: {}, error: ", new Object[]{this.category, str, Long.valueOf(incrementAndGet2), uuid, Integer.valueOf(i2), Integer.valueOf(i), Integer.valueOf(incrementAndGet), th});
        });
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onConnectAck(UUID uuid, long j, int i) {
        logger.error("[{}] Received unsupported server request {}, connectRequestId: {}", new Object[]{this.category, MessageType.CONNECT_ACK, uuid});
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onComplete(long j) {
        logger.info("[{}] Received {} for sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
        this.handlers.stream().filter(sessionHandler -> {
            return sessionHandler.sessionId == j;
        }).findFirst().ifPresent((v0) -> {
            v0.dispose();
        });
    }

    public void dispose() {
        this.dispose.onComplete();
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            return Mono.whenDelayError((Iterable) this.handlers.stream().map(sessionHandler -> {
                sessionHandler.dispose();
                return sessionHandler.onDispose();
            }).collect(Collectors.toList()));
        });
    }
}
