package org.esbtools.eventhandler;

import com.google.common.util.concurrent.Futures;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.camel.builder.RouteBuilder;

/* loaded from: input_file:org/esbtools/eventhandler/PollingNotificationProcessorRoute.class */
public class PollingNotificationProcessorRoute extends RouteBuilder {
    private final NotificationRepository notificationRepository;
    private final DocumentEventRepository documentEventRepository;
    private final Duration pollingInterval;
    private final Duration processTimeout;
    private final int batchSize;
    private static final AtomicInteger idCounter = new AtomicInteger(1);
    private final int id = idCounter.getAndIncrement();

    public PollingNotificationProcessorRoute(NotificationRepository notificationRepository, DocumentEventRepository documentEventRepository, Duration duration, Duration duration2, int i) {
        this.notificationRepository = notificationRepository;
        this.documentEventRepository = documentEventRepository;
        this.pollingInterval = duration;
        this.batchSize = i;
        this.processTimeout = (Duration) Objects.requireNonNull(duration2, "notificationProcessTimeout");
    }

    public void configure() throws Exception {
        from("timer:pollForNotifications" + this.id + "?period=" + this.pollingInterval.toMillis()).routeId("notificationProcessor-" + this.id).process(exchange -> {
            List<? extends Notification> retrieveOldestNotificationsUpTo = this.notificationRepository.retrieveOldestNotificationsUpTo(this.batchSize);
            HashMap hashMap = new HashMap(retrieveOldestNotificationsUpTo.size());
            for (Notification notification : retrieveOldestNotificationsUpTo) {
                try {
                    hashMap.put(notification, notification.toDocumentEvents());
                } catch (Exception e) {
                    this.log.error("Failed to get future document events for notification: " + notification, e);
                    hashMap.put(notification, Futures.immediateFailedFuture(e));
                }
            }
            HashMap hashMap2 = new HashMap();
            ArrayList arrayList = new ArrayList();
            for (Map.Entry entry : hashMap.entrySet()) {
                Notification notification2 = (Notification) entry.getKey();
                try {
                    hashMap2.put(notification2, (Collection) ((Future) entry.getValue()).get(this.processTimeout.toMillis(), TimeUnit.MILLISECONDS));
                } catch (InterruptedException | ExecutionException e2) {
                    this.log.error("Failed to get document events for notification: " + notification2, e2);
                    arrayList.add(new FailedNotification(notification2, e2));
                }
            }
            Iterator it = hashMap2.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry2 = (Map.Entry) it.next();
                try {
                    this.notificationRepository.ensureTransactionActive((Notification) entry2.getKey());
                } catch (Exception e3) {
                    it.remove();
                    if (this.log.isWarnEnabled()) {
                        this.log.warn("Notification transaction no longer active, not processing: " + entry2.getKey(), e3);
                    }
                }
            }
            List list = (List) hashMap2.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
            this.log.debug("Persisting {} document events via route {}: {}", new Object[]{Integer.valueOf(list.size()), exchange.getFromRouteId(), list});
            try {
                this.documentEventRepository.addNewDocumentEvents(list);
            } catch (Exception e4) {
                this.log.error("Failed to persist new document events from notifications. Rolling back processing. Document events were: " + list, e4);
                hashMap2.clear();
            }
            this.notificationRepository.markNotificationsProcessedOrFailed(hashMap2.keySet(), arrayList);
        });
    }
}
