package com.agorapulse.micronaut.aws.kinesis.worker;

import com.agorapulse.micronaut.aws.kinesis.Event;
import com.agorapulse.micronaut.aws.kinesis.worker.annotation.KinesisListener;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.context.event.ShutdownEvent;
import io.micronaut.context.exceptions.NoSuchBeanException;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requirements({@Requires(property = "aws.kinesis", classes = {KinesisClientLibConfiguration.class}), @Requires(property = "aws.kinesis.listener.enabled", value = "true", defaultValue = "true")})
/* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/worker/KinesisListenerMethodProcessor.class */
public class KinesisListenerMethodProcessor implements ExecutableMethodProcessor<KinesisListener>, ApplicationEventListener<ShutdownEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisListener.class);
    private final BeanContext beanContext;
    private final ObjectMapper objectMapper;
    private final KinesisWorkerFactory kinesisWorkerFactory;
    private final String consumerFilterKey;
    private final ConcurrentHashMap<String, KinesisWorker> workers = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/worker/KinesisListenerMethodProcessor$EventAndRecordListener.class */
    public static class EventAndRecordListener implements BiConsumer<String, Record> {
        private final ExecutableMethod method;
        private final Object bean;
        private final ObjectMapper mapper;

        EventAndRecordListener(ExecutableMethod executableMethod, Object obj, ObjectMapper objectMapper) {
            this.method = executableMethod;
            this.bean = obj;
            this.mapper = objectMapper;
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, Record record) {
            Class type = this.method.getArguments()[0].getType();
            try {
                this.method.invoke(this.bean, new Object[]{this.mapper.readValue(str, type), record});
            } catch (IOException e) {
                throw new IllegalArgumentException("Failed to unmarshall string " + str + " as type " + type);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/worker/KinesisListenerMethodProcessor$EventListener.class */
    public static class EventListener implements BiConsumer<String, Record> {
        private final ExecutableMethod method;
        private final Object bean;
        private final ObjectMapper mapper;
        private final String consumerFilterKey;

        EventListener(ExecutableMethod executableMethod, Object obj, ObjectMapper objectMapper, String str) {
            this.method = executableMethod;
            this.bean = obj;
            this.mapper = objectMapper;
            this.consumerFilterKey = str;
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, Record record) {
            Class type = this.method.getArguments()[0].getType();
            try {
                Object readValue = this.mapper.readValue(str, type);
                if ((readValue instanceof Event) && StringUtils.isNotEmpty(this.consumerFilterKey)) {
                    String consumerFilterKey = ((Event) readValue).getConsumerFilterKey();
                    if (!this.consumerFilterKey.equals(consumerFilterKey)) {
                        KinesisListenerMethodProcessor.LOGGER.info("Ignoring event because expected consumer filter key {} is not equal to the event's filter key {}: {}", new Object[]{this.consumerFilterKey, consumerFilterKey, str});
                        return;
                    }
                }
                this.method.invoke(this.bean, new Object[]{readValue});
            } catch (IOException e) {
                throw new IllegalArgumentException("Failed to unmarshall string " + str + " as type " + type);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/worker/KinesisListenerMethodProcessor$RecordListener.class */
    public static class RecordListener implements BiConsumer<String, Record> {
        private final ExecutableMethod method;
        private final Object bean;

        RecordListener(ExecutableMethod executableMethod, Object obj) {
            this.method = executableMethod;
            this.bean = obj;
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, Record record) {
            this.method.invoke(this.bean, new Object[]{record});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/worker/KinesisListenerMethodProcessor$StringAndRecordListener.class */
    public static class StringAndRecordListener implements BiConsumer<String, Record> {
        private final ExecutableMethod method;
        private final Object bean;

        StringAndRecordListener(ExecutableMethod executableMethod, Object obj) {
            this.method = executableMethod;
            this.bean = obj;
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, Record record) {
            this.method.invoke(this.bean, new Object[]{str, record});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/worker/KinesisListenerMethodProcessor$StringListener.class */
    public static class StringListener implements BiConsumer<String, Record> {
        private final ExecutableMethod method;
        private final Object bean;

        public StringListener(ExecutableMethod executableMethod, Object obj) {
            this.method = executableMethod;
            this.bean = obj;
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, Record record) {
            this.method.invoke(this.bean, new Object[]{str});
        }
    }

    public KinesisListenerMethodProcessor(BeanContext beanContext, ObjectMapper objectMapper, KinesisWorkerFactory kinesisWorkerFactory, @Value("${aws.kinesis.consumer-filter-key:}") String str) {
        this.beanContext = beanContext;
        this.objectMapper = objectMapper;
        this.kinesisWorkerFactory = kinesisWorkerFactory;
        this.consumerFilterKey = str;
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> executableMethod) {
        Argument[] arguments = executableMethod.getArguments();
        if (arguments.length > 2) {
            throw new IllegalArgumentException("Method must implement at most two arguments");
        }
        if (arguments.length < 1) {
            throw new IllegalArgumentException("Method must implement at least one arguments");
        }
        if (arguments.length == 2 && !Record.class.isAssignableFrom(arguments[1].getType())) {
            throw new IllegalArgumentException("Second argument must be Record");
        }
        Qualifier qualifier = (Qualifier) beanDefinition.getAnnotationTypeByStereotype(jakarta.inject.Qualifier.class).map(cls -> {
            return Qualifiers.byAnnotation(beanDefinition, cls);
        }).orElse(null);
        this.workers.computeIfAbsent((String) executableMethod.getValue(KinesisListener.class, String.class).get(), str -> {
            KinesisWorker create = this.kinesisWorkerFactory.create(getKinesisConfiguration(str));
            LOGGER.info("Kinesis worker for configuration {} created", str);
            create.start();
            return create;
        }).addConsumer(createConsumer(executableMethod, this.beanContext.getBean(beanDefinition.getBeanType(), qualifier)));
        LOGGER.info("Kinesis listener for method {} declared in {} registered", executableMethod, beanDefinition.getBeanType());
    }

    public void onApplicationEvent(ShutdownEvent shutdownEvent) {
        this.workers.values().forEach((v0) -> {
            v0.shutdown();
        });
    }

    private KinesisClientLibConfiguration getKinesisConfiguration(String str) {
        try {
            return (KinesisClientLibConfiguration) this.beanContext.getBean(KinesisClientLibConfiguration.class, Qualifiers.byName(str));
        } catch (NoSuchBeanException e) {
            LOGGER.error("Cannot setup listener Kinesis listener, application name is missing. Configuration for Kinesis client with name '{}' is missing", str);
            return null;
        }
    }

    private BiConsumer<String, Record> createConsumer(ExecutableMethod executableMethod, Object obj) {
        Argument[] arguments = executableMethod.getArguments();
        return arguments.length == 2 ? CharSequence.class.isAssignableFrom(arguments[0].getType()) ? new StringAndRecordListener(executableMethod, obj) : new EventAndRecordListener(executableMethod, obj, this.objectMapper) : CharSequence.class.isAssignableFrom(arguments[0].getType()) ? new StringListener(executableMethod, obj) : Record.class.isAssignableFrom(arguments[0].getType()) ? new RecordListener(executableMethod, obj) : new EventListener(executableMethod, obj, this.objectMapper, this.consumerFilterKey);
    }

    public /* bridge */ /* synthetic */ void process(BeanDefinition beanDefinition, Object obj) {
        process((BeanDefinition<?>) beanDefinition, (ExecutableMethod<?, ?>) obj);
    }
}
