package pcosta.kafka.spring;

import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.GenericTypeResolver;
import pcosta.kafka.api.MessageFilter;
import pcosta.kafka.api.MessageListener;
import pcosta.kafka.api.MessageReceiverConfiguration;
import pcosta.kafka.api.MessagingContext;
import pcosta.kafka.api.MessagingException;
import pcosta.kafka.api.PlatformErrorListener;
import pcosta.kafka.api.annotation.DEFAULT_MESSAGE_TYPE;
import pcosta.kafka.api.annotation.EnableKafkaApiBootstrap;
import pcosta.kafka.api.annotation.EnableListenerConfiguration;
import pcosta.kafka.api.annotation.ErrorListener;
import pcosta.kafka.api.annotation.MessagingListener;
import pcosta.kafka.configuration.ReceiverConfigurationBuilder;

/* loaded from: input_file:pcosta/kafka/spring/KafkaApiBootstrap.class */
/**
 * Application listener that registers the Kafka message receiver once the Spring context
 * has been refreshed.
 *
 * <p>Bootstrapping only happens when exactly one bean carries {@link EnableKafkaApiBootstrap}.
 * If exactly one bean additionally carries {@link EnableListenerConfiguration}, the receiver
 * configuration is taken from that bean; otherwise it is assembled from the beans annotated
 * with {@link ErrorListener} and {@link MessagingListener}.
 */
class KafkaApiBootstrap implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(KafkaApiBootstrap.class);

    /** Name of the {@link MessagingContext} bean looked up in the application context. */
    private static final String MESSAGING_CONTEXT_BEAN = "kafkaMessagingContext";

    /** Set once a receiver has been created; reset by {@link #stopListeners(ApplicationContext)}. */
    volatile boolean initialized = false;

    /**
     * Starts the listeners on context refresh when auto-registration is enabled on the
     * {@link EnableKafkaApiBootstrap} annotation and no receiver has been created yet.
     *
     * @param contextRefreshedEvent the refresh event carrying the application context
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (this.initialized) {
            return;
        }
        ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext();
        EnableKafkaApiBootstrap messagingAnnotation = getMessagingAnnotation(applicationContext);
        if (messagingAnnotation != null && messagingAnnotation.autoRegisterListeners()) {
            start(applicationContext);
        }
    }

    /**
     * Creates the message receiver unless it has already been created. Does nothing when no
     * single {@link EnableKafkaApiBootstrap} bean is present.
     *
     * @param applicationContext the context holding the messaging beans
     * @throws IllegalStateException if setting up the receiver fails with a {@link MessagingException}
     */
    public synchronized void start(ApplicationContext applicationContext) {
        try {
            if (this.initialized) {
                return;
            }
            EnableListenerConfiguration configurationAnnotation = getMessagingConfigurationAnnotation(applicationContext);
            EnableKafkaApiBootstrap messagingAnnotation = getMessagingAnnotation(applicationContext);
            if (messagingAnnotation == null) {
                return;
            }
            if (configurationAnnotation == null) {
                setupListeners(applicationContext);
            } else {
                setupListenerWithConfiguration(applicationContext);
            }
        } catch (MessagingException e) {
            throw new IllegalStateException("Unable to bootstrap messaging listeners", e);
        }
    }

    /**
     * Returns the {@link EnableListenerConfiguration} annotation when exactly one bean carries
     * it, or {@code null} otherwise.
     */
    private EnableListenerConfiguration getMessagingConfigurationAnnotation(ApplicationContext applicationContext) {
        Map<String, Object> annotatedBeans = applicationContext.getBeansWithAnnotation(EnableListenerConfiguration.class);
        if (annotatedBeans.size() != 1) {
            return null;
        }
        String beanName = annotatedBeans.keySet().iterator().next();
        return applicationContext.findAnnotationOnBean(beanName, EnableListenerConfiguration.class);
    }

    /**
     * Returns the {@link EnableKafkaApiBootstrap} annotation when exactly one bean carries it,
     * or {@code null} otherwise.
     */
    private EnableKafkaApiBootstrap getMessagingAnnotation(ApplicationContext applicationContext) {
        Map<String, Object> annotatedBeans = applicationContext.getBeansWithAnnotation(EnableKafkaApiBootstrap.class);
        if (annotatedBeans.size() != 1) {
            return null;
        }
        String beanName = annotatedBeans.keySet().iterator().next();
        return applicationContext.findAnnotationOnBean(beanName, EnableKafkaApiBootstrap.class);
    }

    /**
     * Builds a receiver configuration from the annotated error listener and message listeners,
     * creates the receiver and marks this bootstrap as initialized.
     *
     * @param applicationContext the context holding the listener beans
     * @throws MessagingException if the listeners are misconfigured or the receiver cannot be created
     */
    synchronized void setupListeners(ApplicationContext applicationContext) throws MessagingException {
        MessagingContext messagingContext = (MessagingContext) applicationContext.getBean(MESSAGING_CONTEXT_BEAN);
        ReceiverConfigurationBuilder configurationBuilder = createReceiverConfigurationBuilder();
        createErrorListener(applicationContext, configurationBuilder);
        createModuleAndPlatformListeners(applicationContext, configurationBuilder);
        messagingContext.createReceiver(configurationBuilder.build());
        this.initialized = true;
    }

    /**
     * Creates the receiver from the configuration supplied by the
     * {@link EnableListenerConfiguration} bean and marks this bootstrap as initialized.
     */
    private synchronized void setupListenerWithConfiguration(ApplicationContext applicationContext) throws MessagingException {
        MessagingContext messagingContext = (MessagingContext) applicationContext.getBean(MESSAGING_CONTEXT_BEAN);
        messagingContext.createReceiver(getReceiverConfiguration(applicationContext));
        this.initialized = true;
    }

    /**
     * Shuts down the messaging context and clears the initialized flag so that
     * {@link #start(ApplicationContext)} may create a receiver again.
     *
     * @param applicationContext the context holding the messaging context bean
     * @throws MessagingException if the shutdown fails
     */
    public synchronized void stopListeners(ApplicationContext applicationContext) throws MessagingException {
        ((MessagingContext) applicationContext.getBean(MESSAGING_CONTEXT_BEAN)).shutdown();
        this.initialized = false;
    }

    /** Creates a fresh receiver configuration builder. */
    ReceiverConfigurationBuilder createReceiverConfigurationBuilder() {
        return ReceiverConfigurationBuilder.newBuilder();
    }

    /**
     * Registers the single bean annotated with {@link ErrorListener} on the builder.
     *
     * @throws KafkaApiBootstrapException if no such bean, or more than one, is defined
     */
    private void createErrorListener(ApplicationContext applicationContext, ReceiverConfigurationBuilder receiverConfigurationBuilder) throws KafkaApiBootstrapException {
        Map<String, Object> errorListeners = applicationContext.getBeansWithAnnotation(ErrorListener.class);
        log.debug("found {} beans annotated with {}", errorListeners.size(), ErrorListener.class);
        if (errorListeners.isEmpty()) {
            throw new KafkaApiBootstrapException("Unable to find an Error listener");
        }
        if (errorListeners.size() > 1) {
            throw new KafkaApiBootstrapException("At the most only one bean must be defined with ErrorListener annotation");
        }
        Map.Entry<String, Object> entry = errorListeners.entrySet().iterator().next();
        log.debug("detected {} as error listener", entry.getKey());
        receiverConfigurationBuilder.withErrorListener((PlatformErrorListener) entry.getValue());
    }

    /**
     * Obtains the receiver configuration from the single bean annotated with
     * {@link EnableListenerConfiguration}.
     *
     * @throws KafkaApiBootstrapException if no such bean, or more than one, is defined
     */
    private MessageReceiverConfiguration getReceiverConfiguration(ApplicationContext applicationContext) throws KafkaApiBootstrapException {
        Map<String, Object> providers = applicationContext.getBeansWithAnnotation(EnableListenerConfiguration.class);
        log.debug("found {} beans annotated with {}", providers.size(), EnableListenerConfiguration.class);
        if (providers.isEmpty()) {
            throw new KafkaApiBootstrapException("Unable to find an EnableListenerConfiguration bean");
        }
        if (providers.size() > 1) {
            throw new KafkaApiBootstrapException("At the most only one bean must be defined with EnableListenerConfiguration annotation");
        }
        Map.Entry<String, Object> entry = providers.entrySet().iterator().next();
        log.debug("detected {} as EnableListenerConfiguration bean", entry.getKey());
        return ((ReceiverConfigurationProvider) entry.getValue()).getReceiverConfiguration();
    }

    /**
     * Registers every bean annotated with {@link MessagingListener} on the builder, using the
     * topic, partition, extension registry and filters declared on the annotation.
     *
     * @throws MessagingException if a listener's message type cannot be determined or its
     *                            extension registry supplier cannot be instantiated
     */
    // The listener beans are only known as raw MessageListener instances here, exactly as in
    // the builder calls below; the warnings are suppressed for that reason only.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void createModuleAndPlatformListeners(ApplicationContext applicationContext, ReceiverConfigurationBuilder receiverConfigurationBuilder) throws MessagingException {
        Map<String, Object> listenerBeans = applicationContext.getBeansWithAnnotation(MessagingListener.class);
        log.debug("found {} beans annotated with {}", listenerBeans.size(), MessagingListener.class);
        for (Map.Entry<String, Object> entry : listenerBeans.entrySet()) {
            String beanName = entry.getKey();
            MessagingListener annotation = applicationContext.findAnnotationOnBean(beanName, MessagingListener.class);
            log.debug("configuration for {} = {}", beanName, annotation);
            Class<? extends Message> messageType = resolveMessageType(beanName, entry.getValue(), annotation);
            log.debug("generic type of {} message listener is {}", beanName, messageType);
            MessageListener listener = (MessageListener) entry.getValue();
            receiverConfigurationBuilder.newListener()
                    .addHandler(listener)
                    .addTopics(annotation.topic())
                    .addInitialOffset(listener.initialOffset(annotation.topic()))
                    .addTopicPartition(annotation.partition())
                    // Use the resolved type: the annotation value may still be the
                    // DEFAULT_MESSAGE_TYPE placeholder when the type was inferred.
                    .withMessageType(messageType)
                    .withExtensionRegistry(createExtensionRegistry(annotation.extensionRegistry()))
                    .addFilters(createMessageFilters(annotation.filters(), applicationContext))
                    .buildListener();
        }
    }

    /**
     * Determines the message type handled by a listener bean: the type declared on the
     * annotation, or, when that is the {@link DEFAULT_MESSAGE_TYPE} placeholder, the type
     * argument the bean's class binds for {@link MessageListener}.
     *
     * @throws KafkaApiBootstrapException if the type argument cannot be resolved or is not a
     *                                    {@link Message}
     */
    private Class<? extends Message> resolveMessageType(String beanName, Object listenerBean, MessagingListener annotation) throws KafkaApiBootstrapException {
        if (annotation.message() != DEFAULT_MESSAGE_TYPE.class) {
            return annotation.message();
        }
        Class<?> resolved = GenericTypeResolver.resolveTypeArgument(listenerBean.getClass(), MessageListener.class);
        if (resolved == null) {
            throw new KafkaApiBootstrapException("Invalid listener definition: unable to determine the message type for bean " + beanName);
        }
        if (!Message.class.isAssignableFrom(resolved)) {
            throw new KafkaApiBootstrapException("Invalid listener definition: resolved message type " + resolved.getName() + " for bean " + beanName + " is not a protobuf Message");
        }
        return resolved.asSubclass(Message.class);
    }

    /**
     * Instantiates the given supplier through its no-argument constructor and returns the
     * extension registry it supplies.
     *
     * @throws MessagingException if the supplier cannot be instantiated
     */
    private ExtensionRegistry createExtensionRegistry(Class<? extends ExtensionRegistrySupplier> cls) throws MessagingException {
        try {
            return cls.getDeclaredConstructor().newInstance().get();
        } catch (ReflectiveOperationException e) {
            throw new MessagingException("Unable to create an instance of " + cls, e);
        }
    }

    /**
     * Resolves one filter instance per requested filter class. A bean of that type is used
     * when the context (or one of its parents) defines one; otherwise the class is
     * instantiated through its no-argument constructor. Filters that can be neither found nor
     * instantiated are logged and skipped.
     *
     * @param filterTypes        the filter classes declared on the listener annotation
     * @param applicationContext the context to look the filter beans up in
     * @return the resolved filters, in declaration order
     */
    private Collection<MessageFilter> createMessageFilters(Class<? extends MessageFilter>[] filterTypes, ApplicationContext applicationContext) {
        Collection<MessageFilter> filters = new ArrayList<>(filterTypes.length);
        for (Class<? extends MessageFilter> filterType : filterTypes) {
            MessageFilter filter = null;
            // getBean(Class) signals a missing bean by throwing, never by returning null, so
            // the presence check has to happen before the lookup for the fallback to be reachable.
            if (hasBeanOfType(applicationContext, filterType)) {
                filter = applicationContext.getBean(filterType);
            } else {
                log.debug("unable to find bean for message filter {}", filterType.getSimpleName());
                try {
                    filter = filterType.getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    log.error("unable to find instantiate message filter {} as fallback", filterType.getSimpleName(), e);
                }
            }
            if (filter != null) {
                filters.add(filter);
            }
        }
        return filters;
    }

    /**
     * Tells whether the given context or any of its parent contexts defines at least one bean
     * of the given type.
     */
    private static boolean hasBeanOfType(ApplicationContext applicationContext, Class<?> type) {
        for (ApplicationContext context = applicationContext; context != null; context = context.getParent()) {
            if (context.getBeanNamesForType(type).length > 0) {
                return true;
            }
        }
        return false;
    }
}
