package net.osomahe.esk.eventstore.control;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import net.osomahe.esk.config.boundary.ConfigurationBoundary;
import net.osomahe.esk.eventstore.entity.AbstractEvent;
import net.osomahe.esk.eventstore.entity.AsyncEvent;
import net.osomahe.esk.eventstore.entity.EventName;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

@Singleton
@Startup
/* loaded from: input_file:net/osomahe/esk/eventstore/control/EventStoreSubscriber.class */
public class EventStoreSubscriber {

    @Inject
    private ConfigurationBoundary config;

    @Inject
    private EventSubscriptionDataStore eventDataStore;

    @Inject
    private TopicService topicService;

    @Inject
    private Event<AbstractEvent> events;

    @Resource
    private ManagedScheduledExecutorService mses;
    private Map<String, Map<String, Class<? extends AbstractEvent>>> mapTopics = new ConcurrentHashMap();
    private Jsonb jsonb;
    private KafkaConsumer<String, JsonObject> consumer;
    private ScheduledFuture<?> sfConsumerPoll;

    @PostConstruct
    public void init() {
        this.jsonb = JsonbBuilder.create();
        this.eventDataStore.getEventClasses().forEach(this::subscribeForTopic);
        if (this.mapTopics.size() > 0) {
            this.consumer = new KafkaConsumer<>(this.config.getKafkaConsumerConfig(), new StringDeserializer(), new EventDeserializer());
            this.consumer.subscribe(this.mapTopics.keySet());
            this.sfConsumerPoll = this.mses.scheduleAtFixedRate(this::pollMessages, 1000L, 100L, TimeUnit.MILLISECONDS);
        }
    }

    private void subscribeForTopic(Class<? extends AbstractEvent> cls) {
        String topicName = this.topicService.getTopicName(cls);
        if (!this.mapTopics.containsKey(topicName)) {
            this.mapTopics.put(topicName, new ConcurrentHashMap());
        }
        this.mapTopics.get(topicName).put(getEventName(cls), cls);
    }

    private String getEventName(Class<? extends AbstractEvent> cls) {
        EventName eventName = (EventName) cls.getAnnotation(EventName.class);
        return eventName != null ? eventName.value() : cls.getSimpleName();
    }

    private void pollMessages() {
        synchronized (this.consumer) {
            Iterator it = this.consumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                JsonObject jsonObject = (JsonObject) consumerRecord.value();
                String string = jsonObject.getString("name");
                Map<String, Class<? extends AbstractEvent>> map = this.mapTopics.get(consumerRecord.topic());
                if (map.containsKey(string)) {
                    Class<? extends AbstractEvent> cls = map.get(string);
                    AbstractEvent abstractEvent = (AbstractEvent) this.jsonb.fromJson(jsonObject.getJsonObject("data").toString(), cls);
                    if (cls.isAnnotationPresent(AsyncEvent.class)) {
                        this.events.fireAsync(abstractEvent);
                    } else {
                        this.events.fire(abstractEvent);
                    }
                }
            }
        }
    }

    @PreDestroy
    public void destroy() {
        if (this.sfConsumerPoll != null) {
            synchronized (this.consumer) {
                this.sfConsumerPoll.cancel(false);
                this.consumer.close(5L, TimeUnit.SECONDS);
            }
        }
    }
}
