package fr.maif.eventsourcing;

import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedGenerator;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.Command;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.State;
import fr.maif.eventsourcing.TransactionManager;
import io.vavr.API;
import io.vavr.Tuple;
import io.vavr.collection.List;
import io.vavr.control.Either;
import io.vavr.control.Option;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/maif/eventsourcing/EventProcessorImpl.class */
public class EventProcessorImpl<Error, S extends State<S>, C extends Command<Meta, Context>, E extends Event, TxCtx, Message, Meta, Context> implements EventProcessor<Error, S, C, E, TxCtx, Message, Meta, Context> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessorImpl.class);
    private static final TimeBasedGenerator UUIDgen = Generators.timeBasedGenerator();
    private final EventStore<TxCtx, E, Meta, Context> eventStore;
    private final TransactionManager<TxCtx> transactionManager;
    private final AggregateStore<S, String, TxCtx> aggregateStore;
    private final CommandHandler<Error, S, C, E, Message, TxCtx> commandHandler;
    private final EventHandler<S, E> eventHandler;
    private final List<Projection<TxCtx, E, Meta, Context>> projections;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fr/maif/eventsourcing/EventProcessorImpl$CommandStateAndEvent.class */
    public class CommandStateAndEvent {
        public final C command;
        public final Option<S> state;
        public final List<EventEnvelope<E, Meta, Context>> eventEnvelopes;
        public final List<E> events;
        public final Message message;
        public final Option<Long> sequenceNum;

        public CommandStateAndEvent(C c, Option<S> option, List<EventEnvelope<E, Meta, Context>> list, List<E> list2, Message message, Option<Long> option2) {
            this.command = c;
            this.state = option;
            this.eventEnvelopes = list;
            this.events = list2;
            this.message = message;
            this.sequenceNum = option2;
        }

        public C getCommand() {
            return this.command;
        }

        public Option<S> getState() {
            return this.state;
        }

        public List<EventEnvelope<E, Meta, Context>> getEventEnvelopes() {
            return this.eventEnvelopes;
        }

        public List<E> getEvents() {
            return this.events;
        }

        public Message getMessage() {
            return this.message;
        }

        public Option<Long> getSequenceNum() {
            return this.sequenceNum;
        }
    }

    public EventProcessorImpl(EventStore<TxCtx, E, Meta, Context> eventStore, TransactionManager<TxCtx> transactionManager, AggregateStore<S, String, TxCtx> aggregateStore, CommandHandler<Error, S, C, E, Message, TxCtx> commandHandler, EventHandler<S, E> eventHandler, List<Projection<TxCtx, E, Meta, Context>> list) {
        this.eventStore = eventStore;
        this.transactionManager = transactionManager;
        this.aggregateStore = aggregateStore;
        this.commandHandler = commandHandler;
        this.eventHandler = eventHandler;
        this.projections = list;
    }

    @Override // fr.maif.eventsourcing.EventProcessor
    public CompletionStage<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>> processCommand(C c) {
        LOGGER.debug("Processing command {}", c);
        return (CompletionStage<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>) batchProcessCommand(API.List(c)).thenApply((v0) -> {
            return v0.head();
        });
    }

    @Override // fr.maif.eventsourcing.EventProcessor
    public CompletionStage<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>> batchProcessCommand(List<C> list) {
        LOGGER.debug("Processing commands {}", list);
        return this.transactionManager.withTransaction(obj -> {
            return batchProcessCommand(obj, list);
        }).thenCompose((v0) -> {
            return v0.postTransaction();
        });
    }

    @Override // fr.maif.eventsourcing.EventProcessor
    public CompletionStage<TransactionManager.InTransactionResult<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>>> batchProcessCommand(TxCtx txctx, List<C> list) {
        return CompletionStages.traverse(list, command -> {
            return getSnapshot(txctx, command).thenCompose(option -> {
                return handleCommand(txctx, option, command).thenApply(either -> {
                    return API.Tuple(command, option, either);
                });
            });
        }).thenApply((v0) -> {
            return v0.toList();
        }).thenCompose(list2 -> {
            List map = list2.map((v0) -> {
                return v0._3();
            }).filter((v0) -> {
                return v0.isLeft();
            }).map(either -> {
                return Either.left(either.swap().get());
            });
            return CompletionStages.traverse(list2.filter(tuple3 -> {
                return ((Either) tuple3._3).isRight();
            }), tuple32 -> {
                Command command2 = (Command) tuple32._1;
                Option option = (Option) tuple32._2;
                List list2 = ((Events) ((Either) tuple32._3).get()).events.toList();
                return buildEnvelopes(txctx, command2, list2).thenApply(list3 -> {
                    return new CommandStateAndEvent(command2, option, list3, list2, ((Events) ((Either) tuple32._3).get()).message, list3.lastOption().map(eventEnvelope -> {
                        return eventEnvelope.sequenceNum;
                    }));
                });
            }).thenApply(list2 -> {
                return API.Tuple(list2.toList(), map);
            });
        }).thenCompose(tuple2 -> {
            List list3 = (List) tuple2._2;
            List list4 = (List) tuple2._1;
            List<EventEnvelope<E, Meta, Context>> flatMap = list4.flatMap((v0) -> {
                return v0.getEventEnvelopes();
            });
            return this.eventStore.persist(txctx, flatMap).thenCompose(r7 -> {
                return CompletionStages.traverse(list4, commandStateAndEvent -> {
                    LOGGER.debug("Storing state {} to DB", commandStateAndEvent);
                    return this.aggregateStore.buildAggregateAndStoreSnapshot(txctx, this.eventHandler, commandStateAndEvent.getState(), (String) commandStateAndEvent.getCommand().entityId().get(), commandStateAndEvent.getEvents(), commandStateAndEvent.getSequenceNum()).thenApply(option -> {
                        return new ProcessingSuccess(commandStateAndEvent.state, option, commandStateAndEvent.getEventEnvelopes(), commandStateAndEvent.getMessage());
                    });
                });
            }).thenCompose(list5 -> {
                return CompletionStages.traverse(this.projections, projection -> {
                    LOGGER.debug("Applying envelopes {} to projection", flatMap);
                    return projection.storeProjection(txctx, flatMap);
                }).thenApply(list5 -> {
                    return list5;
                });
            }).thenApply(seq -> {
                return list3.appendAll(seq.map((v0) -> {
                    return Either.right(v0);
                }));
            });
        }).thenApply(list3 -> {
            return new TransactionManager.InTransactionResult(list3, () -> {
                List<EventEnvelope<E, Meta, Context>> flatMap = list3.flatMap((v0) -> {
                    return v0.toList();
                }).flatMap((v0) -> {
                    return v0.getEvents();
                });
                LOGGER.debug("Publishing events {} to kafka", flatMap);
                return this.eventStore.publish(flatMap).thenApply(r2 -> {
                    return Tuple.empty();
                }).exceptionally(th -> {
                    return Tuple.empty();
                });
            });
        });
    }

    CompletionStage<List<EventEnvelope<E, Meta, Context>>> buildEnvelopes(TxCtx txctx, C c, List<E> list) {
        String transactionId = this.transactionManager.transactionId();
        int length = list.length();
        return CompletionStages.traverse(list.zipWithIndex(), tuple2 -> {
            return buildEnvelope(txctx, c, (Event) tuple2._1, (Integer) tuple2._2, Integer.valueOf(length), transactionId);
        }).thenApply((v0) -> {
            return v0.toList();
        });
    }

    private CompletionStage<Either<Error, Events<E, Message>>> handleCommand(TxCtx txctx, Option<S> option, C c) {
        return this.commandHandler.handleCommand(txctx, option, c);
    }

    private CompletionStage<Option<S>> getSnapshot(TxCtx txctx, C c) {
        return c.hasId().booleanValue() ? this.aggregateStore.getAggregate(txctx, (String) c.entityId().get()) : CompletionStages.successful(API.None());
    }

    private CompletionStage<EventEnvelope<E, Meta, Context>> buildEnvelope(TxCtx txctx, Command<Meta, Context> command, E e, Integer num, Integer num2, String str) {
        LOGGER.debug("Writing event {} to envelope", e);
        UUID generate = UUIDgen.generate();
        return (CompletionStage<EventEnvelope<E, Meta, Context>>) this.eventStore.nextSequence(txctx).thenApply(l -> {
            EventEnvelope.Builder withEvent = EventEnvelope.builder().withId(generate).withEmissionDate(LocalDateTime.now()).withEntityId(e.entityId()).withSequenceNum(l).withEventType(e.type().name()).withVersion(e.type().version()).withTotalMessageInTransaction(num2).withNumMessageInTransaction(Integer.valueOf(num.intValue() + 1)).withTransactionId(str).withEvent(e);
            Option<Context> context = command.context();
            Objects.requireNonNull(withEvent);
            context.forEach(withEvent::withContext);
            Option<String> userId = command.userId();
            Objects.requireNonNull(withEvent);
            userId.forEach(withEvent::withUserId);
            Option<String> systemId = command.systemId();
            Objects.requireNonNull(withEvent);
            systemId.forEach(withEvent::withSystemId);
            Option<Meta> metadata = command.metadata();
            Objects.requireNonNull(withEvent);
            metadata.forEach(withEvent::withMetadata);
            return withEvent.build();
        });
    }

    @Override // fr.maif.eventsourcing.EventProcessor
    public CompletionStage<Option<S>> getAggregate(String str) {
        return (CompletionStage<Option<S>>) this.transactionManager.withTransaction(obj -> {
            return getAggregateStore().getAggregate(obj, str);
        });
    }

    @Override // fr.maif.eventsourcing.EventProcessor
    public EventStore<TxCtx, E, Meta, Context> eventStore() {
        return this.eventStore;
    }

    @Override // fr.maif.eventsourcing.EventProcessor
    public AggregateStore<S, String, TxCtx> getAggregateStore() {
        return this.aggregateStore;
    }
}
