package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerConfiguration;
import io.fluxcapacitor.common.handling.HandlerInspector;
import io.fluxcapacitor.common.handling.ParameterResolver;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.exception.FunctionalException;
import io.fluxcapacitor.javaclient.common.exception.TechnicalException;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.eventsourcing.CacheInvalidatingInterceptor;
import io.fluxcapacitor.javaclient.publishing.ResultGateway;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import io.fluxcapacitor.javaclient.tracking.client.TrackingUtils;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerInterceptor;
import java.beans.ConstructorProperties;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/DefaultTracking.class */
/**
 * {@link Tracking} implementation that assigns handler objects to consumer configurations, starts one tracker
 * per configuration via {@link TrackingUtils} and dispatches every deserialized message to the handlers of
 * that configuration.
 *
 * <p>Handler results (or the exception that replaced them) are sent back through the {@link ResultGateway}
 * when the incoming message carries a request id and the handler annotation is not marked {@code passive}.
 *
 * <p>{@link #start} and {@link #close} are serialized on an internal lock.
 */
public class DefaultTracking implements Tracking {
    private static final Logger log = LoggerFactory.getLogger(DefaultTracking.class);

    private final MessageType messageType;
    private final Class<? extends Annotation> handlerAnnotation;
    private final TrackingClient trackingClient;
    private final ResultGateway resultGateway;
    private final List<ConsumerConfiguration> configurations;
    private final Serializer serializer;
    private final HandlerInterceptor handlerInterceptor;
    private final List<ParameterResolver<? super DeserializingMessage>> parameterResolvers;

    /** Monitor used by {@link #start} and {@link #close}. */
    private final Object lock = new Object[0];
    /** Configurations for which tracking has been started by this instance. */
    private final Set<ConsumerConfiguration> startedConfigurations = new HashSet<>();
    /** Merged registration of everything started so far; cancelled by {@link #close}. */
    private final AtomicReference<Registration> shutdownFunction = new AtomicReference<>(Registration.noOp());

    /**
     * Creates a tracking component.
     *
     * @param messageType        message type attached to every {@link DeserializingMessage} that is dispatched
     * @param handlerAnnotation  annotation that marks handler methods
     * @param trackingClient     client passed to {@link TrackingUtils} when a tracker is started
     * @param resultGateway      gateway used to send handler results back to the requester
     * @param configurations     consumer configurations; the first one whose handler filter accepts a handler wins
     * @param serializer         serializer used to deserialize incoming message batches
     * @param handlerInterceptor interceptor wrapped around every handler invocation
     * @param parameterResolvers parameter resolvers handed to {@link HandlerInspector} when handlers are created
     */
    @ConstructorProperties({"messageType", "handlerAnnotation", "trackingClient", "resultGateway", "configurations", "serializer", "handlerInterceptor", "parameterResolvers"})
    public DefaultTracking(MessageType messageType, Class<? extends Annotation> handlerAnnotation,
                           TrackingClient trackingClient, ResultGateway resultGateway,
                           List<ConsumerConfiguration> configurations, Serializer serializer,
                           HandlerInterceptor handlerInterceptor,
                           List<ParameterResolver<? super DeserializingMessage>> parameterResolvers) {
        this.messageType = messageType;
        this.handlerAnnotation = handlerAnnotation;
        this.trackingClient = trackingClient;
        this.resultGateway = resultGateway;
        this.configurations = configurations;
        this.serializer = serializer;
        this.handlerInterceptor = handlerInterceptor;
        this.parameterResolvers = parameterResolvers;
    }

    /**
     * Starts tracking for the given handlers. Objects without methods carrying the handler annotation are
     * ignored; the remaining ones are grouped by consumer configuration and one tracker is started per group.
     *
     * @param fluxCapacitor the client instance made available to the started trackers
     * @param handlers      candidate handler objects
     * @return a registration covering only the trackers started by this call
     * @throws TrackingException if no configuration accepts one of the handlers, or if tracking was already
     *                           started for one of the selected configurations
     */
    @Override
    public Registration start(FluxCapacitor fluxCapacitor, List<?> handlers) {
        synchronized (lock) {
            Map<ConsumerConfiguration, List<Object>> handlersPerConsumer = handlers.stream()
                    .filter(handler -> HandlerInspector.hasHandlerMethods(handler.getClass(), handlerAnnotation))
                    .collect(Collectors.<Object, ConsumerConfiguration>groupingBy(this::configurationFor));
            if (!Collections.disjoint(handlersPerConsumer.keySet(), startedConfigurations)) {
                throw new TrackingException("Failed to start tracking. Consumers for some handlers have already started tracking.");
            }
            startedConfigurations.addAll(handlersPerConsumer.keySet());
            Registration registration = handlersPerConsumer.entrySet().stream()
                    .map(entry -> startTracking(entry.getKey(), entry.getValue(), fluxCapacitor))
                    .reduce((first, second) -> first.merge(second))
                    .orElse(Registration.noOp());
            // Remember the new trackers so that close() stops them as well.
            shutdownFunction.updateAndGet(registration::merge);
            return registration;
        }
    }

    /** Returns the first configuration whose handler filter accepts the given handler. */
    private ConsumerConfiguration configurationFor(Object handler) {
        return configurations.stream()
                .filter(configuration -> configuration.getHandlerFilter().test(handler))
                .findFirst()
                .orElseThrow(() -> new TrackingException(String.format("Failed to find consumer for %s", handler)));
    }

    /**
     * Starts a tracker for a single consumer configuration.
     *
     * <p>The configured batch interceptors are preceded by a {@link FluxCapacitorInterceptor} and, for
     * command consumers, a {@link CacheInvalidatingInterceptor}. Tracker ids are prefixed with the client id,
     * and the consumer name is prefixed with the client name when the configuration asks for it.
     *
     * @param consumerConfiguration the configuration to start
     * @param list                  handlers assigned to the configuration
     * @param fluxCapacitor         the client instance
     * @return the registration returned by {@link TrackingUtils}
     */
    // The element type of the batch interceptor list is not named in this file, so the list is kept raw.
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected Registration startTracking(ConsumerConfiguration consumerConfiguration, List<Object> list, FluxCapacitor fluxCapacitor) {
        Consumer<List<SerializedMessage>> consumer = createConsumer(consumerConfiguration, list);

        List batchInterceptors = new ArrayList(Collections.singletonList(new FluxCapacitorInterceptor(fluxCapacitor)));
        if (consumerConfiguration.getMessageType() == MessageType.COMMAND) {
            batchInterceptors.add(new CacheInvalidatingInterceptor(fluxCapacitor.eventSourcing()));
        }
        batchInterceptors.addAll(consumerConfiguration.getTrackingConfiguration().getBatchInterceptors());

        Supplier<String> trackerIdFactory = consumerConfiguration.getTrackingConfiguration().getTrackerIdFactory();
        String consumerName = consumerConfiguration.prependApplicationName()
                ? String.format("%s_%s", fluxCapacitor.client().name(), consumerConfiguration.getName())
                : consumerConfiguration.getName();
        return TrackingUtils.start(consumerName, consumer, trackingClient,
                consumerConfiguration.getTrackingConfiguration().toBuilder()
                        .clearBatchInterceptors()
                        .batchInterceptors(batchInterceptors)
                        .trackerIdFactory(() -> String.format("%s_%s", fluxCapacitor.client().id(), trackerIdFactory.get()))
                        .build());
    }

    /**
     * Builds the batch consumer for a configuration. The consumer deserializes a batch, offers every message
     * to every handler in order and calls {@code onEndOfBatch} on all handlers while the last message of a
     * non-empty batch is still the current message.
     *
     * @param consumerConfiguration configuration whose error handler deals with handler failures
     * @param list                  handler objects; those without handler methods are skipped
     * @return the consumer that processes one batch of serialized messages
     */
    protected Consumer<List<SerializedMessage>> createConsumer(ConsumerConfiguration consumerConfiguration, List<Object> list) {
        List<Handler<DeserializingMessage>> handlers = list.stream()
                .filter(target -> HandlerInspector.hasHandlerMethods(target.getClass(), handlerAnnotation))
                .map(target -> HandlerInspector.createHandler(target, handlerAnnotation, parameterResolvers,
                        HandlerConfiguration.builder().invokerFactory(DeserializingMessage.defaultInvokerFactory).build()))
                .collect(Collectors.toList());
        return serializedMessages -> {
            Iterator<DeserializingMessage> messages = serializer.deserialize(serializedMessages.stream(), false)
                    .map(deserialized -> new DeserializingMessage(deserialized, messageType))
                    .iterator();
            // Explicit loop on purpose: hasNext() is consulted while a message is being processed, which is
            // not safe from inside Iterator.forEachRemaining() on a stream-backed iterator.
            while (messages.hasNext()) {
                DeserializingMessage message = messages.next();
                try {
                    DeserializingMessage.setCurrent(message);
                    handlers.forEach(handler -> tryHandle(message, handler, consumerConfiguration));
                    if (!messages.hasNext()) {
                        handlers.forEach(Handler::onEndOfBatch);
                    }
                } finally {
                    DeserializingMessage.removeCurrent();
                }
            }
        };
    }

    /**
     * Invokes the handler if it can handle the message. Any exception is passed to the configuration's error
     * handler together with a retry callback that invokes {@link #handle} again; whatever the error handler
     * throws propagates to the caller unchanged.
     *
     * @param deserializingMessage  the message to handle
     * @param handler               the candidate handler
     * @param consumerConfiguration configuration supplying the error handler
     */
    protected void tryHandle(DeserializingMessage deserializingMessage, Handler<DeserializingMessage> handler, ConsumerConfiguration consumerConfiguration) {
        try {
            if (handler.canHandle(deserializingMessage)) {
                handle(deserializingMessage, handler, consumerConfiguration);
            }
        } catch (Exception e) {
            try {
                consumerConfiguration.getErrorHandler().handleError(e,
                        String.format("Handler %s failed to handle a %s", handler, deserializingMessage),
                        () -> handle(deserializingMessage, handler, consumerConfiguration));
            } catch (Exception fatal) {
                // Rethrown as-is, whether checked or not, without widening this method's signature.
                throw DefaultTracking.<RuntimeException>sneakyThrow(fatal);
            }
        }
    }

    /**
     * Invokes the handler through the handler interceptor and sends a response when one is expected.
     *
     * <p>A synchronous failure is answered with the {@link FunctionalException} itself or with a
     * {@link TechnicalException} for any other exception, after which the original exception is rethrown.
     * A {@link CompletionStage} result is answered when the stage completes.
     *
     * @param deserializingMessage  the message to handle
     * @param handler               the handler to invoke
     * @param consumerConfiguration configuration supplying the consumer name and error handler
     */
    protected void handle(DeserializingMessage deserializingMessage, Handler<DeserializingMessage> handler, ConsumerConfiguration consumerConfiguration) {
        Object result;
        Exception failure = null;
        try {
            // NOTE(review): the lambda ignores its own argument and always invokes the handler with the
            // original message - confirm that interceptors are not expected to substitute the message.
            result = handlerInterceptor.interceptHandling(intercepted -> handler.invoke(deserializingMessage),
                    handler, consumerConfiguration.getName()).apply(deserializingMessage);
        } catch (FunctionalException e) {
            result = e;
            failure = e;
        } catch (Exception e) {
            result = new TechnicalException(String.format("Handler %s failed to handle a %s", handler, deserializingMessage));
            failure = e;
        }

        SerializedMessage serializedMessage = deserializingMessage.getSerializedObject();
        boolean sendResponse = shouldSendResponse(handler, deserializingMessage);
        if (result instanceof CompletionStage) {
            ((CompletionStage<?>) result).whenComplete((value, error) ->
                    completeAsync(deserializingMessage, handler, consumerConfiguration, sendResponse, value, error));
        } else if (sendResponse) {
            resultGateway.respond(result, serializedMessage.getSource(), serializedMessage.getRequestId().intValue());
        }
        if (failure != null) {
            throw DefaultTracking.<RuntimeException>sneakyThrow(failure);
        }
    }

    /**
     * Completion callback for handlers that returned a {@link CompletionStage}. Sends the response if one is
     * expected and reports a failure to the error handler. Exceptions raised while doing so are logged and
     * not propagated.
     */
    private void completeAsync(DeserializingMessage message, Handler<DeserializingMessage> handler,
                               ConsumerConfiguration configuration, boolean sendResponse, Object value,
                               Throwable error) {
        String description = String.format("Handler %s failed to handle a %s", handler, message);
        Object response = value;
        if (error != null) {
            // Same mapping as the synchronous path: functional errors are returned to the requester as they
            // are, everything else is hidden behind a TechnicalException.
            response = error instanceof FunctionalException ? error : new TechnicalException(description);
        }
        try {
            DeserializingMessage.setCurrent(message);
            if (sendResponse) {
                SerializedMessage serializedMessage = message.getSerializedObject();
                resultGateway.respond(response, serializedMessage.getSource(), serializedMessage.getRequestId().intValue());
            }
            if (error != null) {
                // The error handler accepts exceptions only; keep any other throwable as the cause.
                Exception reported = error instanceof Exception
                        ? (Exception) error : new IllegalStateException(description, error);
                configuration.getErrorHandler().handleError(reported, description,
                        () -> handle(message, handler, configuration));
            }
        } catch (Exception e) {
            log.warn("Did not stop consumer {} after async handler {} failed to handle a {}",
                    configuration.getName(), handler, message, e);
        } finally {
            DeserializingMessage.removeCurrent();
        }
    }

    /**
     * Decides whether a response must be sent: the message needs a request id and, if the handler annotation
     * declares a {@code passive} attribute, that attribute must be {@code false} on the handler method.
     */
    private boolean shouldSendResponse(Handler<DeserializingMessage> handler, DeserializingMessage deserializingMessage) {
        if (deserializingMessage.getSerializedObject().getRequestId() == null) {
            return false;
        }
        Annotation annotation = handler.getMethod(deserializingMessage).getAnnotation(handlerAnnotation);
        Optional<Method> passiveAttribute = Arrays.stream(handlerAnnotation.getMethods())
                .filter(method -> method.getName().equals("passive"))
                .findFirst();
        if (!passiveAttribute.isPresent()) {
            return true;
        }
        try {
            return !((Boolean) passiveAttribute.get().invoke(annotation));
        } catch (ReflectiveOperationException e) {
            throw DefaultTracking.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Cancels every registration created by {@link #start}. Calling it again without an intermediate
     * {@link #start} is a no-op.
     */
    @Override
    public void close() {
        synchronized (lock) {
            shutdownFunction.getAndSet(Registration.noOp()).cancel();
        }
    }

    /**
     * Rethrows the given throwable without wrapping it and without requiring a {@code throws} clause.
     * Declared to return a {@link RuntimeException} so call sites can write {@code throw sneakyThrow(e)}.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable throwable) throws E {
        throw (E) throwable;
    }
}
