package io.github.majusko.pulsar2.solon.consumer;

import io.github.majusko.pulsar2.solon.PulsarMessage;
import io.github.majusko.pulsar2.solon.collector.ConsumerHolder;
import io.github.majusko.pulsar2.solon.collector.IConsumerConst;
import io.github.majusko.pulsar2.solon.constant.BatchAckMode;
import io.github.majusko.pulsar2.solon.error.FailedBatchMessages;
import io.github.majusko.pulsar2.solon.error.FailedMessage;
import io.github.majusko.pulsar2.solon.error.exception.ClientInitException;
import io.github.majusko.pulsar2.solon.error.exception.ConsumerInitException;
import io.github.majusko.pulsar2.solon.properties.ConsumerProperties;
import io.github.majusko.pulsar2.solon.properties.PulsarProperties;
import io.github.majusko.pulsar2.solon.utils.SchemaUtils;
import io.github.majusko.pulsar2.solon.utils.UrlBuildService;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.noear.solon.core.AppContext;
import reactor.core.Disposable;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:io/github/majusko/pulsar2/solon/consumer/ConsumerAggregator.class */
/**
 * Builds Pulsar consumers for the handlers registered in {@link IConsumerConst#consumers}
 * and dispatches received messages to those handlers via reflection.
 *
 * <p>Non-batch consumers get a message listener that invokes the handler once per message.
 * Batch consumers get a receive loop started with {@link CompletableFuture#runAsync(Runnable)}.
 * Failures raised while handling messages are published to the subscribers registered through
 * {@link #onError(java.util.function.Consumer)}.
 */
public class ConsumerAggregator {
    /** Sink through which handler failures are published to {@link #onError} subscribers. */
    private final Sinks.Many<FailedMessage> sink =
        Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

    /**
     * Receive loops started for batch consumers. Initialised eagerly: it is appended to by
     * {@link #createBatchListener} and nothing else in this class assigns it.
     */
    private final List<CompletableFuture<?>> batchListenerList = new ArrayList<>();

    private PulsarClient pulsarClient;
    private ConsumerProperties consumerProperties;
    private PulsarProperties pulsarProperties;
    private UrlBuildService urlBuildService;
    private ConsumerInterceptor consumerInterceptor;

    /** Consumers created by {@link #init}; stays {@code null} until auto-start runs or a list is set. */
    private List<Consumer> consumers;

    /**
     * Resolves any collaborators that are still unset from the application context and, when
     * {@code PulsarProperties.isAutoStart()} is true, subscribes every registered consumer whose
     * annotation has {@code autoStart()} enabled.
     *
     * @param appContext         context used to look up the {@link PulsarClient},
     *                           {@link ConsumerInterceptor} and {@link PulsarProperties} beans
     * @param consumerProperties consumer settings; only stored when the URL build service has
     *                           not been created yet
     * @throws ConsumerInitException if a consumer cannot be subscribed
     */
    public void init(AppContext appContext, ConsumerProperties consumerProperties) {
        if (this.pulsarClient == null) {
            this.pulsarClient = appContext.getBean(PulsarClient.class);
        }
        if (this.consumerInterceptor == null) {
            this.consumerInterceptor = appContext.getBean(ConsumerInterceptor.class);
        }
        if (this.pulsarProperties == null) {
            this.pulsarProperties = appContext.getBean(PulsarProperties.class);
        }
        if (this.urlBuildService == null) {
            this.consumerProperties = consumerProperties;
            this.urlBuildService = new UrlBuildService(this.pulsarProperties, this.consumerProperties);
        }
        if (!this.pulsarProperties.isAutoStart()) {
            return;
        }
        this.consumers = new ArrayList<>();
        for (Map.Entry<String, ConsumerHolder> entry : IConsumerConst.consumers.entrySet()) {
            if (entry.getValue().getAnnotation().autoStart()) {
                this.consumers.add(subscribe(entry.getKey(), entry.getValue()));
            }
        }
    }

    /**
     * Configures and subscribes one consumer from its holder's annotation.
     *
     * @param registryKey    key under which the holder is registered in
     *                       {@link IConsumerConst#consumers}; passed to the URL build service
     *                       when deriving the consumer and subscription names
     * @param consumerHolder handler method, target bean and annotation of the consumer
     * @return the subscribed consumer
     * @throws ConsumerInitException wrapping any {@link PulsarClientException} or
     *                               {@link ClientInitException} raised while building
     */
    private Consumer<?> subscribe(String registryKey, ConsumerHolder consumerHolder) {
        try {
            ConsumerBuilder<?> builder = this.pulsarClient
                .newConsumer(SchemaUtils.getSchema(
                    consumerHolder.getAnnotation().serialization(),
                    consumerHolder.getAnnotation().clazz()))
                .consumerName(this.urlBuildService.buildPulsarConsumerName(
                    consumerHolder.getAnnotation().consumerName(), registryKey))
                .subscriptionName(this.urlBuildService.buildPulsarSubscriptionName(
                    consumerHolder.getAnnotation().subscriptionName(), registryKey))
                .topic(this.urlBuildService.buildTopicUrl(
                    consumerHolder.getAnnotation().topic(),
                    consumerHolder.getAnnotation().namespace()))
                .subscriptionType(this.urlBuildService.getSubscriptionType(consumerHolder))
                .subscriptionInitialPosition(consumerHolder.getAnnotation().initialPosition());

            boolean batch = consumerHolder.getAnnotation().batch();
            if (!batch) {
                // Batch consumers are driven by an explicit receive loop instead of a listener.
                builder.messageListener((consumer, message) -> handleMessage(consumerHolder, consumer, message));
            }
            if (consumerHolder.getAnnotation().subscriptionProperties().length > 0) {
                Map<String, String> subscriptionProperties =
                    Arrays.stream(consumerHolder.getAnnotation().subscriptionProperties())
                        .collect(Collectors.toMap(property -> property.key(), property -> property.value()));
                builder.subscriptionProperties(subscriptionProperties);
            }
            if (this.pulsarProperties.isAllowInterceptor()) {
                builder.intercept(new ConsumerInterceptor[]{this.consumerInterceptor});
            }
            if (this.consumerProperties.getAckTimeoutMs() > 0) {
                builder.ackTimeout(this.consumerProperties.getAckTimeoutMs(), TimeUnit.MILLISECONDS);
            }
            if (batch) {
                builder.batchReceivePolicy(BatchReceivePolicy.builder()
                    .maxNumMessages(consumerHolder.getAnnotation().maxNumMessage())
                    .maxNumBytes(consumerHolder.getAnnotation().maxNumBytes())
                    .timeout(consumerHolder.getAnnotation().timeoutMillis(), TimeUnit.MILLISECONDS)
                    .build());
            }
            this.urlBuildService.buildDeadLetterPolicy(
                consumerHolder.getAnnotation().maxRedeliverCount(),
                consumerHolder.getAnnotation().deadLetterTopic(),
                builder);

            Consumer<?> subscribed = builder.subscribe();
            if (batch) {
                createBatchListener(consumerHolder, subscribed);
            }
            return subscribed;
        } catch (PulsarClientException | ClientInitException e) {
            throw new ConsumerInitException("Failed to init consumer.", e);
        }
    }

    /**
     * Invokes the handler for a single message and acknowledges it. On any exception the
     * message is negatively acknowledged and the failure is emitted on the error sink.
     */
    private void handleMessage(ConsumerHolder consumerHolder, Consumer<?> consumer, Message<?> message) {
        try {
            Method handler = consumerHolder.getHandler();
            handler.setAccessible(true);
            // Wrapped handlers receive the PulsarMessage envelope, others only the payload.
            Object argument = consumerHolder.isWrapped() ? wrapMessage(message) : message.getValue();
            handler.invoke(consumerHolder.getBean(), argument);
            consumer.acknowledge(message);
        } catch (Exception e) {
            consumer.negativeAcknowledge(message);
            this.sink.tryEmitNext(new FailedMessage(e, consumer, message));
        }
    }

    /**
     * Starts the batch receive loop for {@code consumer} asynchronously and records the
     * resulting future in the batch listener list.
     */
    private void createBatchListener(ConsumerHolder consumerHolder, Consumer<?> consumer) {
        this.batchListenerList.add(CompletableFuture.runAsync(() -> runBatchLoop(consumerHolder, consumer)));
    }

    /**
     * Repeatedly receives a batch and hands it to the handler. Acknowledgement depends on the
     * handler's shape:
     * <ul>
     *   <li>{@link BatchAckMode#MANUAL}: the handler also receives the consumer and nothing is
     *       acknowledged here;</li>
     *   <li>{@code void} handler: the whole batch is acknowledged after the handler returns;</li>
     *   <li>otherwise: the handler's return value is treated as the list of message ids to
     *       acknowledge and the rest of the batch is negatively acknowledged.</li>
     * </ul>
     *
     * <p>NOTE(review): the first exception ends the loop, so the consumer stops being polled
     * after a single failure. Kept as-is; confirm whether that is intended.
     */
    private void runBatchLoop(ConsumerHolder consumerHolder, Consumer<?> consumer) {
        boolean returnsVoid = true;
        boolean manualAck = false;
        Messages<?> batch = null;
        try {
            Method handler = consumerHolder.getHandler();
            handler.setAccessible(true);
            returnsVoid = handler.getReturnType().equals(Void.TYPE);
            manualAck = consumerHolder.getAnnotation().batchAckMode() == BatchAckMode.MANUAL;
            while (true) {
                batch = consumer.batchReceive();
                if (manualAck) {
                    handler.invoke(consumerHolder.getBean(), batch, consumer);
                } else if (returnsVoid) {
                    handler.invoke(consumerHolder.getBean(), batch);
                    consumer.acknowledge(batch);
                } else {
                    acknowledgeReturnedIds(consumer, batch, handler.invoke(consumerHolder.getBean(), batch));
                }
            }
        } catch (Exception e) {
            // Only the auto-acknowledged void-handler case negatively acknowledges the batch here.
            if (returnsVoid && !manualAck && batch != null) {
                consumer.negativeAcknowledge(batch);
            }
            this.sink.tryEmitNext(new FailedBatchMessages(e, consumer, batch));
        }
    }

    /**
     * Acknowledges the message ids returned by a batch handler and negatively acknowledges
     * every message of the batch whose id is not among them.
     *
     * <p>The returned list is passed to {@code Consumer.acknowledge(List<MessageId>)}, so its
     * elements are message ids; membership must therefore be tested with the message's id, not
     * with the message object itself.
     */
    @SuppressWarnings("unchecked")
    private void acknowledgeReturnedIds(Consumer<?> consumer, Messages<?> batch, Object handlerResult)
            throws PulsarClientException {
        List<MessageId> acknowledgedIds = (List<MessageId>) handlerResult;
        Set<MessageId> acknowledgedIdSet = new HashSet<>(acknowledgedIds);
        consumer.acknowledge(acknowledgedIds);
        batch.forEach(message -> {
            if (!acknowledgedIdSet.contains(message.getMessageId())) {
                consumer.negativeAcknowledge(message);
            }
        });
    }

    /**
     * Copies the payload and metadata of a Pulsar message into a new {@link PulsarMessage}.
     *
     * @param message the received message
     * @param <T>     payload type
     * @return a wrapper carrying the value, message id, sequence id, properties, topic name,
     *         key, event time, publish time and producer name of {@code message}
     */
    public <T> PulsarMessage<T> wrapMessage(Message<T> message) {
        PulsarMessage<T> wrapped = new PulsarMessage<>();
        wrapped.setValue(message.getValue());
        wrapped.setMessageId(message.getMessageId());
        wrapped.setSequenceId(message.getSequenceId());
        wrapped.setProperties(message.getProperties());
        wrapped.setTopicName(message.getTopicName());
        wrapped.setKey(message.getKey());
        wrapped.setEventTime(message.getEventTime());
        wrapped.setPublishTime(message.getPublishTime());
        wrapped.setProducerName(message.getProducerName());
        return wrapped;
    }

    /**
     * @return the consumers created by {@link #init} or supplied through {@link #setConsumers};
     *         {@code null} if neither has happened
     */
    public List<Consumer> getConsumers() {
        return this.consumers;
    }

    /** Replaces the tracked consumer list. */
    public void setConsumers(List<Consumer> list) {
        this.consumers = list;
    }

    /** @return the futures of the batch receive loops started so far */
    public List<CompletableFuture<?>> getBatchListenerList() {
        return this.batchListenerList;
    }

    /**
     * Subscribes a callback to the failures emitted while handling messages.
     *
     * @param consumer callback invoked for each {@link FailedMessage}
     * @return handle that cancels the subscription when disposed
     */
    public Disposable onError(java.util.function.Consumer<? super FailedMessage> consumer) {
        return this.sink.asFlux().subscribe(consumer);
    }
}
