package tech.kage.event.postgres;

import java.time.ZoneOffset;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import tech.kage.event.Event;
import tech.kage.event.EventStore;

/**
 * {@link EventStore} implementation that persists events into PostgreSQL via R2DBC.
 *
 * <p>Each saved event becomes one row ({@code key}, {@code data}, {@code timestamp}) in the
 * table {@code events.<topic>}. The event payload is converted to bytes by a
 * {@code KafkaAvroSerializer} that is loaded reflectively by class name and configured with the
 * schema registry URL and the {@code RecordNameStrategy} subject name strategy.
 *
 * <p>NOTE(review): the topic is interpolated into the SQL statement as an identifier (it cannot
 * be sent as a bind parameter), so callers must only pass trusted topic names.
 */
@Component
public class PostgresEventStore implements EventStore {
    /** Insert statement template; {@code %s} is replaced with the topic (table) name. */
    private static final String INSERT_EVENT_SQL =
            "    INSERT INTO events.%s (key, data, timestamp)\n"
                    + "    VALUES (:key, :data, :timestamp)\n";

    /** Fully qualified name of the serializer implementation, resolved at runtime. */
    private static final String SERIALIZER_CLASS = "io.confluent.kafka.serializers.KafkaAvroSerializer";

    private final DatabaseClient databaseClient;
    private final Serializer<Object> kafkaAvroSerializer;

    /**
     * Creates the store and configures the Avro serializer used for event payloads.
     *
     * @param databaseClient    R2DBC client used to execute the insert statements
     * @param schemaRegistryUrl value of the {@code schema.registry.url} property
     * @throws NullPointerException     if {@code schemaRegistryUrl} is null ({@link Map#of}
     *                                  rejects null values)
     * @throws IllegalArgumentException if the serializer class cannot be instantiated
     */
    PostgresEventStore(DatabaseClient databaseClient, @Value("${schema.registry.url}") String schemaRegistryUrl) {
        this.databaseClient = databaseClient;
        Map<String, String> serializerConfig = Map.of(
                "value.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy",
                "schema.registry.url", schemaRegistryUrl);
        this.kafkaAvroSerializer = getSerializerInstance();
        // isKey = false: the serializer is configured as a value serializer.
        this.kafkaAvroSerializer.configure(serializerConfig, false);
    }

    /**
     * Serializes the event payload and inserts the event into the table {@code events.<topic>}.
     *
     * <p>Nothing is executed until the returned {@link Mono} is subscribed to.
     *
     * @param topic name of the topic; used verbatim as the table name inside the {@code events}
     *              schema, so it must be a trusted, valid SQL identifier
     * @param event event to store
     * @param <T>   type of the event payload
     * @return a {@link Mono} emitting the given {@code event} once the insert has reported its
     *         updated row count
     * @throws NullPointerException if {@code topic} or {@code event} is null
     */
    public <T extends SpecificRecord> Mono<Event<T>> save(String topic, Event<T> event) {
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(event, "event must not be null");

        // The serializer receives a null topic. NOTE(review): presumably safe because the
        // configured RecordNameStrategy does not derive the subject from the topic - confirm.
        // Serialization runs on the bounded elastic scheduler, presumably because it may block
        // on schema registry calls - confirm.
        return Mono.fromCallable(() -> this.kafkaAvroSerializer.serialize(null, event.payload()))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(serialized -> this.databaseClient
                        // The topic is an identifier and therefore formatted into the statement.
                        .sql(INSERT_EVENT_SQL.formatted(topic))
                        .bind("key", event.key())
                        .bind("data", serialized)
                        .bind("timestamp", event.timestamp().atOffset(ZoneOffset.UTC))
                        .fetch()
                        .rowsUpdated())
                .map(rowsUpdated -> event);
    }

    /**
     * Instantiates the serializer named by {@link #SERIALIZER_CLASS} through its public no-arg
     * constructor.
     *
     * @return a new, not yet configured serializer instance
     * @throws IllegalArgumentException if the class cannot be loaded, is not a
     *                                  {@link Serializer}, or cannot be instantiated
     */
    // Generic type arguments are erased, so the element type cannot be verified reflectively;
    // asSubclass at least guarantees the loaded class is a Serializer.
    @SuppressWarnings("unchecked")
    private Serializer<Object> getSerializerInstance() {
        try {
            return (Serializer<Object>) Class.forName(SERIALIZER_CLASS)
                    .asSubclass(Serializer.class)
                    .getConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Unable to instantiate serializer " + SERIALIZER_CLASS, e);
        }
    }
}
