package me.ahoo.wow.r2dbc;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.R2dbcDataIntegrityViolationException;
import io.r2dbc.spi.Readable;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import me.ahoo.wow.api.modeling.AggregateId;
import me.ahoo.wow.api.modeling.OwnerId;
import me.ahoo.wow.command.DuplicateRequestIdException;
import me.ahoo.wow.event.DomainEventStream;
import me.ahoo.wow.eventsourcing.AbstractEventStore;
import me.ahoo.wow.eventsourcing.EventVersionConflictException;
import me.ahoo.wow.serialization.JsonSerializer;
import me.ahoo.wow.serialization.JsonSerializerKt;
import me.ahoo.wow.serialization.event.EventStreamRecord;
import me.ahoo.wow.serialization.event.EventStreamRecordKt;
import me.ahoo.wow.serialization.event.FlatEventStreamRecord;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* compiled from: R2dbcEventStore.kt */
@Metadata(mv = {2, 0, 0}, k = 1, xi = 48, d1 = {"��T\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0010\t\n\u0002\b\u0002\u0018��2\u00020\u0001B\u0017\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0004\b\u0006\u0010\u0007J\u0016\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\t2\u0006\u0010\u000b\u001a\u00020\fH\u0016J*\u0010\r\u001a\b\u0012\u0004\u0012\u00020\f0\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0012\u0010\u0011\u001a\u000e\u0012\u0004\u0012\u00020\u0013\u0012\u0004\u0012\u00020\u00140\u0012H\u0002J&\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\f0\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0016\u001a\u00020\u00172\u0006\u0010\u0018\u001a\u00020\u0017H\u0016J&\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\f0\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0019\u001a\u00020\u001a2\u0006\u0010\u001b\u001a\u00020\u001aH\u0014R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001c"}, d2 = {"Lme/ahoo/wow/r2dbc/R2dbcEventStore;", "Lme/ahoo/wow/eventsourcing/AbstractEventStore;", "database", "Lme/ahoo/wow/r2dbc/Database;", "eventStreamSchema", "Lme/ahoo/wow/r2dbc/EventStreamSchema;", "<init>", "(Lme/ahoo/wow/r2dbc/Database;Lme/ahoo/wow/r2dbc/EventStreamSchema;)V", "appendStream", "Lreactor/core/publisher/Mono;", "Ljava/lang/Void;", "eventStream", "Lme/ahoo/wow/event/DomainEventStream;", "load", "Lreactor/core/publisher/Flux;", "aggregateId", "Lme/ahoo/wow/api/modeling/AggregateId;", "statementSupplier", "Lkotlin/Function1;", "Lio/r2dbc/spi/Connection;", "Lio/r2dbc/spi/Statement;", "loadStream", "headVersion", "", "tailVersion", "headEventTime", "", "tailEventTime", "wow-r2dbc"})
/* loaded from: input_file:me/ahoo/wow/r2dbc/R2dbcEventStore.class */
public final class R2dbcEventStore extends AbstractEventStore {

    @NotNull
    private final Database database;

    @NotNull
    private final EventStreamSchema eventStreamSchema;

    public R2dbcEventStore(@NotNull Database database, @NotNull EventStreamSchema eventStreamSchema) {
        Intrinsics.checkNotNullParameter(database, "database");
        Intrinsics.checkNotNullParameter(eventStreamSchema, "eventStreamSchema");
        this.database = database;
        this.eventStreamSchema = eventStreamSchema;
    }

    @NotNull
    public Mono<Void> appendStream(@NotNull final DomainEventStream domainEventStream) {
        Intrinsics.checkNotNullParameter(domainEventStream, "eventStream");
        Publisher<? extends Connection> createConnection = this.database.createConnection(domainEventStream.getAggregateId());
        Function1 function1 = new Function1() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$appendStream$1
            public final Publisher<? extends Result> invoke(Connection connection) {
                EventStreamSchema eventStreamSchema;
                ObjectNode valueToTree = JsonSerializer.INSTANCE.valueToTree(domainEventStream);
                Intrinsics.checkNotNullExpressionValue(valueToTree, "valueToTree(...)");
                EventStreamRecord eventStreamRecord = EventStreamRecordKt.toEventStreamRecord(valueToTree);
                eventStreamSchema = this.eventStreamSchema;
                return connection.createStatement(eventStreamSchema.append(domainEventStream.getAggregateId())).bind(0, eventStreamRecord.getId()).bind(1, domainEventStream.getAggregateId().getId()).bind(2, domainEventStream.getAggregateId().getTenantId()).bind(3, domainEventStream.getOwnerId()).bind(4, eventStreamRecord.getRequestId()).bind(5, eventStreamRecord.getCommandId()).bind(6, Integer.valueOf(eventStreamRecord.getVersion())).bind(7, JsonSerializerKt.toJsonString(eventStreamRecord.getHeader())).bind(8, JsonSerializerKt.toJsonString(eventStreamRecord.getBody())).bind(9, Integer.valueOf(domainEventStream.getSize())).bind(10, Long.valueOf(eventStreamRecord.getCreateTime())).execute();
            }
        };
        Function function = (v1) -> {
            return appendStream$lambda$0(r1, v1);
        };
        R2dbcEventStore$appendStream$2 r2dbcEventStore$appendStream$2 = R2dbcEventStore$appendStream$2.INSTANCE;
        Flux usingWhen = Flux.usingWhen(createConnection, function, (v1) -> {
            return appendStream$lambda$1(r2, v1);
        });
        Function1 function12 = R2dbcEventStore::appendStream$lambda$2;
        Flux flatMap = usingWhen.flatMap((v1) -> {
            return appendStream$lambda$3(r1, v1);
        });
        Function1 function13 = (v2) -> {
            return appendStream$lambda$4(r2, r3, v2);
        };
        Mono<Void> then = flatMap.onErrorMap(R2dbcDataIntegrityViolationException.class, (v1) -> {
            return appendStream$lambda$5(r2, v1);
        }).then();
        Intrinsics.checkNotNullExpressionValue(then, "then(...)");
        return then;
    }

    private final Flux<DomainEventStream> load(AggregateId aggregateId, final Function1<? super Connection, ? extends Statement> function1) {
        Publisher<? extends Connection> createConnection = this.database.createConnection(aggregateId);
        Function1 function12 = new Function1() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$load$1
            public final Publisher<? extends Result> invoke(Connection connection) {
                Function1<Connection, Statement> function13 = function1;
                Intrinsics.checkNotNull(connection);
                return ((Statement) function13.invoke(connection)).execute();
            }
        };
        Function function = (v1) -> {
            return load$lambda$6(r1, v1);
        };
        R2dbcEventStore$load$2 r2dbcEventStore$load$2 = R2dbcEventStore$load$2.INSTANCE;
        Flux usingWhen = Flux.usingWhen(createConnection, function, (v1) -> {
            return load$lambda$7(r2, v1);
        });
        Function1 function13 = (v1) -> {
            return load$lambda$11(r1, v1);
        };
        Flux<DomainEventStream> flatMap = usingWhen.flatMap((v1) -> {
            return load$lambda$12(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(flatMap, "flatMap(...)");
        return flatMap;
    }

    @NotNull
    public Flux<DomainEventStream> loadStream(@NotNull AggregateId aggregateId, int i, int i2) {
        Intrinsics.checkNotNullParameter(aggregateId, "aggregateId");
        return load(aggregateId, (v4) -> {
            return loadStream$lambda$13(r2, r3, r4, r5, v4);
        });
    }

    @NotNull
    protected Flux<DomainEventStream> loadStream(@NotNull AggregateId aggregateId, long j, long j2) {
        Intrinsics.checkNotNullParameter(aggregateId, "aggregateId");
        return load(aggregateId, (v4) -> {
            return loadStream$lambda$14(r2, r3, r4, r5, v4);
        });
    }

    private static final Publisher appendStream$lambda$0(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher appendStream$lambda$1(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher appendStream$lambda$2(Result result) {
        return result.getRowsUpdated();
    }

    private static final Publisher appendStream$lambda$3(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Throwable appendStream$lambda$4(R2dbcEventStore r2dbcEventStore, DomainEventStream domainEventStream, R2dbcDataIntegrityViolationException r2dbcDataIntegrityViolationException) {
        String message = r2dbcDataIntegrityViolationException.getMessage();
        Intrinsics.checkNotNull(message);
        if (StringsKt.contains$default(message, r2dbcEventStore.eventStreamSchema.getAggregateIdVersionUniqueIndexName(), false, 2, (Object) null)) {
            return new EventVersionConflictException(domainEventStream, (String) null, (Throwable) r2dbcDataIntegrityViolationException, 2, (DefaultConstructorMarker) null);
        }
        String message2 = r2dbcDataIntegrityViolationException.getMessage();
        Intrinsics.checkNotNull(message2);
        return StringsKt.contains$default(message2, r2dbcEventStore.eventStreamSchema.getRequestIdUniqueIndexName(), false, 2, (Object) null) ? new DuplicateRequestIdException(domainEventStream.getAggregateId(), domainEventStream.getRequestId(), (String) null, (Throwable) r2dbcDataIntegrityViolationException, 4, (DefaultConstructorMarker) null) : (Throwable) r2dbcDataIntegrityViolationException;
    }

    private static final Throwable appendStream$lambda$5(Function1 function1, Object obj) {
        return (Throwable) function1.invoke(obj);
    }

    private static final Publisher load$lambda$6(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher load$lambda$7(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final DomainEventStream load$lambda$11$lambda$9(AggregateId aggregateId, Readable readable) {
        if (!Intrinsics.areEqual(aggregateId.getId(), (String) readable.get("aggregate_id", String.class))) {
            throw new IllegalArgumentException("Failed requirement.".toString());
        }
        Object obj = readable.get("id", String.class);
        if (obj == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        String str = (String) obj;
        Object obj2 = readable.get("request_id", String.class);
        if (obj2 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        String str2 = (String) obj2;
        Object obj3 = readable.get("tenant_id", String.class);
        if (obj3 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        String str3 = (String) obj3;
        if (!Intrinsics.areEqual(str3, aggregateId.getTenantId())) {
            throw new IllegalArgumentException(("The aggregated tenantId[" + aggregateId.getTenantId() + "] does not match the tenantId:[" + str3 + "] stored in the eventStore").toString());
        }
        String orDefaultOwnerId = OwnerId.Companion.orDefaultOwnerId((String) readable.get("owner_id", String.class));
        Object obj4 = readable.get("command_id", String.class);
        if (obj4 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        String str4 = (String) obj4;
        Object obj5 = readable.get("version", Integer.TYPE);
        if (obj5 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        int intValue = ((Number) obj5).intValue();
        Object obj6 = readable.get("header", String.class);
        if (obj6 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        String str5 = (String) obj6;
        Object obj7 = readable.get("body", String.class);
        if (obj7 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        String str6 = (String) obj7;
        Object obj8 = readable.get("create_time", Long.TYPE);
        if (obj8 == null) {
            throw new IllegalStateException("Required value was null.".toString());
        }
        long longValue = ((Number) obj8).longValue();
        ObjectNode jsonNode = JsonSerializerKt.toJsonNode(str5);
        Intrinsics.checkNotNull(jsonNode, "null cannot be cast to non-null type com.fasterxml.jackson.databind.node.ObjectNode");
        return new FlatEventStreamRecord(str, aggregateId, jsonNode, intValue, orDefaultOwnerId, str4, str2, JsonSerializerKt.toJsonNode(str6), longValue).toDomainEventStream();
    }

    private static final DomainEventStream load$lambda$11$lambda$10(Function1 function1, Object obj) {
        return (DomainEventStream) function1.invoke(obj);
    }

    private static final Publisher load$lambda$11(AggregateId aggregateId, Result result) {
        Function1 function1 = (v1) -> {
            return load$lambda$11$lambda$9(r1, v1);
        };
        return result.map((v1) -> {
            return load$lambda$11$lambda$10(r1, v1);
        });
    }

    private static final Publisher load$lambda$12(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Statement loadStream$lambda$13(R2dbcEventStore r2dbcEventStore, AggregateId aggregateId, int i, int i2, Connection connection) {
        Intrinsics.checkNotNullParameter(connection, "it");
        Statement bind = connection.createStatement(r2dbcEventStore.eventStreamSchema.load(aggregateId)).bind(0, aggregateId.getId()).bind(1, Integer.valueOf(i)).bind(2, Integer.valueOf(i2));
        Intrinsics.checkNotNullExpressionValue(bind, "bind(...)");
        return bind;
    }

    private static final Statement loadStream$lambda$14(R2dbcEventStore r2dbcEventStore, AggregateId aggregateId, long j, long j2, Connection connection) {
        Intrinsics.checkNotNullParameter(connection, "it");
        Statement bind = connection.createStatement(r2dbcEventStore.eventStreamSchema.loadByEventTime(aggregateId)).bind(0, aggregateId.getId()).bind(1, Long.valueOf(j)).bind(2, Long.valueOf(j2));
        Intrinsics.checkNotNullExpressionValue(bind, "bind(...)");
        return bind;
    }
}
