package io.hoplin.rpc;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.hoplin.HoplinRuntimeException;
import io.hoplin.MessagePayload;
import io.hoplin.json.JsonCodec;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/rpc/RpcResponderConsumer.class */
public class RpcResponderConsumer<I, O> extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(RpcResponderConsumer.class);
    private final Executor executor;
    private final Function<I, O> handler;
    private JsonCodec codec;

    public RpcResponderConsumer(Channel channel, Function<I, O> function, Executor executor) {
        super(channel);
        this.executor = (Executor) Objects.requireNonNull(executor);
        this.handler = (Function) Objects.requireNonNull(function);
        this.codec = new JsonCodec();
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        log.info("handleDelivery : {}", envelope);
        CompletableFuture.supplyAsync(() -> {
            return dispatch(bArr);
        }, this.executor).whenComplete((bArr2, th) -> {
            try {
                if (th != null) {
                    nack(envelope);
                    return;
                }
                AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().correlationId(basicProperties.getCorrelationId()).build();
                String replyTo = basicProperties.getReplyTo();
                log.info("replyTo, correlationId :  {}, {}", replyTo, basicProperties.getCorrelationId());
                getChannel().basicPublish("", replyTo, build, bArr2);
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            } catch (Exception e) {
                log.error("Unable to acknowledgeExceptionally execution", e);
                nack(envelope);
            }
        });
    }

    private void nack(Envelope envelope) {
        try {
            long deliveryTag = envelope.getDeliveryTag();
            if (envelope.isRedeliver()) {
                sendToDeadMessageQueue(envelope);
            } else {
                getChannel().basicNack(deliveryTag, false, true);
            }
        } catch (IOException e) {
            log.error("unable to NACK : " + envelope, e);
        }
    }

    private void sendToDeadMessageQueue(Envelope envelope) {
        log.warn("marked for DLQ :  {} ", envelope);
    }

    private byte[] dispatch(byte[] bArr) {
        try {
            return this.codec.serialize(new MessagePayload(this.handler.apply(((MessagePayload) this.codec.deserialize(bArr, MessagePayload.class)).getPayload())), MessagePayload.class);
        } catch (Exception e) {
            log.error("Unable to apply reply handler", e);
            throw new HoplinRuntimeException("Unable to apply reply handler", e);
        }
    }
}
