package com.example.demo;

import com.example.demo.BankCommand;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedGenerator;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.ProcessingSuccess;
import fr.maif.eventsourcing.ProjectionGetter;
import fr.maif.eventsourcing.ReactiveEventProcessor;
import fr.maif.eventsourcing.ReactorEventProcessor;
import fr.maif.eventsourcing.TableNames;
import fr.maif.jooq.reactor.PgAsyncPool;
import fr.maif.jooq.reactor.PgAsyncTransaction;
import fr.maif.kafka.JsonFormatSerDer;
import fr.maif.reactor.kafka.KafkaSettings;
import io.vavr.API;
import io.vavr.Lazy;
import io.vavr.Tuple0;
import io.vavr.control.Either;
import io.vavr.control.Option;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;
import java.io.Closeable;
import java.io.IOException;
import java.math.BigDecimal;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderOptions;

/* loaded from: input_file:com/example/demo/Bank.class */
/**
 * Entry point of the bank demo: owns the Vert.x instance, the Postgres pool, the
 * withdraw-by-month projection and the event processor, and exposes account
 * commands and queries as Reactor {@link Mono}s.
 *
 * <p>Call {@link #init()} once before issuing commands so that the tables and
 * the sequence used by the journal exist. Call {@link #close()} to release the
 * Postgres pool and the Vert.x instance.
 */
public class Bank implements Closeable {
    private static final TimeBasedGenerator UUID_GENERATOR = Generators.timeBasedGenerator();

    /** DDL for the table holding account balances. */
    private static final String ACCOUNT_TABLE_DDL =
            "CREATE TABLE IF NOT EXISTS ACCOUNTS (\n"
                    + "    id varchar(100) PRIMARY KEY,\n"
                    + "    balance money NOT NULL\n"
                    + ");";

    /** DDL for the event journal table (see {@link #tableNames()}). */
    private static final String BANK_JOURNAL_TABLE_DDL =
            "\n"
                    + "CREATE TABLE IF NOT EXISTS bank_journal (\n"
                    + "  id UUID primary key,\n"
                    + "  entity_id varchar(100) not null,\n"
                    + "  sequence_num bigint not null,\n"
                    + "  event_type varchar(100) not null,\n"
                    + "  version int not null,\n"
                    + "  transaction_id varchar(100) not null,\n"
                    + "  event jsonb not null,\n"
                    + "  metadata jsonb,\n"
                    + "  context jsonb,\n"
                    + "  total_message_in_transaction int default 1,\n"
                    + "  num_message_in_transaction int default 1,\n"
                    + "  emission_date timestamp not null default now(),\n"
                    + "  user_id varchar(100),\n"
                    + "  system_id varchar(100),\n"
                    + "  published boolean default false,\n"
                    + "  UNIQUE (entity_id, sequence_num)\n"
                    + ");";

    /** DDL for the sequence referenced by {@link #tableNames()}. */
    private static final String BANK_SEQUENCE_DDL =
            "        CREATE SEQUENCE if not exists bank_sequence_num;\n";

    /** Left value returned when a command succeeds but carries no current state. */
    private static final String MISSING_STATE_MESSAGE = "Current state is missing";

    // Declared in dependency order: each initializer only reads fields above it.
    private final Vertx vertx = Vertx.vertx();
    private final PgPool pgPool = createPgPool(this.vertx);
    private final PgAsyncPool pgAsyncPool = createPgAsyncPool(this.pgPool);
    private final WithdrawByMonthProjection withdrawByMonthProjection =
            new WithdrawByMonthProjection(this.pgAsyncPool);
    private final ReactorEventProcessor<String, Account, BankCommand, BankEvent, PgAsyncTransaction, Tuple0, Tuple0, Tuple0> eventProcessor;

    /**
     * Builds the event processor on top of the Postgres pool, the Kafka sender
     * options and the withdraw-by-month projection.
     *
     * @param bankCommandHandler handler registered with the processor for {@link BankCommand}s
     * @param bankEventHandler handler registered with the processor for {@link BankEvent}s
     */
    public Bank(BankCommandHandler bankCommandHandler, BankEventHandler bankEventHandler) {
        this.eventProcessor = ReactiveEventProcessor
                .withPgAsyncPool(this.pgAsyncPool)
                .withTables(tableNames())
                .withTransactionManager()
                .withEventFormater(BankEventFormat.bankEventFormat.jacksonEventFormat())
                .withNoMetaFormater()
                .withNoContextFormater()
                // NOTE(review): "bank" is presumably the Kafka topic name - confirm against the builder API.
                .withKafkaSettings("bank", senderOptions(settings()))
                .withEventHandler(bankEventHandler)
                .withDefaultAggregateStore()
                .withCommandHandler(bankCommandHandler)
                .withProjections(new ProjectionGetter[]{this.withdrawByMonthProjection})
                .build();
    }

    /**
     * Creates the accounts table, the journal table and the journal sequence, then
     * initializes the withdraw-by-month projection.
     *
     * <p>The DDL statements are executed one after the other, in declaration order.
     * A failure of any of them is printed and then propagated through the returned
     * {@link Mono}; in that case the projection is not initialized.
     *
     * @return a {@link Mono} completing once the projection initialization has completed
     */
    public Mono<Void> init() {
        API.println("Initializing database");
        return Flux.fromIterable(API.List(ACCOUNT_TABLE_DDL, BANK_JOURNAL_TABLE_DDL, BANK_SEQUENCE_DDL))
                // concatMap keeps the statements strictly sequential.
                .concatMap(ddl -> this.pgAsyncPool.executeMono(dsl -> dsl.query(ddl)))
                .collectList()
                .doOnSuccess(results -> API.println("Database initialized"))
                .doOnError(error -> {
                    API.println("Database initialization failed");
                    error.printStackTrace();
                })
                .flatMap(results -> this.withdrawByMonthProjection.init())
                .then();
    }

    /**
     * Closes the Postgres pool and then the Vert.x instance.
     *
     * <p>The Vert.x instance is closed even if closing the pool throws. Return
     * values of the two close calls, if any, are ignored.
     *
     * @throws IOException declared by {@link Closeable}; not thrown directly by this method
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.pgPool.close();
        } finally {
            this.vertx.close();
        }
    }

    /**
     * Creates the Vert.x Postgres pool used by this bank.
     *
     * <p>NOTE(review): host, port, database and credentials are hard-coded -
     * presumably intended for a local demo environment only; confirm before reuse.
     */
    private static PgPool createPgPool(Vertx vertx) {
        PgConnectOptions connectOptions = new PgConnectOptions()
                .setPort(5432)
                .setHost("localhost")
                .setDatabase("eventsourcing")
                .setUser("eventsourcing")
                .setPassword("eventsourcing");
        PoolOptions poolOptions = new PoolOptions().setMaxSize(50);
        return PgPool.pool(vertx, connectOptions, poolOptions);
    }

    /** Wraps the given pool in a {@link PgAsyncPool} configured with the Postgres jOOQ dialect. */
    private static PgAsyncPool createPgAsyncPool(PgPool pool) {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
        jooqConfiguration.setSQLDialect(SQLDialect.POSTGRES);
        return PgAsyncPool.create(pool, jooqConfiguration);
    }

    /** Kafka settings built from the hard-coded address {@code localhost:29092}. */
    private static KafkaSettings settings() {
        return KafkaSettings.newBuilder("localhost:29092").build();
    }

    /** Producer options derived from the given settings, using the bank event format for serialization. */
    private static SenderOptions<String, EventEnvelope<BankEvent, Tuple0, Tuple0>> senderOptions(KafkaSettings kafkaSettings) {
        return kafkaSettings.producerSettings(JsonFormatSerDer.of(BankEventFormat.bankEventFormat));
    }

    /** Names of the journal table and sequence created by {@link #init()}. */
    private static TableNames tableNames() {
        return new TableNames("bank_journal", "bank_sequence_num");
    }

    /**
     * Reduces a command result to the aggregate's current state.
     *
     * @param result outcome of {@code eventProcessor.processCommand}
     * @return the left value of {@code result} unchanged, or the current state of the
     *         success, or a left {@code "Current state is missing"} when the success
     *         holds no current state
     */
    private static Either<String, Account> currentStateOrError(
            Either<String, ProcessingSuccess<Account, BankEvent, Tuple0, Tuple0, Tuple0>> result) {
        return result.flatMap(success -> success.currentState.toEither(MISSING_STATE_MESSAGE));
    }

    /**
     * Sends an {@code OpenAccount} command whose id is a lazily generated time-based UUID.
     *
     * @param bigDecimal amount passed to the command (presumably the initial balance - TODO confirm)
     * @return the resulting account, or an error message on the left
     */
    public Mono<Either<String, Account>> createAccount(BigDecimal bigDecimal) {
        BankCommand.OpenAccount command =
                new BankCommand.OpenAccount(Lazy.of(() -> UUID_GENERATOR.generate().toString()), bigDecimal);
        return this.eventProcessor.processCommand(command).map(Bank::currentStateOrError);
    }

    /**
     * Sends a {@code Withdraw} command.
     *
     * @param str first argument of the command (presumably the account id - TODO confirm)
     * @param bigDecimal amount passed to the command
     * @return the resulting account, or an error message on the left
     */
    public Mono<Either<String, Account>> withdraw(String str, BigDecimal bigDecimal) {
        return this.eventProcessor
                .processCommand(new BankCommand.Withdraw(str, bigDecimal))
                .map(Bank::currentStateOrError);
    }

    /**
     * Sends a {@code Deposit} command.
     *
     * @param str first argument of the command (presumably the account id - TODO confirm)
     * @param bigDecimal amount passed to the command
     * @return the resulting account, or an error message on the left
     */
    public Mono<Either<String, Account>> deposit(String str, BigDecimal bigDecimal) {
        return this.eventProcessor
                .processCommand(new BankCommand.Deposit(str, bigDecimal))
                .map(Bank::currentStateOrError);
    }

    /**
     * Sends a {@code CloseAccount} command and returns the raw processing result.
     *
     * <p>Unlike the other commands, the result is not reduced to the current state.
     *
     * @param str argument of the command (presumably the account id - TODO confirm)
     * @return the processing result, or an error message on the left
     */
    public Mono<Either<String, ProcessingSuccess<Account, BankEvent, Tuple0, Tuple0, Tuple0>>> close(String str) {
        return this.eventProcessor.processCommand(new BankCommand.CloseAccount(str));
    }

    /**
     * Loads an account aggregate from the event processor.
     *
     * @param str id handed to {@code eventProcessor.getAggregate}
     * @return the account, if the processor returns one
     */
    public Mono<Option<Account>> findAccountById(String str) {
        return this.eventProcessor.getAggregate(str);
    }

    /**
     * Delegates to {@code WithdrawByMonthProjection.meanWithdrawByClientAndMonth}.
     *
     * <p>All three arguments are forwarded unchanged, in the same order; their
     * meaning is defined by the projection.
     *
     * @param str first argument forwarded to the projection
     * @param num second argument forwarded to the projection
     * @param str2 third argument forwarded to the projection
     * @return the value produced by the projection
     */
    public Mono<BigDecimal> meanWithdrawByClientAndMonth(String str, Integer num, String str2) {
        return this.withdrawByMonthProjection.meanWithdrawByClientAndMonth(str, num, str2);
    }

    /**
     * Delegates to {@code WithdrawByMonthProjection.meanWithdrawByClient}.
     *
     * @param str argument forwarded unchanged to the projection
     * @return the value produced by the projection
     */
    public Mono<BigDecimal> meanWithdrawByClient(String str) {
        return this.withdrawByMonthProjection.meanWithdrawByClient(str);
    }
}
