package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.comm.rpc.RpcServer;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Flowable;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.jms.Destination;

/* loaded from: input_file:ch/squaredesk/nova/comm/jms/JmsRpcServer.class */
/**
 * RPC server on top of JMS: turns incoming JMS messages that carry both a reply destination and a
 * correlation ID into {@link JmsRpcInvocation}s, and sends the reply (or an error reply) back to
 * the request's reply destination, tagged with the request's correlation ID.
 *
 * @param <InternalMessageType> the message type handed to the message sender and receiver
 */
public class JmsRpcServer<InternalMessageType> extends RpcServer<Destination, JmsRpcInvocation<InternalMessageType>> {
    private final JmsMessageSender<InternalMessageType> messageSender;
    private final JmsMessageReceiver<InternalMessageType> messageReceiver;
    private final Function<Throwable, InternalMessageType> errorReplyFactory;

    /**
     * Creates a new RPC server.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was changed
     * from package-private. It is kept {@code public} here so that existing callers keep compiling.
     *
     * @param str passed through unchanged to the {@link RpcServer} constructor
     * @param jmsMessageReceiver source of incoming messages; must not be null
     * @param jmsMessageSender used to send replies and error replies; must not be null
     * @param function converts a {@link Throwable} into the message sent as error reply; must not be null
     * @param metrics passed through unchanged to the {@link RpcServer} constructor
     * @throws NullPointerException if the sender, the receiver or the error reply factory is null
     */
    public JmsRpcServer(String str, JmsMessageReceiver<InternalMessageType> jmsMessageReceiver, JmsMessageSender<InternalMessageType> jmsMessageSender, Function<Throwable, InternalMessageType> function, Metrics metrics) {
        super(str, metrics);
        // Null checks run in the same order as before (sender, receiver, factory).
        this.messageSender = Objects.requireNonNull(jmsMessageSender, "messageSender must not be null");
        this.messageReceiver = Objects.requireNonNull(jmsMessageReceiver, "messageReceiver must not be null");
        this.errorReplyFactory = Objects.requireNonNull(function, "errorReplyFactory must not be null");
    }

    /**
     * Returns the RPC requests arriving on the given destination.
     *
     * <p>Messages lacking transport details, a reply destination or a correlation ID are filtered
     * out; every remaining message is wrapped in a {@link JmsRpcInvocation}.
     *
     * @param destination the destination whose messages are requested from the message receiver
     * @return a stream of invocations, one per incoming RPC request
     */
    public Flowable<JmsRpcInvocation<InternalMessageType>> requests(Destination destination) {
        return this.messageReceiver.messages(destination)
                .filter(this::isRpcRequest)
                .map(this::toInvocation);
    }

    /**
     * Records the request in the metrics collector and wraps it in an invocation whose callbacks
     * send the reply / error reply and update the metrics.
     */
    private JmsRpcInvocation<InternalMessageType> toInvocation(IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo> request) {
        this.metricsCollector.requestReceived(request.message);
        // ReplyType is only a type parameter of createReplyHandlerFor; at this call site the
        // handler has to be typed with InternalMessageType.
        Consumer<InternalMessageType> replyHandler = createReplyHandlerFor(request);
        Consumer<Throwable> errorReplyHandler = createErrorReplyHandlerFor(request);
        return new JmsRpcInvocation<>(
                request.message,
                (JmsSpecificInfo) request.details.transportSpecificDetails,
                replyInfo -> {
                    // Successful reply: send first, then record completion.
                    replyHandler.accept(replyInfo._1);
                    this.metricsCollector.requestCompleted(request.message, replyInfo);
                },
                error -> {
                    // Failure: record first, then send the error reply.
                    this.metricsCollector.requestCompletedExceptionally(request.message, error);
                    errorReplyHandler.accept(error);
                });
    }

    /**
     * Tells whether a message is an RPC request, i.e. whether it carries transport details with
     * both a reply destination and a correlation ID.
     */
    private boolean isRpcRequest(IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo> incomingMessage) {
        if (incomingMessage.details == null) {
            return false;
        }
        JmsSpecificInfo requestInfo = (JmsSpecificInfo) incomingMessage.details.transportSpecificDetails;
        if (requestInfo == null) {
            return false;
        }
        return requestInfo.replyDestination != null && requestInfo.correlationId != null;
    }

    /**
     * Creates the callback that sends a reply to the request's reply destination. The outgoing
     * message carries only the request's correlation ID; all other JMS specific values are null.
     */
    private <RequestType extends InternalMessageType, ReplyType extends InternalMessageType> Consumer<ReplyType> createReplyHandlerFor(IncomingMessage<RequestType, Destination, JmsSpecificInfo> incomingMessage) {
        JmsSpecificInfo requestInfo = (JmsSpecificInfo) incomingMessage.details.transportSpecificDetails;
        JmsSpecificInfo sendingInfo = new JmsSpecificInfo(requestInfo.correlationId, null, null, null, null, null);
        // NOTE(review): the result of sendMessage is discarded. If it returns a lazy reactive type,
        // it would need to be subscribed to for the reply to go out - confirm against JmsMessageSender.
        return reply -> {
            this.messageSender.sendMessage(requestInfo.replyDestination, reply, sendingInfo);
        };
    }

    /**
     * Creates the callback that converts a {@link Throwable} with the error reply factory and sends
     * the result to the request's reply destination, tagged with the request's correlation ID.
     */
    private Consumer<Throwable> createErrorReplyHandlerFor(IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo> incomingMessage) {
        JmsSpecificInfo requestInfo = (JmsSpecificInfo) incomingMessage.details.transportSpecificDetails;
        JmsSpecificInfo sendingInfo = new JmsSpecificInfo(requestInfo.correlationId, null, null, null, null, null);
        // NOTE(review): same as for regular replies - the result of sendMessage is discarded; confirm
        // that no subscription is required.
        return error -> {
            this.messageSender.sendMessage(requestInfo.replyDestination, this.errorReplyFactory.apply(error), sendingInfo);
        };
    }
}
