package io.hoplin.rpc;

import com.rabbitmq.client.Channel;
import io.hoplin.Binding;
import io.hoplin.HoplinRuntimeException;
import io.hoplin.RabbitMQClient;
import io.hoplin.RabbitMQOptions;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/rpc/DefaultRpcServer.class */
/**
 * {@link RpcServer} implementation that declares a direct exchange and a request queue on a
 * RabbitMQ channel and, once {@link #respondAsync(Function)} is called, consumes requests from
 * that queue through an {@code RpcResponderConsumer}.
 *
 * <p>Each instance owns a {@link RabbitMQClient} and a fixed-size thread pool; call
 * {@link #close()} to release both.
 *
 * @param <I> request type accepted by the handler function
 * @param <O> response type produced by the handler function
 */
public class DefaultRpcServer<I, O> implements RpcServer<I, O> {
    private static final Logger log = LoggerFactory.getLogger(DefaultRpcServer.class);

    private final RabbitMQClient client;
    private final Channel channel;
    private final String exchange;
    private final String routingKey;
    private final String requestQueueName;
    private final ExecutorService executor;

    /**
     * Connects to the broker and declares the exchange and request queue described by
     * {@code binding}.
     *
     * @param rabbitMQOptions connection options used to create the client; must not be null
     * @param binding exchange / routing key / queue to serve; must not be null. A null routing
     *     key is treated as the empty string.
     * @throws NullPointerException if either argument is null
     * @throws HoplinRuntimeException if the exchange or queue cannot be declared
     */
    public DefaultRpcServer(RabbitMQOptions rabbitMQOptions, Binding binding) {
        Objects.requireNonNull(rabbitMQOptions, "rabbitMQOptions");
        Objects.requireNonNull(binding, "binding");

        this.exchange = binding.getExchange();
        final String bindingKey = binding.getRoutingKey();
        this.routingKey = bindingKey == null ? "" : bindingKey;
        this.requestQueueName = binding.getQueue();

        this.client = RabbitMQClient.create(rabbitMQOptions);
        try {
            this.channel = this.client.channel();
            bind();
        } catch (RuntimeException e) {
            // Nobody can call close() on a half-built instance, so release the client here.
            disconnectAfterFailedInit(e);
            throw e;
        }

        // Created last so a failed setup above never leaves worker threads behind.
        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Creates a new server for the given options and binding.
     *
     * @param rabbitMQOptions connection options; must not be null
     * @param binding exchange / routing key / queue to serve; must not be null
     * @param <I> request type
     * @param <O> response type
     * @return a newly constructed {@link DefaultRpcServer}
     */
    public static <I, O> RpcServer<I, O> create(RabbitMQOptions rabbitMQOptions, Binding binding) {
        Objects.requireNonNull(rabbitMQOptions, "rabbitMQOptions");
        Objects.requireNonNull(binding, "binding");
        return new DefaultRpcServer<>(rabbitMQOptions, binding);
    }

    /**
     * Starts consuming requests from the request queue, handing each one to {@code function}.
     *
     * @param function handler that maps a request to its response; must not be null
     * @throws NullPointerException if {@code function} is null
     * @throws HoplinRuntimeException if the queue cannot be bound or the consumer cannot start
     */
    @Override // io.hoplin.rpc.RpcServer
    public void respondAsync(Function<I, O> function) {
        Objects.requireNonNull(function, "function");
        consumeRequest(function);
    }

    /**
     * Disconnects the underlying client and shuts down the worker thread pool.
     *
     * @throws IOException if disconnecting the client fails; the thread pool is shut down
     *     regardless
     */
    public void close() throws IOException {
        try {
            this.client.disconnect();
        } finally {
            // Disconnect first so no new work is handed to an executor that is already shut
            // down; previously submitted tasks are still allowed to finish.
            this.executor.shutdown();
        }
    }

    /** Declares the exchange and the request queue, logging the parameters in use. */
    private void bind() {
        log.info("Param RoutingKey  : {}", this.routingKey);
        log.info("Param Exchange    : {}", this.exchange);
        log.info("Param Request     : {}", this.requestQueueName);
        try {
            // direct exchange: durable=false, autoDelete=true, no extra arguments
            this.channel.exchangeDeclare(this.exchange, "direct", false, true, null);
            // request queue: durable=false, exclusive=false, autoDelete=true, no extra arguments
            this.channel.queueDeclare(this.requestQueueName, false, false, true, null);
        } catch (Exception e) {
            throw new HoplinRuntimeException("Unable to bind queue", e);
        }
    }

    /**
     * Binds the request queue to the exchange and registers the responder consumer with a
     * prefetch count of one and automatic acknowledgement disabled.
     */
    private void consumeRequest(Function<I, O> function) {
        try {
            final Object bindResult =
                    this.channel.queueBind(this.requestQueueName, this.exchange, this.routingKey);
            log.info("consumeRequest requestQueueName : {}, {}", this.requestQueueName, bindResult);

            this.channel.basicQos(1);
            this.channel.basicConsume(
                    this.requestQueueName,
                    false,
                    new RpcResponderConsumer(this.channel, function, this.executor));
        } catch (Exception e) {
            throw new HoplinRuntimeException("Unable to start RPC server consumer", e);
        }
    }

    /**
     * Best-effort disconnect used when construction fails; any secondary failure is attached to
     * {@code failure} as a suppressed exception instead of masking it.
     */
    private void disconnectAfterFailedInit(RuntimeException failure) {
        try {
            this.client.disconnect();
        } catch (Exception disconnectFailure) {
            failure.addSuppressed(disconnectFailure);
        }
    }
}
