/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.client;

import com.networknt.eventuate.cdc.common.AggregateTopicMapping;
import com.networknt.eventuate.cdc.common.PublishedEvent;
import com.networknt.eventuate.client.EtopEventContext;
import com.networknt.eventuate.common.Int128;
import com.networknt.eventuate.common.SubscriberOptions;
import com.networknt.eventuate.common.impl.AggregateEvents;
import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.common.impl.SerializedEvent;
import com.networknt.eventuate.kafka.consumer.EventuateKafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAggregateSubscriptions
implements AggregateEvents {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private final List<EventuateKafkaConsumer> consumers = new ArrayList<EventuateKafkaConsumer>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @PreDestroy
    public void cleanUp() {
        List<EventuateKafkaConsumer> list = this.consumers;
        synchronized (list) {
            this.consumers.stream().forEach(EventuateKafkaConsumer::stop);
        }
        this.logger.debug("Waiting for consumers to commit");
        try {
            TimeUnit.SECONDS.sleep(2L);
        }
        catch (InterruptedException e) {
            this.logger.error("Error waiting", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addConsumer(EventuateKafkaConsumer consumer) {
        List<EventuateKafkaConsumer> list = this.consumers;
        synchronized (list) {
            this.consumers.add(consumer);
        }
    }

    public CompletableFuture<?> subscribe(String subscriberId, Map<String, Set<String>> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function<SerializedEvent, CompletableFuture<?>> handler) {
        List topics = aggregatesAndEvents.keySet().stream().map(AggregateTopicMapping::aggregateTypeToTopic).collect(Collectors.toList());
        EventuateKafkaConsumer consumer = new EventuateKafkaConsumer(subscriberId, (record, callback) -> {
            SerializedEvent se = this.toSerializedEvent((ConsumerRecord<String, String>)record);
            if (((Set)aggregatesAndEvents.get(se.getEntityType())).contains(se.getEventType())) {
                ((CompletableFuture)handler.apply(se)).whenComplete((result, t) -> callback.accept(null, t));
            } else {
                callback.accept(null, null);
            }
        }, topics);
        this.addConsumer(consumer);
        consumer.start();
        return CompletableFuture.completedFuture(null);
    }

    private SerializedEvent toSerializedEvent(ConsumerRecord<String, String> record) {
        PublishedEvent pe = (PublishedEvent)JSonMapper.fromJson((String)((String)record.value()), PublishedEvent.class);
        return new SerializedEvent(Int128.fromString((String)pe.getId()), pe.getEntityId(), pe.getEntityType(), pe.getEventData(), pe.getEventType(), Integer.valueOf(record.partition()), Long.valueOf(record.offset()), EtopEventContext.make(pe.getId(), record.topic(), record.partition(), record.offset()), pe.getMetadata());
    }
}

