package fr.maif.eventsourcing;

import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import fr.maif.eventsourcing.Command;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.ReactivePostgresKafkaEventProcessorBuilder;
import fr.maif.eventsourcing.State;
import fr.maif.eventsourcing.format.JacksonEventFormat;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.eventsourcing.impl.DefaultAggregateStore;
import fr.maif.eventsourcing.impl.KafkaEventPublisher;
import fr.maif.jooq.PgAsyncPool;
import fr.maif.jooq.PgAsyncTransaction;
import io.vavr.collection.List;
import java.io.Closeable;
import java.io.IOException;

/* loaded from: input_file:fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessor.class */
public class ReactivePostgresKafkaEventProcessor<Error, S extends State<S>, C extends Command<Meta, Context>, E extends Event, Message, Meta, Context> extends EventProcessor<Error, S, C, E, PgAsyncTransaction, Message, Meta, Context> implements Closeable {
    private final PostgresKafkaEventProcessorConfig<Error, S, C, E, Message, Meta, Context> config;

    /* loaded from: input_file:fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessor$PostgresKafkaEventProcessorConfig.class */
    public static class PostgresKafkaEventProcessorConfig<Error, S extends State<S>, C extends Command<Meta, Context>, E extends Event, Message, Meta, Context> {
        public final ReactivePostgresEventStore<E, Meta, Context> eventStore;
        public final TransactionManager<PgAsyncTransaction> transactionManager;
        public final AggregateStore<S, String, PgAsyncTransaction> aggregateStore;
        public final CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler;
        public final EventHandler<S, E> eventHandler;
        public final List<Projection<PgAsyncTransaction, E, Meta, Context>> projections;
        public final KafkaEventPublisher<E, Meta, Context> eventPublisher;

        public PostgresKafkaEventProcessorConfig(ActorSystem actorSystem, TableNames tableNames, PgAsyncPool pgAsyncPool, String str, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, TransactionManager<PgAsyncTransaction> transactionManager, AggregateStore<S, String, PgAsyncTransaction> aggregateStore, CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler, EventHandler<S, E> eventHandler, List<Projection<PgAsyncTransaction, E, Meta, Context>> list, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Integer num) {
            this.transactionManager = transactionManager;
            this.eventPublisher = new KafkaEventPublisher<>(actorSystem, producerSettings, str, num);
            this.eventStore = new ReactivePostgresEventStore<>(actorSystem, this.eventPublisher, pgAsyncPool, tableNames, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2);
            this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, actorSystem, transactionManager) : aggregateStore;
            this.commandHandler = commandHandler;
            this.eventHandler = eventHandler;
            this.projections = list;
        }

        public PostgresKafkaEventProcessorConfig(ActorSystem actorSystem, TableNames tableNames, PgAsyncPool pgAsyncPool, String str, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, TransactionManager<PgAsyncTransaction> transactionManager, AggregateStore<S, String, PgAsyncTransaction> aggregateStore, CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler, EventHandler<S, E> eventHandler, List<Projection<PgAsyncTransaction, E, Meta, Context>> list, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2) {
            this(actorSystem, tableNames, pgAsyncPool, str, producerSettings, transactionManager, aggregateStore, commandHandler, eventHandler, list, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, null);
        }

        public PostgresKafkaEventProcessorConfig(ActorSystem actorSystem, TableNames tableNames, PgAsyncPool pgAsyncPool, String str, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, TransactionManager<PgAsyncTransaction> transactionManager, CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler, EventHandler<S, E> eventHandler, List<Projection<PgAsyncTransaction, E, Meta, Context>> list, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Integer num) {
            this(actorSystem, tableNames, pgAsyncPool, str, producerSettings, transactionManager, null, commandHandler, eventHandler, list, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, num);
        }

        public PostgresKafkaEventProcessorConfig(ReactivePostgresEventStore<E, Meta, Context> reactivePostgresEventStore, TransactionManager<PgAsyncTransaction> transactionManager, AggregateStore<S, String, PgAsyncTransaction> aggregateStore, CommandHandler<Error, S, C, E, Message, PgAsyncTransaction> commandHandler, EventHandler<S, E> eventHandler, List<Projection<PgAsyncTransaction, E, Meta, Context>> list, KafkaEventPublisher<E, Meta, Context> kafkaEventPublisher) {
            this.eventStore = reactivePostgresEventStore;
            this.transactionManager = transactionManager;
            this.aggregateStore = aggregateStore;
            this.commandHandler = commandHandler;
            this.eventHandler = eventHandler;
            this.projections = list;
            this.eventPublisher = kafkaEventPublisher;
        }
    }

    public static ReactivePostgresKafkaEventProcessorBuilder.BuilderWithSystem newSystem() {
        return new ReactivePostgresKafkaEventProcessorBuilder.BuilderWithSystem(ActorSystem.create());
    }

    public static ReactivePostgresKafkaEventProcessorBuilder.BuilderWithSystem withSystem(ActorSystem actorSystem) {
        return new ReactivePostgresKafkaEventProcessorBuilder.BuilderWithSystem(actorSystem);
    }

    public ReactivePostgresKafkaEventProcessor(PostgresKafkaEventProcessorConfig<Error, S, C, E, Message, Meta, Context> postgresKafkaEventProcessorConfig) {
        super(postgresKafkaEventProcessorConfig.eventStore, postgresKafkaEventProcessorConfig.transactionManager, postgresKafkaEventProcessorConfig.aggregateStore, postgresKafkaEventProcessorConfig.commandHandler, postgresKafkaEventProcessorConfig.eventHandler, postgresKafkaEventProcessorConfig.projections);
        this.config = postgresKafkaEventProcessorConfig;
        postgresKafkaEventProcessorConfig.eventPublisher.start(postgresKafkaEventProcessorConfig.eventStore);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.config.eventPublisher.close();
    }
}
