package com.jaffa.rpc.lib.rabbitmq.receivers;

import com.jaffa.rpc.lib.JaffaService;
import com.jaffa.rpc.lib.common.RequestInvocationHelper;
import com.jaffa.rpc.lib.entities.Command;
import com.jaffa.rpc.lib.exception.JaffaRpcSystemException;
import com.jaffa.rpc.lib.rabbitmq.RabbitMQRequestSender;
import com.jaffa.rpc.lib.serialization.Serializer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.Connection;

/* loaded from: input_file:com/jaffa/rpc/lib/rabbitmq/receivers/RabbitMQAsyncAndSyncRequestReceiver.class */
public class RabbitMQAsyncAndSyncRequestReceiver implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQAsyncAndSyncRequestReceiver.class);
    private static final ExecutorService responseService = Executors.newFixedThreadPool(3);
    private static final ExecutorService requestService = Executors.newFixedThreadPool(3);
    private static final Map<String, Object> asyncHeaders = new HashMap();
    private Connection connection;
    private Channel serverChannel;
    private Channel clientChannel;

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.connection = JaffaService.getConnectionFactory().createConnection();
            this.serverChannel = this.connection.createChannel(false);
            this.clientChannel = this.connection.createChannel(false);
            this.serverChannel.queueBind(RabbitMQRequestSender.SERVER, RabbitMQRequestSender.EXCHANGE_NAME, RabbitMQRequestSender.SERVER);
            this.serverChannel.basicConsume(RabbitMQRequestSender.SERVER, false, new DefaultConsumer(this.serverChannel) { // from class: com.jaffa.rpc.lib.rabbitmq.receivers.RabbitMQAsyncAndSyncRequestReceiver.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                    RabbitMQAsyncAndSyncRequestReceiver.requestService.execute(() -> {
                        try {
                            Command command = (Command) Serializer.getCurrent().deserialize(bArr, Command.class);
                            if (Objects.nonNull(command.getCallbackKey()) && Objects.nonNull(command.getCallbackClass())) {
                                RabbitMQAsyncAndSyncRequestReceiver.responseService.execute(() -> {
                                    try {
                                        RabbitMQAsyncAndSyncRequestReceiver.this.clientChannel.basicPublish(command.getSourceModuleId(), command.getSourceModuleId() + "-client-async", new AMQP.BasicProperties.Builder().headers(RabbitMQAsyncAndSyncRequestReceiver.asyncHeaders).build(), Serializer.getCurrent().serialize(RequestInvocationHelper.constructCallbackContainer(command, RequestInvocationHelper.invoke(command))));
                                        RabbitMQAsyncAndSyncRequestReceiver.this.serverChannel.basicAck(envelope.getDeliveryTag(), false);
                                    } catch (Exception e) {
                                        RabbitMQAsyncAndSyncRequestReceiver.log.error("Error while receiving async request", e);
                                    }
                                });
                            } else {
                                RabbitMQAsyncAndSyncRequestReceiver.this.clientChannel.basicPublish(command.getSourceModuleId(), command.getSourceModuleId() + "-client-sync", new AMQP.BasicProperties.Builder().correlationId(command.getRqUid()).build(), Serializer.getCurrent().serializeWithClass(RequestInvocationHelper.getResult(RequestInvocationHelper.invoke(command))));
                                RabbitMQAsyncAndSyncRequestReceiver.this.serverChannel.basicAck(envelope.getDeliveryTag(), false);
                            }
                        } catch (Exception e) {
                            RabbitMQAsyncAndSyncRequestReceiver.log.error("Error while receiving sync request", e);
                        }
                    });
                }
            });
            log.info("{} terminated", getClass().getSimpleName());
        } catch (Exception e) {
            log.error("Error during RabbitMQ request receiver startup:", e);
            throw new JaffaRpcSystemException(e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.serverChannel.close();
            this.clientChannel.close();
        } catch (IOException | TimeoutException e) {
        }
        this.connection.close();
        responseService.shutdownNow();
        requestService.shutdownNow();
    }

    static {
        asyncHeaders.put("communication-type", "async");
    }
}
