package com.github.ledio5485.pulsar.consumer;

import com.github.ledio5485.pulsar.PulsarMessage;
import com.github.ledio5485.pulsar.collector.ConsumerCollector;
import com.github.ledio5485.pulsar.collector.ConsumerHolder;
import com.github.ledio5485.pulsar.error.FailedMessage;
import com.github.ledio5485.pulsar.error.exception.ConsumerInitException;
import com.github.ledio5485.pulsar.properties.ConsumerProperties;
import com.github.ledio5485.pulsar.utils.HandlingConsumer;
import com.github.ledio5485.pulsar.utils.SchemaUtils;
import com.github.ledio5485.pulsar.utils.TopicUrlService;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.EmbeddedValueResolverAware;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.StringValueResolver;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;

/**
 * Creates one Pulsar {@link Consumer} for every handler registered in the
 * {@link ConsumerCollector} and dispatches incoming messages to the handler method
 * via reflection.
 *
 * <p>Handler failures are not rethrown from the message listener: the message is
 * negatively acknowledged and a {@link FailedMessage} is published to the
 * subscribers registered through {@link #onError(java.util.function.Consumer)}.
 */
@DependsOn({"pulsarClient", "consumerCollector"})
@Component
public class ConsumerAggregator implements EmbeddedValueResolverAware {
    /** Sink for handler failures; subscribers are attached through {@link #onError}. */
    private final EmitterProcessor<FailedMessage> exceptionEmitter = EmitterProcessor.create();
    private final ConsumerCollector consumerCollector;
    private final PulsarClient pulsarClient;
    private final ConsumerProperties consumerProperties;
    private final TopicUrlService topicUrlService;
    /** Injected by Spring through {@link #setEmbeddedValueResolver(StringValueResolver)}. */
    private StringValueResolver stringValueResolver;
    /** Consumers created by {@link #init()}; {@code null} until that method has run. */
    private List<Consumer> consumers;

    public ConsumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, ConsumerProperties consumerProperties, TopicUrlService topicUrlService) {
        this.consumerCollector = consumerCollector;
        this.pulsarClient = pulsarClient;
        this.consumerProperties = consumerProperties;
        this.topicUrlService = topicUrlService;
    }

    /**
     * Subscribes every collected handler and remembers the resulting consumers.
     *
     * @throws ConsumerInitException if any subscription fails
     */
    @PostConstruct
    private void init() {
        // The explicit <Consumer> witness lets the wildcard-typed result of subscribe()
        // be collected into the raw-element list exposed by getConsumers().
        this.consumers = this.consumerCollector.getConsumers().entrySet().stream()
                .<Consumer>map(entry -> subscribe(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
    }

    /**
     * Unsubscribes every consumer created by {@link #init()}.
     *
     * <p>Does nothing when {@link #init()} never populated the consumer list.
     */
    @PreDestroy
    private void clear() {
        if (this.consumers == null) {
            return;
        }
        // NOTE(review): handlingConsumerBuilder presumably adapts the throwing
        // unsubscribe() call to java.util.function.Consumer - confirm in HandlingConsumer.
        this.consumers.forEach(HandlingConsumer.handlingConsumerBuilder((consumer) -> {
            consumer.unsubscribe();
        }));
    }

    /**
     * Builds and subscribes the Pulsar consumer for a single handler.
     *
     * <p>The consumer name and the subscription name each embed their own freshly
     * generated random UUID followed by {@code consumerKey}.
     *
     * @param consumerKey    key under which the handler is registered in the collector
     * @param consumerHolder handler metadata (annotation, bean, method)
     * @return the subscribed consumer
     * @throws ConsumerInitException if the Pulsar client rejects the subscription
     */
    private Consumer<?> subscribe(String consumerKey, ConsumerHolder consumerHolder) {
        try {
            // Placeholders in the annotation's topic are resolved before the URL is built;
            // an unresolvable (null) value fails fast with a NullPointerException.
            String resolvedTopic = Objects.requireNonNull(
                    this.stringValueResolver.resolveStringValue(consumerHolder.getAnnotation().topic()));

            ConsumerBuilder<?> consumerBuilder = this.pulsarClient
                    .newConsumer(SchemaUtils.getSchema(
                            consumerHolder.getAnnotation().serialization(),
                            consumerHolder.getAnnotation().clazz()))
                    .consumerName(String.format("consumer-%s-%s", UUID.randomUUID(), consumerKey))
                    .subscriptionName(String.format("subscription-%s-%s", UUID.randomUUID(), consumerKey))
                    .topic(this.topicUrlService.buildTopicUrl(resolvedTopic))
                    .subscriptionType(consumerHolder.getAnnotation().subscriptionType())
                    .messageListener((consumer, message) -> dispatch(consumerHolder, consumer, message));

            // A non-positive configured value leaves the client's own ack-timeout setting untouched.
            if (this.consumerProperties.getAckTimeoutMs() > 0) {
                consumerBuilder.ackTimeout(this.consumerProperties.getAckTimeoutMs(), TimeUnit.MILLISECONDS);
            }
            buildDeadLetterPolicy(consumerHolder, consumerBuilder);
            return consumerBuilder.subscribe();
        } catch (PulsarClientException e) {
            throw new ConsumerInitException("Failed to init consumer.", e);
        }
    }

    /**
     * Invokes the handler method for one received message and acknowledges it.
     *
     * <p>Any exception (from the reflective call or from the acknowledgement) causes
     * the message to be negatively acknowledged and a {@link FailedMessage} to be
     * emitted; nothing is rethrown to the Pulsar listener thread.
     */
    private <T> void dispatch(ConsumerHolder consumerHolder, Consumer<T> consumer, Message<T> message) {
        try {
            Method handler = consumerHolder.getHandler();
            handler.setAccessible(true);
            // Wrapped handlers receive the message metadata as well; plain handlers only the payload.
            Object argument = consumerHolder.isWrapped() ? wrapMessage(message) : message.getValue();
            handler.invoke(consumerHolder.getBean(), argument);
            consumer.acknowledge(message);
        } catch (Exception e) {
            consumer.negativeAcknowledge(message);
            this.exceptionEmitter.onNext(new FailedMessage(e, consumer, message));
        }
    }

    /**
     * Applies a dead-letter policy to {@code consumerBuilder} when a max redeliver
     * count is configured.
     *
     * <p>A non-negative {@code maxRedeliverCount} on the handler annotation takes
     * precedence over the value from {@link ConsumerProperties}. When both are
     * negative, the builder is left untouched. The dead-letter topic is only set
     * when the annotation declares a non-empty one.
     *
     * @param consumerHolder  handler metadata supplying the annotation overrides
     * @param consumerBuilder builder that receives the policy
     */
    public void buildDeadLetterPolicy(ConsumerHolder consumerHolder, ConsumerBuilder<?> consumerBuilder) {
        int maxRedeliverCount = consumerHolder.getAnnotation().maxRedeliverCount();
        if (maxRedeliverCount < 0) {
            maxRedeliverCount = this.consumerProperties.getDeadLetterPolicyMaxRedeliverCount();
        }
        if (maxRedeliverCount < 0) {
            return;
        }

        DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder =
                DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliverCount);
        String deadLetterTopic = consumerHolder.getAnnotation().deadLetterTopic();
        if (!deadLetterTopic.isEmpty()) {
            // NOTE(review): unlike the main topic, this value is not passed through the
            // StringValueResolver - confirm whether that is intentional.
            deadLetterPolicyBuilder.deadLetterTopic(this.topicUrlService.buildTopicUrl(deadLetterTopic));
        }
        consumerBuilder.deadLetterPolicy(deadLetterPolicyBuilder.build());
    }

    /**
     * Copies the payload and metadata of a Pulsar {@link Message} into a new
     * {@link PulsarMessage}.
     *
     * @param message the received Pulsar message
     * @param <T>     payload type
     * @return a new wrapper holding value, message id, sequence id, properties,
     *         topic name, key, event time, publish time and producer name
     */
    public <T> PulsarMessage<T> wrapMessage(Message<T> message) {
        PulsarMessage<T> wrapped = new PulsarMessage<>();
        wrapped.setValue(message.getValue());
        wrapped.setMessageId(message.getMessageId());
        wrapped.setSequenceId(message.getSequenceId());
        wrapped.setProperties(message.getProperties());
        wrapped.setTopicName(message.getTopicName());
        wrapped.setKey(message.getKey());
        wrapped.setEventTime(message.getEventTime());
        wrapped.setPublishTime(message.getPublishTime());
        wrapped.setProducerName(message.getProducerName());
        return wrapped;
    }

    /**
     * Returns the consumers created at initialization.
     *
     * @return the internal consumer list, or {@code null} if initialization has not run yet
     */
    public List<Consumer> getConsumers() {
        return this.consumers;
    }

    /**
     * Registers a callback for messages whose handling failed.
     *
     * @param consumer callback receiving each {@link FailedMessage}
     * @return handle that cancels the registration when disposed
     */
    public Disposable onError(java.util.function.Consumer<? super FailedMessage> consumer) {
        return this.exceptionEmitter.subscribe(consumer);
    }

    /** Stores the resolver used to resolve the topic value declared on handler annotations. */
    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
        this.stringValueResolver = stringValueResolver;
    }
}
