package io.reactivesocket.transport.local;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/reactivesocket/transport/local/LocalDuplexConnection.class */
/**
 * A {@link DuplexConnection} backed by in-process reactive primitives: inbound frames are read
 * from a {@link Flux}, outbound frames are pushed to a {@link Subscriber}, and closure is
 * signalled through a {@link MonoProcessor}.
 */
public class LocalDuplexConnection implements DuplexConnection {
    /** Source of frames handed out by {@link #receive()}. */
    private final Flux<Frame> in;

    /** Sink that receives every frame passed to {@link #send(Publisher)} / {@link #sendOne(Frame)}. */
    private final Subscriber<Frame> out;

    /** Completed when this connection is closed; also drives {@link #availability()}. */
    private final MonoProcessor<Void> closeNotifier;

    /**
     * Ensures the terminal signals in {@link #close()} are emitted at most once by this
     * connection. The guard is kept per connection (rather than derived from the notifier's
     * state) because the notifier is supplied by the caller and could be shared with another
     * connection -- that wiring is not visible from this class.
     */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a connection over the given endpoints.
     *
     * @param flux stream of inbound frames, returned as-is by {@link #receive()}
     * @param subscriber sink to which outbound frames are delivered
     * @param monoProcessor processor completed on {@link #close()} and exposed by {@link #onClose()}
     */
    public LocalDuplexConnection(Flux<Frame> flux, Subscriber<Frame> subscriber, MonoProcessor<Void> monoProcessor) {
        this.in = flux;
        this.out = subscriber;
        this.closeNotifier = monoProcessor;
    }

    /**
     * Sends every frame emitted by {@code publisher}, one at a time and in emission order, by
     * delegating each frame to {@link #sendOne(Frame)}.
     *
     * @param publisher the frames to send
     * @return a {@link Mono} that completes once all frames have been handed to the sink
     */
    public Mono<Void> send(Publisher<Frame> publisher) {
        return Flux.from(publisher).concatMap(this::sendOne).then();
    }

    /**
     * Sends a single frame. Nothing happens until the returned {@link Mono} is subscribed to; on
     * subscription the frame is passed to the outbound sink's {@code onNext}.
     *
     * @param frame the frame to send
     * @return a {@link Mono} that completes after the frame has been handed to the sink
     */
    public Mono<Void> sendOne(Frame frame) {
        return Mono.fromRunnable(() -> this.out.onNext(frame));
    }

    /**
     * Returns the stream of inbound frames.
     *
     * @return the {@link Flux} supplied at construction time
     */
    public Flux<Frame> receive() {
        return this.in;
    }

    /**
     * Closes this connection. The work is deferred until subscription: the first subscription
     * completes the outbound sink and the close notifier; later subscriptions (to this or any
     * other {@link Mono} returned by this method) skip the terminal signals so that the sink never
     * receives {@code onComplete} more than once from this connection.
     *
     * @return the close notifier, which completes once the connection is closed
     */
    public Mono<Void> close() {
        return Mono.defer(() -> {
            // Reactive Streams forbids signalling a Subscriber again after a terminal signal,
            // so only the first close attempt is allowed to emit onComplete.
            if (this.closed.compareAndSet(false, true)) {
                this.out.onComplete();
                this.closeNotifier.onComplete();
            }
            return this.closeNotifier;
        });
    }

    /**
     * Returns a {@link Mono} that can be used to observe closure of this connection.
     *
     * @return the close notifier supplied at construction time
     */
    public Mono<Void> onClose() {
        return this.closeNotifier;
    }

    /**
     * Reports whether this connection is usable.
     *
     * @return {@code 0.0} if the close notifier reports itself as disposed, {@code 1.0} otherwise
     */
    public double availability() {
        return this.closeNotifier.isDisposed() ? 0.0d : 1.0d;
    }
}
