package org.objectweb.proactive.extensions.amqp.remoteobject;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.remoteobject.InternalRemoteRemoteObject;
import org.objectweb.proactive.core.util.converter.ByteToObjectConverter;
import org.objectweb.proactive.core.util.converter.ObjectToByteConverter;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.amqp.AMQPConfig;
import org.objectweb.proactive.utils.NamedThreadFactory;
import org.objectweb.proactive.utils.ThreadPools;

/* loaded from: input_file:org/objectweb/proactive/extensions/amqp/remoteobject/AbstractAMQPRemoteObjectServer.class */
public abstract class AbstractAMQPRemoteObjectServer {
    protected final InternalRemoteRemoteObject rro;
    private static final Logger logger = ProActiveLogger.getLogger(AMQPConfig.Loggers.AMQP_REMOTE_OBJECT);
    static final ThreadPoolExecutor tpe = ThreadPools.newCachedThreadPool(5, TimeUnit.MINUTES, new NamedThreadFactory("AMQP Consumer Thread ", true));

    /* loaded from: input_file:org/objectweb/proactive/extensions/amqp/remoteobject/AbstractAMQPRemoteObjectServer$Consumer.class */
    final class Consumer extends DefaultConsumer {
        private final ReusableChannel reusableChannel;

        public Consumer(ReusableChannel reusableChannel) {
            super(reusableChannel.getChannel());
            this.reusableChannel = reusableChannel;
        }

        public void handleCancel(String str) throws IOException {
            this.reusableChannel.returnChannel();
        }

        public void handleDelivery(String str, Envelope envelope, final AMQP.BasicProperties basicProperties, final byte[] bArr) throws IOException {
            AbstractAMQPRemoteObjectServer.tpe.execute(new Runnable() { // from class: org.objectweb.proactive.extensions.amqp.remoteobject.AbstractAMQPRemoteObjectServer.Consumer.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        String type = basicProperties.getType();
                        byte[] handleMethodCall = type == null ? AbstractAMQPRemoteObjectServer.this.handleMethodCall(bArr) : AbstractFindQueuesRPCClient.DISCOVERY_QUEUES_MESSAGE_TYPE.equals(type) ? AbstractAMQPRemoteObjectServer.this.handleDiscoverQueueMessage() : AbstractAMQPRemoteObjectServer.this.handleMessage(Consumer.this.getChannel(), basicProperties, bArr);
                        if (handleMethodCall != null) {
                            try {
                                Consumer.this.getChannel().basicPublish(AbstractAMQPRemoteObjectServer.this.getReplyExchange(), basicProperties.getReplyTo(), (AMQP.BasicProperties) null, handleMethodCall);
                            } catch (IOException e) {
                                AbstractAMQPRemoteObjectServer.logger.error("Failed to send message", e);
                            }
                        }
                    } catch (Exception e2) {
                        AbstractAMQPRemoteObjectServer.logger.error("Error during message processing", e2);
                    }
                }
            });
        }
    }

    public AbstractAMQPRemoteObjectServer(InternalRemoteRemoteObject internalRemoteRemoteObject) {
        this.rro = internalRemoteRemoteObject;
    }

    protected abstract ReusableChannel getReusableChannel() throws ProActiveException, IOException;

    protected abstract void createObjectQueue(Channel channel, String str) throws IOException;

    protected abstract byte[] handleMessage(Channel channel, AMQP.BasicProperties basicProperties, byte[] bArr) throws Exception;

    protected abstract String getReplyExchange();

    /* JADX INFO: Access modifiers changed from: private */
    public byte[] handleMethodCall(byte[] bArr) throws Exception {
        return ObjectToByteConverter.ProActiveObjectStream.convert(this.rro.receiveMessage((Request) ByteToObjectConverter.ProActiveObjectStream.convert(bArr)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public byte[] handleDiscoverQueueMessage() throws Exception {
        return this.rro.getURI().toString().getBytes();
    }

    public final void connect(boolean z) throws IOException, ProActiveException {
        String computeQueueNameFromURI = AMQPUtils.computeQueueNameFromURI(this.rro.getURI());
        ReusableChannel reusableChannel = getReusableChannel();
        boolean z2 = false;
        try {
            Channel channel = reusableChannel.getChannel();
            createObjectQueue(channel, computeQueueNameFromURI);
            z2 = true;
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("declared queue %s", computeQueueNameFromURI));
            }
            channel.basicConsume(computeQueueNameFromURI, true, new Consumer(reusableChannel));
        } catch (IOException e) {
            if (z2) {
                try {
                    reusableChannel.getChannel().queueDelete(computeQueueNameFromURI);
                } catch (Exception e2) {
                    logger.warn("Failed to delete queue", e2);
                }
            }
            reusableChannel.close();
            throw e;
        }
    }
}
