/*
 * Decompiled with CFR 0.152.
 */
package org.esbtools.eventhandler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.RouteDefinition;
import org.esbtools.eventhandler.FailedMessage;
import org.esbtools.eventhandler.Message;
import org.esbtools.eventhandler.MessageFactory;

public class AsyncBatchMessageProcessorRoute
extends RouteBuilder {
    private final String fromUri;
    private final String failureUri;
    private final Duration processTimeout;
    private final MessageFactory messageFactory;
    private final int idCount = idCounter.getAndIncrement();
    private final String routeId = "messageProcessor-" + this.idCount;
    private static final AtomicInteger idCounter = new AtomicInteger(0);

    public AsyncBatchMessageProcessorRoute(String fromUri, String failureUri, Duration processTimeout, MessageFactory messageFactory) {
        this.fromUri = Objects.requireNonNull(fromUri, "fromUri");
        this.failureUri = Objects.requireNonNull(failureUri, "failureUri");
        this.processTimeout = Objects.requireNonNull(processTimeout, "processTimeout");
        this.messageFactory = Objects.requireNonNull(messageFactory, "messageFactory");
    }

    public void configure() throws Exception {
        ((RouteDefinition)this.from(this.fromUri).routeId(this.routeId).process(exchange -> {
            Object exchangeBody = exchange.getIn().getBody();
            if (!(exchangeBody instanceof Collection)) {
                throw new IllegalArgumentException("Expected `fromUri` to deliver exchanges with Collection bodies so that we may batch process for efficiency. However, the uri '" + this.fromUri + "' returned " + (exchangeBody == null ? "null." : "the " + exchangeBody.getClass().getName() + ": " + exchangeBody));
            }
            Collection originalMessages = (Collection)exchangeBody;
            ArrayList<ProcessingMessage> processingMessages = new ArrayList<ProcessingMessage>(originalMessages.size());
            ArrayList<FailedMessage> failures = new ArrayList<FailedMessage>();
            this.log.debug("Received {} messages on route {}: {}", new Object[]{originalMessages.size(), this.routeId, originalMessages});
            for (Object originalMessage : originalMessages) {
                Future<Void> processingFuture;
                Message message;
                try {
                    message = this.messageFactory.getMessageForBody(originalMessage);
                }
                catch (Exception e) {
                    this.log.error("Failure parsing message. Body was: " + originalMessage, (Throwable)e);
                    failures.add(new FailedMessage(originalMessage, e));
                    continue;
                }
                try {
                    processingFuture = message.process();
                }
                catch (Exception e) {
                    this.log.error("Failed to process message: " + message, (Throwable)e);
                    FailedMessage failure = new FailedMessage(originalMessage, message, e);
                    failures.add(failure);
                    continue;
                }
                ProcessingMessage processing = new ProcessingMessage(originalMessage, message, processingFuture);
                processingMessages.add(processing);
            }
            ArrayList<Message> processedSuccessfully = this.log.isDebugEnabled() ? new ArrayList<Message>(processingMessages.size()) : Collections.emptyList();
            for (ProcessingMessage processingMsg : processingMessages) {
                FailedMessage failure;
                try {
                    processingMsg.future.get(this.processTimeout.toMillis(), TimeUnit.MILLISECONDS);
                    if (!this.log.isDebugEnabled()) continue;
                    processedSuccessfully.add(processingMsg.parsedMessage);
                }
                catch (ExecutionException e) {
                    this.log.error("Failed to process message: " + processingMsg.parsedMessage, (Throwable)e);
                    failure = new FailedMessage(processingMsg.originalMessage, processingMsg.parsedMessage, e.getCause());
                    failures.add(failure);
                }
                catch (InterruptedException | TimeoutException e) {
                    this.log.warn("Timed out processing message: " + processingMsg.parsedMessage, (Throwable)e);
                    failure = new FailedMessage(processingMsg.originalMessage, processingMsg.parsedMessage, e);
                    failures.add(failure);
                }
            }
            this.log.debug("Processed {}/{} messages on route {}: {}", new Object[]{processedSuccessfully.size(), originalMessages.size(), this.routeId, processedSuccessfully});
            exchange.getIn().setBody(failures);
        })).to(this.failureUri);
    }

    private static class ProcessingMessage {
        final Object originalMessage;
        final Message parsedMessage;
        final Future<Void> future;

        ProcessingMessage(Object originalMessage, Message parsedMessage, Future<Void> future) {
            this.originalMessage = originalMessage;
            this.parsedMessage = parsedMessage;
            this.future = future;
        }
    }
}

