/*
 * Decompiled with CFR 0.152.
 */
package fr.maif.eventsourcing;

import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedGenerator;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.AggregateStore;
import fr.maif.eventsourcing.Command;
import fr.maif.eventsourcing.CommandHandler;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventHandler;
import fr.maif.eventsourcing.EventProcessor;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.Events;
import fr.maif.eventsourcing.ProcessingSuccess;
import fr.maif.eventsourcing.Projection;
import fr.maif.eventsourcing.State;
import fr.maif.eventsourcing.TransactionManager;
import io.vavr.API;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.Tuple2;
import io.vavr.Tuple3;
import io.vavr.Value;
import io.vavr.collection.List;
import io.vavr.collection.Traversable;
import io.vavr.control.Either;
import io.vavr.control.Option;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventProcessor} implementation that turns commands into persisted events.
 *
 * <p>For each command the processor loads the current aggregate state, asks the
 * {@link CommandHandler} for events, wraps the events in {@link EventEnvelope}s,
 * persists them through the {@link EventStore}, stores the resulting aggregate through
 * the {@link AggregateStore}, feeds the envelopes to every {@link Projection} and,
 * as a post-transaction step, publishes the envelopes through the event store.
 *
 * @param <Error>   error type produced by the command handler
 * @param <S>       aggregate state type
 * @param <C>       command type
 * @param <E>       event type
 * @param <TxCtx>   transaction context handed to the stores and projections
 * @param <Message> message type carried alongside the events of a handled command
 * @param <Meta>    metadata type copied from the command into the envelopes
 * @param <Context> context type copied from the command into the envelopes
 */
public class EventProcessorImpl<Error, S extends State<S>, C extends Command<Meta, Context>, E extends Event, TxCtx, Message, Meta, Context>
        implements EventProcessor<Error, S, C, E, TxCtx, Message, Meta, Context> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessorImpl.class);
    /** Generator used to assign an id to every event envelope. */
    private static final TimeBasedGenerator UUIDgen = Generators.timeBasedGenerator();
    private final EventStore<TxCtx, E, Meta, Context> eventStore;
    private final TransactionManager<TxCtx> transactionManager;
    private final AggregateStore<S, String, TxCtx> aggregateStore;
    private final CommandHandler<Error, S, C, E, Message, TxCtx> commandHandler;
    private final EventHandler<S, E> eventHandler;
    private final List<Projection<TxCtx, E, Meta, Context>> projections;

    /**
     * Creates a processor from its collaborators; the references are stored as given.
     *
     * @param eventStore         store used to allocate sequence numbers, persist and publish envelopes
     * @param transactionManager provider of the transaction context and transaction id
     * @param aggregateStore     store used to read aggregates and to save snapshots
     * @param commandHandler     handler that turns a state and a command into events or an error
     * @param eventHandler       handler used to derive a state from events
     * @param projections        projections that receive every persisted envelope
     */
    public EventProcessorImpl(EventStore<TxCtx, E, Meta, Context> eventStore, TransactionManager<TxCtx> transactionManager, AggregateStore<S, String, TxCtx> aggregateStore, CommandHandler<Error, S, C, E, Message, TxCtx> commandHandler, EventHandler<S, E> eventHandler, List<Projection<TxCtx, E, Meta, Context>> projections) {
        this.eventStore = eventStore;
        this.transactionManager = transactionManager;
        this.aggregateStore = aggregateStore;
        this.commandHandler = commandHandler;
        this.eventHandler = eventHandler;
        this.projections = projections;
    }

    /**
     * Processes a single command by delegating to {@link #batchProcessCommand(List)}
     * with a one-element list and returning the first result.
     *
     * @param command the command to process
     * @return the handler error, or the processing success for this command
     */
    @Override
    public CompletionStage<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> processCommand(C command) {
        LOGGER.debug("Processing command {}", command);
        return this.batchProcessCommand(API.List(command)).thenApply(Traversable::head);
    }

    /**
     * Processes the commands inside a transaction obtained from the transaction manager,
     * then runs the post-transaction step of the result.
     *
     * @param commands the commands to process
     * @return one result per command; see {@link #batchProcessCommand(Object, List)} for ordering
     */
    @Override
    public CompletionStage<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>> batchProcessCommand(List<C> commands) {
        LOGGER.debug("Processing commands {}", commands);
        return this.transactionManager
                .withTransaction(ctx -> this.batchProcessCommand(ctx, commands))
                .thenCompose(inTransactionResult -> inTransactionResult.postTransaction());
    }

    /**
     * Processes the commands using the supplied transaction context.
     *
     * <p>The returned list contains all handler errors first, followed by the successes;
     * the order of the input commands is therefore not preserved when both kinds occur.
     * Publishing of the envelopes is deferred to the post-transaction step of the returned
     * {@link TransactionManager.InTransactionResult}; a failure of that publication is
     * logged and otherwise ignored.
     *
     * @param ctx      transaction context passed to the stores, the handler and the projections
     * @param commands the commands to process
     * @return the results together with the deferred publication step
     */
    @Override
    public CompletionStage<TransactionManager.InTransactionResult<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>>> batchProcessCommand(TxCtx ctx, List<C> commands) {
        return this.traverseCommands(commands, (c, events) ->
                        // Load the state (including events of earlier commands of the batch), then handle the command.
                        this.getCurrentState(ctx, c, events).thenCompose(mayBeState ->
                                this.handleCommand(ctx, mayBeState, c).thenApply(r -> API.Tuple(c, mayBeState, r))))
                .thenCompose(commandsAndResults -> {
                    // Handler errors are carried through untouched.
                    List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = commandsAndResults
                            .map(Tuple3::_3)
                            .filter(Either::isLeft)
                            .map(e -> Either.left(e.swap().get()));
                    // Successful commands get one envelope per produced event.
                    CompletionStage<List<CommandStateAndEvent>> success = CompletionStages.traverse(
                            commandsAndResults.filter(t -> t._3.isRight()),
                            t -> {
                                C command = t._1;
                                Option<S> mayBeState = t._2;
                                Events<E, Message> handled = t._3.get();
                                List<E> events = handled.events.toList();
                                return this.buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> {
                                    Option<Long> mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum);
                                    return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, handled.message, mayBeLastSeqNum);
                                });
                            });
                    return success.thenApply(s -> API.Tuple(s.toList(), errors));
                })
                .thenCompose(successAndErrors -> {
                    List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> errors = successAndErrors._2;
                    List<CommandStateAndEvent> success = successAndErrors._1;
                    List<EventEnvelope<E, Meta, Context>> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes);
                    CompletionStage<List<ProcessingSuccess<S, E, Meta, Context, Message>>> stored = this.eventStore.persist(ctx, envelopes)
                            .thenCompose(ignored -> CompletionStages.traverse(success, s -> {
                                LOGGER.debug("Storing state {} to DB", s);
                                String entityId = s.getCommand().entityId().get();
                                // Highest sequence number among the envelopes of this entity, if any.
                                List<Long> sequences = envelopes
                                        .filter(env -> env.entityId.equals(entityId))
                                        .map(env -> env.sequenceNum);
                                return this.aggregateStore
                                        .buildAggregateAndStoreSnapshot(ctx, this.eventHandler, s.getState(), entityId, s.getEvents(), sequences.max())
                                        .thenApply(mayBeNextState -> new ProcessingSuccess<S, E, Meta, Context, Message>(s.getState(), mayBeNextState, s.getEventEnvelopes(), s.getMessage()));
                            }))
                            .thenCompose(successes -> CompletionStages.traverse(this.projections, p -> {
                                LOGGER.debug("Applying envelopes {} to projection", envelopes);
                                return p.storeProjection(ctx, envelopes);
                            }).thenApply(ignored -> successes));
                    return stored.thenApply(results -> errors.appendAll(results.map(Either::right)));
                })
                .thenApply(results -> {
                    Supplier<CompletionStage<Tuple0>> postTransactionProcess = () -> {
                        List<EventEnvelope<E, Meta, Context>> envelopes = results
                                .flatMap(Value::toList)
                                .flatMap(ProcessingSuccess::getEvents);
                        LOGGER.debug("Publishing events {} to kafka", envelopes);
                        return this.eventStore.publish(envelopes)
                                .thenApply(ignored -> Tuple.empty())
                                .exceptionally(e -> {
                                    // Publication is best effort: report the failure instead of hiding it.
                                    LOGGER.error("Failed to publish events after transaction", e);
                                    return Tuple.empty();
                                });
                    };
                    TransactionManager.InTransactionResult<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>> inTransactionResult =
                            new TransactionManager.InTransactionResult<>(results, postTransactionProcess);
                    return inTransactionResult;
                });
    }

    /**
     * Applies {@code handler} to the commands one after the other, each call starting only
     * once the previous one has completed.
     *
     * <p>The handler receives the command and the events produced by all previously handled
     * commands of this traversal (a command that ended in error contributes no events).
     *
     * @param elements the commands to traverse
     * @param handler  function producing, for a command and the earlier events, the handling outcome
     * @return the handler outcomes in command order
     */
    public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>> traverseCommands(List<C> elements, BiFunction<C, List<E>, CompletionStage<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>> handler) {
        // Accumulator: (outcomes so far, events produced so far).
        CompletionStage<Tuple2<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>, List<Events<E, Message>>>> initial =
                CompletableFuture.completedStage(API.Tuple(List.empty(), List.empty()));
        return elements
                .foldLeft(initial, (pending, command) -> pending.thenCompose(acc -> {
                    List<E> previousEvents = acc._2.flatMap(handled -> handled.events);
                    return handler.apply(command, previousEvents)
                            .thenApply(result -> API.Tuple(
                                    acc._1.append(result),
                                    acc._2.append(result._3.getOrElse(Events.empty()))));
                }))
                .thenApply(acc -> acc._1);
    }

    /**
     * Builds one envelope per event; every envelope carries the current transaction id,
     * its 1-based position and the total number of events of the command.
     */
    CompletionStage<List<EventEnvelope<E, Meta, Context>>> buildEnvelopes(TxCtx tx, C command, List<E> events) {
        String transactionId = this.transactionManager.transactionId();
        int nbMessages = events.length();
        return CompletionStages
                .traverse(events.zipWithIndex(), t -> this.buildEnvelope(tx, command, t._1, t._2, nbMessages, transactionId))
                .thenApply(Value::toList);
    }

    /** Delegates to the configured command handler. */
    private CompletionStage<Either<Error, Events<E, Message>>> handleCommand(TxCtx txCtx, Option<S> state, C command) {
        return this.commandHandler.handleCommand(txCtx, state, command);
    }

    /**
     * Resolves the state a command must be handled against: the stored aggregate with the
     * not-yet-persisted events of the same entity applied on top, or none when the command
     * has no id.
     */
    private CompletionStage<Option<S>> getCurrentState(TxCtx ctx, C command, List<E> previousEvent) {
        if (!command.hasId()) {
            return CompletionStages.successful(API.None());
        }
        String entityId = command.entityId().get();
        return this.aggregateStore.getAggregate(ctx, entityId).thenApply(state -> {
            List<E> eventsOfEntity = previousEvent.filter(e -> e.entityId().equals(entityId));
            return this.eventHandler.deriveState(state, eventsOfEntity);
        });
    }

    /**
     * Wraps a single event in an envelope, using the next sequence number of the event store
     * and copying context, user id, system id and metadata from the command when present.
     *
     * @param numMessage zero-based index of the event; stored in the envelope as {@code numMessage + 1}
     * @param nbMessages total number of events produced by the command
     */
    private CompletionStage<EventEnvelope<E, Meta, Context>> buildEnvelope(TxCtx tx, Command<Meta, Context> command, E event, Integer numMessage, Integer nbMessages, String transactionId) {
        LOGGER.debug("Writing event {} to envelope", event);
        UUID id = UUIDgen.generate();
        return this.eventStore.nextSequence(tx).thenApply(nextSequence -> {
            EventEnvelope.Builder<E, Meta, Context> builder = EventEnvelope.<E, Meta, Context>builder()
                    .withId(id)
                    .withEmissionDate(LocalDateTime.now())
                    .withEntityId(event.entityId())
                    .withSequenceNum(nextSequence)
                    .withEventType(event.type().name())
                    .withVersion(event.type().version())
                    .withTotalMessageInTransaction(nbMessages)
                    .withNumMessageInTransaction(numMessage + 1)
                    .withTransactionId(transactionId)
                    .withEvent(event);
            command.context().forEach(builder::withContext);
            command.userId().forEach(builder::withUserId);
            command.systemId().forEach(builder::withSystemId);
            command.metadata().forEach(builder::withMetadata);
            return builder.build();
        });
    }

    /**
     * Reads the aggregate with the given id inside a transaction obtained from the
     * transaction manager.
     */
    @Override
    public CompletionStage<Option<S>> getAggregate(String id) {
        return this.transactionManager.withTransaction(t -> this.getAggregateStore().getAggregate(t, id));
    }

    @Override
    public EventStore<TxCtx, E, Meta, Context> eventStore() {
        return this.eventStore;
    }

    @Override
    public AggregateStore<S, String, TxCtx> getAggregateStore() {
        return this.aggregateStore;
    }

    /**
     * Intermediate result for one successfully handled command: the command, the state it
     * was handled against, the produced events with their envelopes, the handler message
     * and the sequence number of the last envelope, if any.
     *
     * <p>Declared as an inner (non-static) class so that it can use the type parameters of
     * the enclosing processor.
     */
    private class CommandStateAndEvent {
        public final C command;
        public final Option<S> state;
        public final List<EventEnvelope<E, Meta, Context>> eventEnvelopes;
        public final List<E> events;
        public final Message message;
        public final Option<Long> sequenceNum;

        public CommandStateAndEvent(C command, Option<S> state, List<EventEnvelope<E, Meta, Context>> eventEnvelopes, List<E> events, Message message, Option<Long> sequenceNum) {
            this.command = command;
            this.state = state;
            this.eventEnvelopes = eventEnvelopes;
            this.events = events;
            this.message = message;
            this.sequenceNum = sequenceNum;
        }

        public C getCommand() {
            return this.command;
        }

        public Option<S> getState() {
            return this.state;
        }

        public List<EventEnvelope<E, Meta, Context>> getEventEnvelopes() {
            return this.eventEnvelopes;
        }

        public List<E> getEvents() {
            return this.events;
        }

        public Message getMessage() {
            return this.message;
        }

        public Option<Long> getSequenceNum() {
            return this.sequenceNum;
        }
    }
}

