package org.gecko.adapter.amqp.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.osgi.messaging.Message;
import org.gecko.util.pushstream.SimplePushEventSourceContext;
import org.osgi.util.function.Predicate;

/* loaded from: input_file:org/gecko/adapter/amqp/consumer/AMQPReplyToConsumer.class */
/**
 * Consumer that forwards a delivery to the event source only when the delivery's
 * correlation id equals the correlation id this consumer was created with.
 * Deliveries with a missing or different correlation id are logged and dropped.
 */
public class AMQPReplyToConsumer extends AMQPAcknowledgingConsumer {
    private static final Logger logger = Logger.getLogger(AMQPReplyToConsumer.class.getName());

    /** Correlation id a delivery must carry to be published. */
    private final String correlationId;

    /**
     * Creates the consumer.
     *
     * @param channel passed unchanged to the superclass constructor
     * @param str passed unchanged to the superclass constructor (presumably the topic — confirm against the superclass)
     * @param predicate passed unchanged to the superclass constructor
     * @param simplePushEventSourceContext passed unchanged to the superclass constructor
     * @param str2 the correlation id that incoming deliveries are compared against
     */
    public AMQPReplyToConsumer(Channel channel, String str, Predicate<Message> predicate, SimplePushEventSourceContext<Message> simplePushEventSourceContext, String str2) {
        super(channel, str, predicate, simplePushEventSourceContext);
        this.correlationId = str2;
    }

    /**
     * Handles one delivery: checks the correlation id, waits for the event source to be
     * connected if it is not yet, then publishes the message and updates the mbean's
     * last-message time when an mbean is present.
     *
     * @param str the consumer tag (not used by this method)
     * @param envelope source of the routing key, exchange and delivery tag
     * @param basicProperties source of the content type, correlation id and reply-to
     * @param bArr the message payload
     * @throws IOException declared by the overridden method; not thrown directly by this body
     */
    @Override // org.gecko.adapter.amqp.consumer.AMQPAcknowledgingConsumer
    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        // Check the correlation id first so that rejected deliveries cost nothing more.
        String messageCorrelationId = basicProperties.getCorrelationId();
        if (messageCorrelationId == null || !messageCorrelationId.equals(this.correlationId)) {
            logger.log(Level.SEVERE, "This message does not fit to the correlation id, given by the request");
            return;
        }

        awaitConnection();
        if (!this.eventSource.isConnected()) {
            logger.severe("Event source is not connected");
            return;
        }

        String routingKey = envelope.getRoutingKey();
        String contentType = basicProperties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();
        AMQPMessageImpl message = toMessage(envelope, basicProperties, bArr);
        try {
            this.eventSource.publish(message);
            // Supplier overloads: the payload is decoded and the text built only if INFO is enabled.
            logger.log(Level.INFO, () -> "Received message: '" + new String(bArr, StandardCharsets.UTF_8) + "' with routingKey: " + routingKey + ", contentType: " + contentType + ", deliveryTag: " + deliveryTag);
            logger.log(Level.INFO, () -> "publish to event source for correlation: '" + messageCorrelationId);
            if (this.mbean != null) {
                this.mbean.setLastMessageTime(new Date());
            }
        } catch (Exception e) {
            // Boundary catch: a failing publish is logged and does not propagate to the caller.
            logger.log(Level.SEVERE, "Detected error on AMQP receive", (Throwable) e);
        }
    }

    /**
     * Logs and delegates to the superclass {@code close()}.
     */
    @Override // org.gecko.adapter.amqp.consumer.AMQPAcknowledgingConsumer
    public void close() {
        logger.log(Level.INFO, "clode event source");
        super.close();
    }

    /** Blocks the calling thread until the event source's connect promise delivers its value, unless already connected. */
    private void awaitConnection() {
        if (this.eventSource.isConnected()) {
            return;
        }
        CountDownLatch connected = new CountDownLatch(1);
        // NOTE(review): if the connect promise fails, this callback presumably never runs and
        // the wait below would not return — confirm against the promise API in use.
        this.eventSource.connectPromise().thenAccept(ignored -> {
            connected.countDown();
        });
        try {
            connected.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "Waiting for a event source connection was interrupted", (Throwable) e);
        }
    }

    /** Builds the message for this consumer's topic from the payload, envelope and properties of a delivery. */
    private AMQPMessageImpl toMessage(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] payload) {
        AMQPMessageImpl message = new AMQPMessageImpl(this.topic, ByteBuffer.wrap(payload));
        message.setDeliveryTag(envelope.getDeliveryTag());
        message.setExchange(envelope.getExchange());
        message.setRoutingKey(envelope.getRoutingKey());
        message.setContentType(basicProperties.getContentType());
        message.setReplyTo(basicProperties.getReplyTo());
        message.setCorrelationId(basicProperties.getCorrelationId());
        return message;
    }
}
