/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.impl.reply;

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivecommons.async.impl.communications.Message;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

public class ReactiveReplyRouter {
    private final ConcurrentHashMap<String, UnicastProcessor<Message>> processors = new ConcurrentHashMap();

    public Mono<Message> register(String correlationID) {
        UnicastProcessor processor = UnicastProcessor.create((Queue)((Queue)Queues.one().get()));
        this.processors.put(correlationID, (UnicastProcessor<Message>)processor);
        return processor.singleOrEmpty();
    }

    public void routeReply(String correlationID, Message data) {
        UnicastProcessor<Message> processor = this.processors.remove(correlationID);
        if (processor != null) {
            processor.onNext((Object)data);
            processor.onComplete();
        }
    }

    public <E> void routeError(String correlationID, String data) {
        UnicastProcessor<Message> processor = this.processors.remove(correlationID);
        if (processor != null) {
            processor.onError((Throwable)new RuntimeException(data));
        }
    }

    public void routeEmpty(String correlationID) {
        UnicastProcessor<Message> processor = this.processors.remove(correlationID);
        if (processor != null) {
            processor.onComplete();
        }
    }
}

