package org.reactivecommons.async.impl.reply;

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivecommons.async.impl.communications.Message;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:org/reactivecommons/async/impl/reply/ReactiveReplyRouter.class */
public class ReactiveReplyRouter {
    private final ConcurrentHashMap<String, UnicastProcessor<Message>> processors = new ConcurrentHashMap<>();

    public Mono<Message> register(String str) {
        UnicastProcessor<Message> create = UnicastProcessor.create((Queue) Queues.one().get());
        this.processors.put(str, create);
        return create.singleOrEmpty();
    }

    public void routeReply(String str, Message message) {
        UnicastProcessor<Message> remove = this.processors.remove(str);
        if (remove != null) {
            remove.onNext(message);
            remove.onComplete();
        }
    }

    public <E> void routeError(String str, String str2) {
        UnicastProcessor<Message> remove = this.processors.remove(str);
        if (remove != null) {
            remove.onError(new RuntimeException(str2));
        }
    }

    public void routeEmpty(String str) {
        UnicastProcessor<Message> remove = this.processors.remove(str);
        if (remove != null) {
            remove.onComplete();
        }
    }
}
