package com.networknt.eventuate.server.jdbckafkastore;

import com.networknt.eventuate.common.Int128;
import com.networknt.eventuate.common.SubscriberOptions;
import com.networknt.eventuate.common.impl.AggregateEvents;
import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.common.impl.SerializedEvent;
import com.networknt.eventuate.kafka.consumer.EventuateKafkaConsumer;
import com.networknt.eventuate.server.common.AggregateTopicMapping;
import com.networknt.eventuate.server.common.PublishedEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/server/jdbckafkastore/EventuateKafkaAggregateSubscriptions.class */
/**
 * {@link AggregateEvents} implementation that delivers events to subscribers
 * through {@link EventuateKafkaConsumer} instances.
 *
 * <p>Every call to {@link #subscribe} creates and starts one consumer and
 * records it so that {@link #cleanUp()} can stop it later. Access to the
 * consumer list is guarded by synchronizing on the list itself.
 */
public class EventuateKafkaAggregateSubscriptions implements AggregateEvents {
    private static final Logger logger = LoggerFactory.getLogger(EventuateKafkaAggregateSubscriptions.class);

    /** Consumers created by {@link #subscribe}; guarded by its own monitor. */
    private final List<EventuateKafkaConsumer> consumers = new ArrayList<>();

    /**
     * Stops every consumer registered so far and then pauses for two seconds
     * (per the log message, to let the consumers commit).
     *
     * <p>If the calling thread is interrupted during the pause, the
     * interruption is logged and the thread's interrupt status is restored so
     * that callers further up the stack can still observe it.
     */
    public void cleanUp() {
        synchronized (consumers) {
            consumers.forEach(EventuateKafkaConsumer::stop);
        }
        logger.debug("Waiting for consumers to commit");
        try {
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            logger.error("Error waiting", e);
            // Catching InterruptedException clears the flag; put it back.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Records a consumer so that {@link #cleanUp()} can stop it.
     *
     * @param eventuateKafkaConsumer the consumer to remember
     */
    private void addConsumer(EventuateKafkaConsumer eventuateKafkaConsumer) {
        synchronized (consumers) {
            consumers.add(eventuateKafkaConsumer);
        }
    }

    /**
     * Creates, registers and starts a Kafka consumer for the given subscriber.
     *
     * <p>The consumer is given one topic per key of {@code map}, obtained via
     * {@link AggregateTopicMapping#aggregateTypeToTopic}. For each record, the
     * event is passed to {@code function} only when its event type is listed
     * under its entity type in {@code map}; the completion callback is then
     * invoked with the failure (or {@code null}) once the returned future
     * completes. Any other record is acknowledged immediately without
     * invoking {@code function}.
     *
     * @param str               subscriber id handed to the consumer
     * @param map               entity type to the set of event types of interest
     * @param subscriberOptions not used by this implementation
     * @param function          handler invoked for each matching event
     * @return an already-completed future
     */
    @Override // com.networknt.eventuate.common.impl.AggregateEvents
    public CompletableFuture<?> subscribe(String str, Map<String, Set<String>> map, SubscriberOptions subscriberOptions, Function<SerializedEvent, CompletableFuture<?>> function) {
        List<String> topics = map.keySet().stream()
                .map(AggregateTopicMapping::aggregateTypeToTopic)
                .collect(Collectors.toList());

        EventuateKafkaConsumer eventuateKafkaConsumer = new EventuateKafkaConsumer(str, (consumerRecord, biConsumer) -> {
            SerializedEvent serializedEvent = toSerializedEvent(consumerRecord);
            Set<String> eventTypes = map.get(serializedEvent.getEntityType());
            // A missing (or null) entry means nobody asked for this entity type:
            // acknowledge the record instead of failing with a NullPointerException.
            if (eventTypes != null && eventTypes.contains(serializedEvent.getEventType())) {
                function.apply(serializedEvent).whenComplete((obj, th) -> biConsumer.accept(null, th));
            } else {
                biConsumer.accept(null, null);
            }
        }, topics);

        addConsumer(eventuateKafkaConsumer);
        eventuateKafkaConsumer.start();
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Converts a Kafka record into a {@link SerializedEvent}.
     *
     * <p>The record value is parsed as a JSON {@link PublishedEvent}; the
     * record's partition and offset are carried over, and an event context is
     * built from the event id, topic, partition and offset.
     *
     * @param consumerRecord the record whose value holds the JSON event
     * @return the corresponding serialized event
     */
    private SerializedEvent toSerializedEvent(ConsumerRecord<String, String> consumerRecord) {
        PublishedEvent publishedEvent = JSonMapper.fromJson(consumerRecord.value(), PublishedEvent.class);
        return new SerializedEvent(
                Int128.fromString(publishedEvent.getId()),
                publishedEvent.getEntityId(),
                publishedEvent.getEntityType(),
                publishedEvent.getEventData(),
                publishedEvent.getEventType(),
                Integer.valueOf(consumerRecord.partition()),
                Long.valueOf(consumerRecord.offset()),
                EtopEventContext.make(publishedEvent.getId(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()),
                publishedEvent.getMetadata());
    }
}
