/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerConfiguration;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.common.exception.FunctionalException;
import io.fluxcapacitor.javaclient.common.exception.TechnicalException;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.persisting.caching.CacheInvalidatingInterceptor;
import io.fluxcapacitor.javaclient.publishing.ResultGateway;
import io.fluxcapacitor.javaclient.tracking.BatchProcessingException;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.FluxCapacitorInterceptor;
import io.fluxcapacitor.javaclient.tracking.Tracking;
import io.fluxcapacitor.javaclient.tracking.TrackingException;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerFactory;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTracking
implements Tracking {
    private static final Logger log = LoggerFactory.getLogger(DefaultTracking.class);
    private final Object $lock = new Object[0];
    private static final HandlerConfiguration trackingHandlerConfiguration = HandlerConfiguration.builder().handlerFilter((c, e) -> !ClientUtils.isLocalHandlerMethod(c, e)).build();
    private final MessageType messageType;
    private final Client client;
    private final ResultGateway resultGateway;
    private final List<ConsumerConfiguration> configurations;
    private final Serializer serializer;
    private final HandlerFactory handlerFactory;
    private final Set<ConsumerConfiguration> startedConfigurations = new HashSet<ConsumerConfiguration>();
    private final Collection<CompletableFuture<?>> outstandingRequests = new CopyOnWriteArrayList();
    private final AtomicReference<Registration> shutdownFunction = new AtomicReference<Registration>(Registration.noOp());

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Registration start(FluxCapacitor fluxCapacitor, List<?> handlers) {
        Object object = this.$lock;
        synchronized (object) {
            return fluxCapacitor.apply(fc -> {
                Map<ConsumerConfiguration, List> consumers = handlers.stream().collect(Collectors.groupingBy(h -> this.configurations.stream().filter(config -> config.getHandlerFilter().test(h)).findFirst().orElseThrow(() -> new TrackingException(String.format("Failed to find consumer for %s", h))))).entrySet().stream().flatMap(e -> {
                    List converted = ((List)e.getValue()).stream().flatMap(target -> this.handlerFactory.createHandler(target, ((ConsumerConfiguration)e.getKey()).getName(), trackingHandlerConfiguration).stream()).collect(Collectors.toList());
                    return converted.isEmpty() ? Stream.empty() : Stream.of(new AbstractMap.SimpleEntry((ConsumerConfiguration)e.getKey(), converted));
                }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                if (!Collections.disjoint(consumers.keySet(), this.startedConfigurations)) {
                    throw new TrackingException("Failed to start tracking. Consumers for some handlers have already started tracking.");
                }
                this.startedConfigurations.addAll(consumers.keySet());
                Registration registration = consumers.entrySet().stream().map(e -> this.startTracking((ConsumerConfiguration)e.getKey(), (List)e.getValue(), (FluxCapacitor)fc)).reduce(Registration::merge).orElse(Registration.noOp());
                this.shutdownFunction.updateAndGet(r -> r.merge(registration));
                return registration;
            });
        }
    }

    protected Registration startTracking(ConsumerConfiguration configuration, List<Handler<DeserializingMessage>> handlers, FluxCapacitor fluxCapacitor) {
        Consumer<List<SerializedMessage>> consumer = this.createConsumer(configuration, handlers);
        ArrayList<FluxCapacitorInterceptor> batchInterceptors = new ArrayList<FluxCapacitorInterceptor>(Collections.singletonList(new FluxCapacitorInterceptor(fluxCapacitor)));
        switch (configuration.getMessageType()) {
            case COMMAND: 
            case EVENT: {
                batchInterceptors.add((FluxCapacitorInterceptor)((Object)new CacheInvalidatingInterceptor(fluxCapacitor.cache())));
            }
        }
        batchInterceptors.addAll(configuration.getBatchInterceptors());
        configuration = configuration.toBuilder().clearBatchInterceptors().batchInterceptors(batchInterceptors).build();
        return DefaultTracker.start(consumer, configuration, this.client);
    }

    protected Consumer<List<SerializedMessage>> createConsumer(ConsumerConfiguration configuration, List<Handler<DeserializingMessage>> handlers) {
        return serializedMessages -> DeserializingMessage.handleBatch(this.serializer.deserializeMessages(serializedMessages.stream(), false, this.messageType)).forEach(m -> handlers.forEach(h -> this.tryHandle((DeserializingMessage)m, (Handler<DeserializingMessage>)h, configuration)));
    }

    protected void tryHandle(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config) {
        try {
            if (handler.canHandle((Object)message)) {
                this.handle(message, handler, config);
            }
        }
        catch (BatchProcessingException e) {
            throw new BatchProcessingException(String.format("Handler %s failed to handle a %s", handler, message), e.getCause(), e.getMessageIndex());
        }
        catch (Exception e) {
            try {
                config.getErrorHandler().handleError(e, String.format("Handler %s failed to handle a %s", handler, message), () -> this.handle(message, handler, config));
            }
            catch (Exception thrown) {
                throw new BatchProcessingException(message.getSerializedObject().getIndex());
            }
        }
    }

    protected void handle(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config) {
        Object result;
        Exception exception = null;
        try {
            result = handler.invoke((Object)message);
        }
        catch (FunctionalException e2) {
            result = e2;
            exception = e2;
        }
        catch (Exception e3) {
            result = new TechnicalException(String.format("Handler %s failed to handle a %s", handler, message), e3);
            exception = e3;
        }
        SerializedMessage serializedMessage = message.getSerializedObject();
        boolean shouldSendResponse = this.shouldSendResponse(handler, message);
        if (result instanceof CompletableFuture) {
            CompletionStage future = ((CompletableFuture)result).whenComplete((r, e) -> {
                Object asyncResult = e == null ? r : (e instanceof FunctionalException ? e : new TechnicalException(String.format("Handler %s failed to handle a %s", handler, message), (Throwable)e));
                message.run(m -> {
                    try {
                        if (shouldSendResponse) {
                            this.resultGateway.respond(asyncResult, serializedMessage.getSource(), serializedMessage.getRequestId());
                        }
                        if (e != null) {
                            config.getErrorHandler().handleError((Exception)e, String.format("Handler %s failed to handle a %s", handler, message), () -> this.handle(message, handler, config));
                        }
                    }
                    catch (Exception exc) {
                        log.warn("Did not stop consumer {} after async handler {} failed to handle a {}", new Object[]{config.getName(), handler, message, exc});
                    }
                });
            });
            this.outstandingRequests.add((CompletableFuture<?>)future);
            ((CompletableFuture)future).whenComplete((arg_0, arg_1) -> this.lambda$handle$16((CompletableFuture)future, arg_0, arg_1));
        } else if (shouldSendResponse) {
            this.resultGateway.respond(result, serializedMessage.getSource(), serializedMessage.getRequestId());
        }
        if (exception != null) {
            throw exception;
        }
    }

    private boolean shouldSendResponse(Handler<DeserializingMessage> handler, DeserializingMessage message) {
        SerializedMessage serializedMessage = message.getSerializedObject();
        if (serializedMessage.getRequestId() == null) {
            return false;
        }
        return !handler.isPassive((Object)message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this.$lock;
        synchronized (object) {
            this.shutdownFunction.get().merge(() -> ClientUtils.waitForResults(Duration.ofSeconds(2L), this.outstandingRequests)).cancel();
        }
    }

    @ConstructorProperties(value={"messageType", "client", "resultGateway", "configurations", "serializer", "handlerFactory"})
    public DefaultTracking(MessageType messageType, Client client, ResultGateway resultGateway, List<ConsumerConfiguration> configurations, Serializer serializer, HandlerFactory handlerFactory) {
        this.messageType = messageType;
        this.client = client;
        this.resultGateway = resultGateway;
        this.configurations = configurations;
        this.serializer = serializer;
        this.handlerFactory = handlerFactory;
    }

    private /* synthetic */ void lambda$handle$16(CompletableFuture future, Object r, Throwable e) {
        this.outstandingRequests.remove(future);
    }
}

