/*
 * Decompiled with CFR 0.152.
 */
package org.esbtools.eventhandler;

import com.google.common.util.concurrent.Futures;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.camel.builder.RouteBuilder;
import org.esbtools.eventhandler.DocumentEvent;
import org.esbtools.eventhandler.DocumentEventRepository;
import org.esbtools.eventhandler.FailedNotification;
import org.esbtools.eventhandler.Notification;
import org.esbtools.eventhandler.NotificationRepository;

public class PollingNotificationProcessorRoute
extends RouteBuilder {
    private final NotificationRepository notificationRepository;
    private final DocumentEventRepository documentEventRepository;
    private final Duration pollingInterval;
    private final int batchSize;
    private static final AtomicInteger idCounter = new AtomicInteger(1);
    private final int id = idCounter.getAndIncrement();

    public PollingNotificationProcessorRoute(NotificationRepository notificationRepository, DocumentEventRepository documentEventRepository, Duration pollingInterval, int batchSize) {
        this.notificationRepository = notificationRepository;
        this.documentEventRepository = documentEventRepository;
        this.pollingInterval = pollingInterval;
        this.batchSize = batchSize;
    }

    public void configure() throws Exception {
        this.from("timer:pollForNotifications" + this.id + "?period=" + this.pollingInterval.toMillis()).routeId("notificationProcessor-" + this.id).process(exchange -> {
            List<? extends Notification> notifications = this.notificationRepository.retrieveOldestNotificationsUpTo(this.batchSize);
            HashMap<Notification, Future<Collection<DocumentEvent>>> notificationsToFutureEvents = new HashMap<Notification, Future<Collection<DocumentEvent>>>(notifications.size());
            for (Notification notification : notifications) {
                try {
                    Future<Collection<DocumentEvent>> futureEvents = notification.toDocumentEvents();
                    notificationsToFutureEvents.put(notification, futureEvents);
                }
                catch (Exception e) {
                    this.log.error("Failed to get future document events for notification: " + notification, (Throwable)e);
                    notificationsToFutureEvents.put(notification, (Future<Collection<DocumentEvent>>)Futures.immediateFailedFuture((Throwable)e));
                }
            }
            HashMap<Notification, Collection> notificationsToDocumentEvents = new HashMap<Notification, Collection>();
            ArrayList<FailedNotification> arrayList = new ArrayList<FailedNotification>();
            for (Map.Entry notificationToFutureEvents : notificationsToFutureEvents.entrySet()) {
                Notification notification = (Notification)notificationToFutureEvents.getKey();
                Future futureEvents = (Future)notificationToFutureEvents.getValue();
                try {
                    Collection events = (Collection)futureEvents.get();
                    notificationsToDocumentEvents.put(notification, events);
                }
                catch (InterruptedException | ExecutionException e) {
                    this.log.error("Failed to get document events for notification: " + notification, (Throwable)e);
                    arrayList.add(new FailedNotification(notification, e));
                }
            }
            Iterator notificationsToEventsIterator = notificationsToDocumentEvents.entrySet().iterator();
            while (notificationsToEventsIterator.hasNext()) {
                Map.Entry notificationToEvents = notificationsToEventsIterator.next();
                try {
                    this.notificationRepository.ensureTransactionActive((Notification)notificationToEvents.getKey());
                }
                catch (Exception e) {
                    notificationsToEventsIterator.remove();
                    if (!this.log.isWarnEnabled()) continue;
                    this.log.warn("Notification transaction no longer active, not processing: " + notificationToEvents.getKey(), (Throwable)e);
                }
            }
            List documentEvents = notificationsToDocumentEvents.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            this.log.debug("Persisting {} document events via route {}: {}", new Object[]{documentEvents.size(), exchange.getFromRouteId(), documentEvents});
            try {
                this.documentEventRepository.addNewDocumentEvents(documentEvents);
            }
            catch (Exception e) {
                this.log.error("Failed to persist new document events from notifications. Rolling back processing. Document events were: " + documentEvents, (Throwable)e);
                notificationsToDocumentEvents.clear();
            }
            this.notificationRepository.markNotificationsProcessedOrFailed(notificationsToDocumentEvents.keySet(), arrayList);
        });
    }
}

