package org.objectweb.proactive.extensions.amqp.remoteobject;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.body.future.MethodCallResult;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.process.JVMProcessImpl;
import org.objectweb.proactive.core.remoteobject.InternalRemoteRemoteObject;
import org.objectweb.proactive.core.remoteobject.SynchronousReplyImpl;
import org.objectweb.proactive.core.util.URIBuilder;
import org.objectweb.proactive.core.util.converter.ByteToObjectConverter;
import org.objectweb.proactive.core.util.converter.ObjectToByteConverter;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.amqp.AMQPConfig;
import org.objectweb.proactive.utils.NamedThreadFactory;
import org.objectweb.proactive.utils.ThreadPools;

/* loaded from: input_file:org/objectweb/proactive/extensions/amqp/remoteobject/AMQPRemoteObjectServer.class */
public class AMQPRemoteObjectServer {
    private final InternalRemoteRemoteObject rro;
    private final String queueName;
    private static final Logger logger = ProActiveLogger.getLogger(AMQPConfig.Loggers.AMQP_REMOTE_OBJECT);
    private static final String QUEUES_MESSAGE_TYPE = AMQPConfig.PA_AMQP_DISCOVERY_QUEUES_MESSAGE_TYPE.getValue();
    static final ThreadPoolExecutor tpe = ThreadPools.newCachedThreadPool(5, TimeUnit.MINUTES, new NamedThreadFactory("AMQP Consumer Thread ", true));

    public AMQPRemoteObjectServer(InternalRemoteRemoteObject internalRemoteRemoteObject) throws ProActiveException, IOException {
        this.rro = internalRemoteRemoteObject;
        this.queueName = AMQPUtils.computeQueueNameFromName(URIBuilder.getNameFromURI(internalRemoteRemoteObject.getURI()));
    }

    public void connect(boolean z) throws IOException, ProActiveException {
        final ReusableChannel channel = AMQPUtils.getChannel(this.rro.getURI());
        boolean z2 = false;
        try {
            Channel channel2 = channel.getChannel();
            AMQP.Queue.DeclareOk queueDeclare = channel2.queueDeclare(this.queueName, false, false, true, (Map) null);
            z2 = true;
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("declared queue %s,response %s", this.queueName, queueDeclare.toString()));
            }
            channel2.queueBind(this.queueName, AMQPConfig.PA_AMQP_DISCOVER_EXCHANGE_NAME.getValue(), JVMProcessImpl.DEFAULT_JVMPARAMETERS);
            channel2.queueBind(this.queueName, AMQPConfig.PA_AMQP_RPC_EXCHANGE_NAME.getValue(), this.queueName);
            channel2.basicConsume(this.queueName, true, new DefaultConsumer(channel2) { // from class: org.objectweb.proactive.extensions.amqp.remoteobject.AMQPRemoteObjectServer.1
                public void handleCancel(String str) throws IOException {
                    AMQPUtils.returnChannel(channel);
                }

                public void handleDelivery(String str, Envelope envelope, final AMQP.BasicProperties basicProperties, final byte[] bArr) throws IOException {
                    AMQPRemoteObjectServer.tpe.execute(new Runnable() { // from class: org.objectweb.proactive.extensions.amqp.remoteobject.AMQPRemoteObjectServer.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            byte[] convert;
                            byte[] bArr2 = (byte[]) null;
                            AMQP.BasicProperties basicProperties2 = null;
                            try {
                                try {
                                    if (AMQPRemoteObjectServer.QUEUES_MESSAGE_TYPE.equals(basicProperties.getType())) {
                                        basicProperties2 = new AMQP.BasicProperties.Builder().correlationId(basicProperties.getCorrelationId()).build();
                                        convert = AMQPRemoteObjectServer.this.rro.getURI().toString().getBytes();
                                    } else {
                                        Request request = (Request) ByteToObjectConverter.ProActiveObjectStream.convert(bArr);
                                        if (AMQPRemoteObjectServer.logger.isDebugEnabled()) {
                                            AMQPRemoteObjectServer.logger.debug(String.format("message %s consumed on queue %s", request.getMethodName(), AMQPRemoteObjectServer.this.queueName));
                                        }
                                        convert = ObjectToByteConverter.ProActiveObjectStream.convert(AMQPRemoteObjectServer.this.rro.receiveMessage(request));
                                    }
                                    try {
                                        getChannel().basicPublish(JVMProcessImpl.DEFAULT_JVMPARAMETERS, basicProperties.getReplyTo(), basicProperties2, convert);
                                    } catch (IOException e) {
                                        AMQPRemoteObjectServer.logger.error("Failed to send message", e);
                                    }
                                } catch (Throwable th) {
                                    try {
                                        getChannel().basicPublish(JVMProcessImpl.DEFAULT_JVMPARAMETERS, basicProperties.getReplyTo(), (AMQP.BasicProperties) null, bArr2);
                                    } catch (IOException e2) {
                                        AMQPRemoteObjectServer.logger.error("Failed to send message", e2);
                                    }
                                    throw th;
                                }
                            } catch (Exception e3) {
                                AMQPRemoteObjectServer.logger.error("Error during reply processing", e3);
                                try {
                                    bArr2 = ObjectToByteConverter.ProActiveObjectStream.convert(new SynchronousReplyImpl(new MethodCallResult(null, e3)));
                                } catch (IOException e4) {
                                    AMQPRemoteObjectServer.logger.error("Failed to convert reply", e4);
                                }
                                try {
                                    getChannel().basicPublish(JVMProcessImpl.DEFAULT_JVMPARAMETERS, basicProperties.getReplyTo(), (AMQP.BasicProperties) null, bArr2);
                                } catch (IOException e5) {
                                    AMQPRemoteObjectServer.logger.error("Failed to send message", e5);
                                }
                            }
                        }
                    });
                }
            });
        } catch (IOException e) {
            if (z2) {
                try {
                    channel.getChannel().queueDelete(this.queueName);
                } catch (Exception e2) {
                    logger.warn("Failed to delete queue", e2);
                }
            }
            channel.close();
            throw e;
        }
    }
}
