package edu.stanford.protege.webprotege.ipc.pulsar;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.ProjectRequest;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import edu.stanford.protege.webprotege.ipc.ExecutionContext;
import edu.stanford.protege.webprotege.ipc.Headers;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/**
 * A {@link CommandExecutor} that sends requests over Apache Pulsar and completes a
 * {@link CompletableFuture} when the matching reply arrives.
 *
 * <p>Each request is published to {@code persistent://<tenant>/command-requests/<channel>} and tagged
 * with a random correlation id.  Replies are consumed from
 * {@code persistent://<tenant>/command-responses/<channel>--replies} using an exclusive subscription
 * whose name is unique to this executor instance.  The producer and consumer are created lazily on the
 * first call to {@link #execute(Request, ExecutionContext)} and are bound to the channel of that first
 * request; later requests on a different channel are rejected.
 *
 * @param <Q> the request type
 * @param <R> the response type
 */
public class PulsarCommandExecutor<Q extends Request<R>, R extends Response> implements CommandExecutor<Q, R> {

    private static final Logger logger = LoggerFactory.getLogger(PulsarCommandExecutor.class);

    /** Target class used to deserialize reply payloads. */
    private final Class<R> responseClass;

    @Value("${spring.application.name}")
    private String applicationName;

    @Autowired
    private PulsarClient pulsarClient;

    @Autowired
    private PulsarAdmin pulsarAdmin;

    @Autowired
    private ObjectMapper objectMapper;

    /** Lazily created by {@link #getProducer(Request)}; guarded by the monitor of {@code this}. */
    private Producer<byte[]> producer;

    /** Lazily created reply consumer; only assigned while {@link #getProducer(Request)} holds the monitor. */
    private Consumer<byte[]> consumer;

    private String requestChannel = null;

    private String replyChannel = null;

    /** Pending requests keyed by correlation id. */
    private final Map<String, CompletableFuture<R>> replyHandlers = new ConcurrentHashMap<>();

    @Value("${webprotege.pulsar.tenant}")
    private String tenant;

    private String replySubscriptionName;

    /**
     * @param cls the class that reply payloads are deserialized into
     */
    public PulsarCommandExecutor(Class<R> cls) {
        this.responseClass = cls;
    }

    /**
     * Serializes the request, publishes it to the request topic and returns a future for the reply.
     *
     * @param q                the request to send
     * @param executionContext supplies the user id that is attached to the message
     * @return a future that is completed with the reply, or completed exceptionally if the reply carries
     *         an error header, cannot be deserialized, or the request could not be sent
     * @throws UncheckedIOException if the request cannot be serialized, or the producer/consumer cannot
     *                              be created
     * @throws IllegalStateException if the request's channel differs from the channel this executor is
     *                               already bound to
     */
    @Override // edu.stanford.protege.webprotege.ipc.CommandExecutor
    public CompletableFuture<R> execute(Q q, ExecutionContext executionContext) {
        String replyChannelName = getReplyChannelName(q);
        final byte[] payload;
        try {
            payload = this.objectMapper.writeValueAsBytes(q);
        } catch (JsonProcessingException e) {
            logger.error("JSON Processing Exception", e);
            throw new UncheckedIOException(e);
        }

        Producer<byte[]> requestProducer = getProducer(q);
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<R> replyFuture = new CompletableFuture<>();
        // Register before sending so that a fast reply always finds its handler.
        this.replyHandlers.put(correlationId, replyFuture);
        try {
            TypedMessageBuilder<byte[]> messageBuilder = requestProducer.newMessage()
                    .value(payload)
                    .property(Headers.CORRELATION_ID, correlationId)
                    .property(Headers.REPLY_CHANNEL, replyChannelName)
                    .property(Headers.USER_ID, executionContext.userId().value());
            if (q instanceof ProjectRequest) {
                String projectId = ((ProjectRequest) q).projectId().id();
                messageBuilder.property(Headers.PROJECT_ID, projectId);
                // The project id is also used as the message key.
                messageBuilder.key(projectId);
            }
            messageBuilder.send();
        } catch (PulsarClientException e) {
            // Nothing was sent, so no reply will ever arrive: drop the handler and fail the future
            // instead of handing the caller a future that never completes.
            this.replyHandlers.remove(correlationId);
            logger.error("Could not send request on channel {}", q.getChannel(), e);
            replyFuture.completeExceptionally(new UncheckedIOException(e));
        } catch (RuntimeException e) {
            // Do not leave an orphaned handler behind if building the message failed.
            this.replyHandlers.remove(correlationId);
            throw e;
        }
        return replyFuture;
    }

    /**
     * Returns the producer for the request topic, creating it (and the reply consumer) on first use.
     *
     * @throws IllegalStateException if {@code q} uses a different channel than the one already in use
     * @throws UncheckedIOException  if the producer or consumer cannot be created
     */
    private synchronized Producer<byte[]> getProducer(Q q) {
        String channel = q.getChannel();
        // Validate before returning the cached producer; otherwise a request for another channel
        // would silently be published to the wrong topic.
        if (this.requestChannel != null && !this.requestChannel.equals(channel)) {
            throw new IllegalStateException("Request channel is not the request channel that is in use by this CommandExecutor");
        }
        if (this.producer != null) {
            return this.producer;
        }
        ensureConsumerIsListeningForRepliesToRequest(q);
        this.requestChannel = channel;
        try {
            this.producer = this.pulsarClient.newProducer()
                    .producerName(this.applicationName + "--CommandExecutor--" + channel)
                    .topic("persistent://" + this.tenant + "/command-requests/" + this.requestChannel)
                    .accessMode(ProducerAccessMode.Shared)
                    .create();
            return this.producer;
        } catch (PulsarClientException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Subscribes to the reply topic for {@code q}'s channel if no consumer exists yet.  Only called from
     * {@link #getProducer(Request)}, which holds the monitor of {@code this}.
     *
     * @throws IllegalStateException if {@code q} maps to a different reply channel than the one in use
     * @throws UncheckedIOException  if the subscription cannot be created
     */
    private void ensureConsumerIsListeningForRepliesToRequest(Q q) {
        if (this.consumer != null) {
            return;
        }
        String requestedReplyChannel = getReplyChannelName(q);
        if (this.replyChannel == null) {
            this.replyChannel = requestedReplyChannel;
        } else if (!this.replyChannel.equals(requestedReplyChannel)) {
            throw new IllegalStateException("Reply channel is not the channel that is in use by this command executor");
        }
        String replyTopic = "persistent://" + this.tenant + "/command-responses/" + this.replyChannel;
        // The random suffix gives every executor instance its own subscription name.
        this.replySubscriptionName = this.applicationName + "--" + this.replyChannel + "--" + UUID.randomUUID();
        logger.info("Setting up consumer with subscription {} to listen for replies at {}", this.replySubscriptionName, replyTopic);
        try {
            this.consumer = this.pulsarClient.newConsumer()
                    .subscriptionName(this.replySubscriptionName)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .topic(replyTopic)
                    .messageListener(this::handleReplyMessageReceived)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Message listener for the reply topic.  Looks up the pending future by correlation id and
     * completes it with the deserialized response, or exceptionally when the reply carries an error
     * header or cannot be deserialized.
     */
    private void handleReplyMessageReceived(Consumer<byte[]> consumer, Message<byte[]> message) {
        String correlationId = message.getProperty(Headers.CORRELATION_ID);
        if (correlationId == null) {
            logger.info("CorrelationId in reply message is missing.  Cannot handle reply.  Ignoring reply.");
            return;
        }
        CompletableFuture<R> replyHandler = this.replyHandlers.remove(correlationId);
        if (replyHandler == null) {
            // No request is waiting for this id (for example a redelivered or duplicate reply).
            logger.warn("No pending request for correlation id {} in reply on topic {}.  Ignoring reply.", correlationId, consumer.getTopic());
            acknowledge(consumer, message);
            return;
        }
        try {
            String error = message.getProperty(Headers.ERROR);
            if (error != null) {
                CommandExecutionException executionException = this.objectMapper.readValue(error, CommandExecutionException.class);
                acknowledge(consumer, message);
                replyHandler.completeExceptionally(executionException);
            } else {
                R response = this.objectMapper.readValue(message.getData(), this.responseClass);
                acknowledge(consumer, message);
                replyHandler.complete(response);
            }
        } catch (IOException e) {
            logger.error("Cannot deserialize reply message on topic {}", consumer.getTopic(), e);
            // The handler has already been removed, so a redelivery could not be matched to a request;
            // acknowledge the message and surface the failure to the caller instead.
            acknowledge(consumer, message);
            replyHandler.completeExceptionally(new UncheckedIOException(e));
        }
    }

    /**
     * Acknowledges {@code message}, logging rather than propagating a failure so that the waiting
     * future is still completed by the caller.
     */
    private void acknowledge(Consumer<byte[]> consumer, Message<byte[]> message) {
        try {
            consumer.acknowledge(message);
        } catch (PulsarClientException e) {
            logger.error("Encountered Pulsar Client Exception", e);
        }
    }

    /**
     * Unsubscribes and closes the reply consumer and closes the producer, if they were created.  The
     * operations are started asynchronously and their results are not awaited.
     */
    @PreDestroy
    public void close() {
        if (this.consumer != null) {
            logger.info("Closing consumer listening to {}", this.consumer.getConsumerName());
            this.consumer.unsubscribeAsync();
            this.consumer.closeAsync();
        }
        if (this.producer != null) {
            logger.info("Closing producer {}", this.producer.getProducerName());
            this.producer.closeAsync();
        }
    }

    /** Returns the reply channel name for a request: the request's channel followed by {@code --replies}. */
    private String getReplyChannelName(Q q) {
        return q.getChannel() + "--replies";
    }
}
