package com.jaffa.rpc.lib.rabbitmq;

import com.jaffa.rpc.lib.JaffaService;
import com.jaffa.rpc.lib.common.OptionConstants;
import com.jaffa.rpc.lib.entities.Protocol;
import com.jaffa.rpc.lib.exception.JaffaRpcExecutionException;
import com.jaffa.rpc.lib.exception.JaffaRpcNoRouteException;
import com.jaffa.rpc.lib.exception.JaffaRpcSystemException;
import com.jaffa.rpc.lib.request.Sender;
import com.jaffa.rpc.lib.zookeeper.Utils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.Connection;

/**
 * {@link Sender} implementation that publishes serialized requests to RabbitMQ.
 *
 * <p>A single static connection/channel pair is shared by all instances. {@link #init()} binds the
 * {@code <module-id>-client-sync} queue and starts a consumer that hands each reply to the callback
 * registered under the reply's correlation id; {@link #close()} releases the channel and connection.
 */
public class RabbitMQRequestSender extends Sender {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQRequestSender.class);
    private static final String NAME_PREFIX = Utils.getRequiredOption(OptionConstants.MODULE_ID);
    public static final String EXCHANGE_NAME = NAME_PREFIX;
    public static final String CLIENT_SYNC_NAME = NAME_PREFIX + "-client-sync";
    public static final String CLIENT_ASYNC_NAME = NAME_PREFIX + "-client-async";
    public static final String SERVER = NAME_PREFIX + "-server";

    /** Upper bound (one hour) on how long a synchronous call waits, regardless of the configured timeout. */
    private static final long MAX_WAIT_MILLIS = 3_600_000L;

    /** Callbacks of in-flight synchronous requests, keyed by request UID (matched against the reply correlation id). */
    private static final Map<String, Callback> requests = new ConcurrentHashMap<>();
    private static Connection connection;
    private static Channel clientChannel;

    /** Receives the raw body of the reply that belongs to a pending synchronous request. */
    @FunctionalInterface
    private interface Callback {
        void call(byte[] bArr);
    }

    /**
     * Opens the shared connection and channel, binds the client sync queue to the module exchange and
     * starts consuming replies with manual acknowledgement.
     *
     * @throws JaffaRpcSystemException if the connection, binding or consumer registration fails
     */
    public static void init() {
        try {
            connection = JaffaService.getConnectionFactory().createConnection();
            clientChannel = connection.createChannel(false);
            clientChannel.queueBind(CLIENT_SYNC_NAME, EXCHANGE_NAME, CLIENT_SYNC_NAME);
            clientChannel.basicConsume(CLIENT_SYNC_NAME, false, new DefaultConsumer(clientChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (Objects.isNull(properties) || Objects.isNull(properties.getCorrelationId())) {
                        return;
                    }
                    Callback callback = requests.remove(properties.getCorrelationId());
                    // NOTE(review): deliveries without a pending callback (e.g. the caller already timed out)
                    // are neither acked nor rejected here, so they stay unacknowledged on this channel.
                    // Behaviour kept as-is; confirm whether that is intended.
                    if (Objects.nonNull(callback)) {
                        callback.call(body);
                        clientChannel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
        } catch (AmqpException | IOException e) {
            log.error("Error during RabbitMQ response receiver startup:", e);
            throw new JaffaRpcSystemException((Throwable) e);
        }
    }

    /**
     * Closes the shared channel and then the connection. A failure to close the channel is logged and
     * does not prevent the connection from being closed.
     */
    public static void close() {
        try {
            if (Objects.nonNull(clientChannel)) {
                clientChannel.close();
            }
        } catch (IOException | TimeoutException e) {
            // Best effort: still try to close the connection below, but do not hide the failure.
            log.warn("Error while closing RabbitMQ client channel", e);
        }
        if (Objects.nonNull(connection)) {
            connection.close();
        }
    }

    /**
     * Publishes the request and blocks until the matching reply arrives or the wait limit expires.
     *
     * <p>The wait limit is {@code timeout} milliseconds, capped at one hour; a {@code timeout} of
     * {@code -1} means "use the one hour cap".
     *
     * @param bArr serialized request
     * @return the raw reply body, or {@code null} if no reply arrived within the wait limit
     * @throws JaffaRpcNoRouteException propagated unchanged from the publish step
     * @throws JaffaRpcExecutionException on any other failure while sending or waiting
     */
    @Override // com.jaffa.rpc.lib.request.Sender
    public byte[] executeSync(byte[] bArr) {
        String rqUid = null;
        try {
            rqUid = this.command.getRqUid();
            CompletableFuture<byte[]> response = new CompletableFuture<>();
            // The consumer started in init() completes the future when the correlated reply is delivered.
            requests.put(rqUid, response::complete);
            sendSync(bArr);
            long waitMillis = this.timeout == -1 ? MAX_WAIT_MILLIS : Math.min(this.timeout, MAX_WAIT_MILLIS);
            return response.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply within the limit: callers expect null rather than an exception.
            return null;
        } catch (JaffaRpcNoRouteException e) {
            throw e;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("Error while sending sync RabbitMQ request", e);
            throw new JaffaRpcExecutionException(e);
        } finally {
            // Never leave a stale callback behind (timeout, publish failure, interruption).
            if (rqUid != null) {
                requests.remove(rqUid);
            }
        }
    }

    /**
     * Resolves the target module and publishes the payload to that module's exchange using the routing
     * key {@code <module>-server}. Used by both the synchronous and the asynchronous path.
     *
     * @param bArr serialized request
     * @throws IOException if publishing on the channel fails
     */
    private void sendSync(byte[] bArr) throws IOException {
        String serviceInterface = Utils.getServiceInterfaceNameFromClient(this.command.getServiceClass());
        String targetModule;
        if (StringUtils.isNotBlank(this.moduleId)) {
            targetModule = this.moduleId;
            // Result intentionally unused; presumably called so that a missing route fails here — TODO confirm.
            Utils.getHostForService(serviceInterface, this.moduleId, Protocol.RABBIT);
        } else {
            targetModule = Utils.getModuleForService(serviceInterface, Protocol.RABBIT);
        }
        clientChannel.basicPublish(targetModule, targetModule + "-server", (AMQP.BasicProperties) null, bArr);
    }

    /**
     * Publishes the request without waiting for a reply.
     *
     * @param bArr serialized request
     * @throws JaffaRpcNoRouteException propagated unchanged from the publish step
     * @throws JaffaRpcExecutionException on any other failure while sending
     */
    @Override // com.jaffa.rpc.lib.request.Sender
    public void executeAsync(byte[] bArr) {
        try {
            sendSync(bArr);
        } catch (JaffaRpcNoRouteException e) {
            throw e;
        } catch (Exception e) {
            log.error("Error while sending async RabbitMQ request", e);
            throw new JaffaRpcExecutionException(e);
        }
    }
}
