/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.eventuate.cdccore.AggregateTopicMapping;
import com.networknt.eventuate.cdccore.PublishedEvent;
import com.networknt.eventuate.cdccore.kafka.consumer.CdcKafkaConsumer;
import com.networknt.eventuate.common.EventContext;
import com.networknt.eventuate.common.Int128;
import com.networknt.eventuate.common.SubscriberOptions;
import com.networknt.eventuate.common.impl.AggregateEvents;
import com.networknt.eventuate.common.impl.SerializedEvent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAggregateSubscriptions
implements AggregateEvents {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    String bootstrapServers = null;
    private final List<CdcKafkaConsumer> consumers = new ArrayList<CdcKafkaConsumer>();

    public KafkaAggregateSubscriptions() {
    }

    public KafkaAggregateSubscriptions(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @PreDestroy
    public void cleanUp() {
        List<CdcKafkaConsumer> list = this.consumers;
        synchronized (list) {
            this.consumers.stream().forEach(CdcKafkaConsumer::stop);
        }
        this.logger.debug("Waiting for consumers to commit");
        try {
            TimeUnit.SECONDS.sleep(2L);
        }
        catch (InterruptedException e) {
            this.logger.error("Error waiting", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addConsumer(CdcKafkaConsumer consumer) {
        List<CdcKafkaConsumer> list = this.consumers;
        synchronized (list) {
            this.consumers.add(consumer);
        }
    }

    public CompletableFuture<?> subscribe(String subscriberId, Map<String, Set<String>> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function<SerializedEvent, CompletableFuture<?>> handler) {
        List topics = aggregatesAndEvents.keySet().stream().map(AggregateTopicMapping::aggregateTypeToTopic).collect(Collectors.toList());
        CdcKafkaConsumer consumer = new CdcKafkaConsumer(subscriberId, (record, callback) -> {
            SerializedEvent se = this.toSerializedEvent((ConsumerRecord<String, String>)record);
            if (((Set)aggregatesAndEvents.get(se.getEntityType())).contains(se.getEventType())) {
                ((CompletableFuture)handler.apply(se)).whenComplete((result, t) -> callback.accept(null, t));
            } else {
                callback.accept(null, null);
            }
        }, topics, this.bootstrapServers);
        this.addConsumer(consumer);
        consumer.start();
        return CompletableFuture.completedFuture(null);
    }

    private SerializedEvent toSerializedEvent(ConsumerRecord<String, String> record) {
        try {
            ObjectMapper om = new ObjectMapper();
            PublishedEvent pe = (PublishedEvent)om.readValue((String)record.value(), PublishedEvent.class);
            return new SerializedEvent(Int128.fromString((String)pe.getId()), pe.getEntityId(), pe.getEntityType(), pe.getEventData(), pe.getEventType(), Integer.valueOf(record.partition()), Long.valueOf(record.offset()), new EventContext(this.makeEventToken(pe.getId(), record.topic(), record.partition(), record.offset())));
        }
        catch (IOException e) {
            this.logger.error("Got exception: ", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private String makeEventToken(String id, String topic, int partition, long offset) {
        return id;
    }
}

