package tech.kage.event.postgres;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import tech.kage.event.Event;
import tech.kage.event.EventStore;
import tech.kage.event.crypto.EventEncryptor;
import tech.kage.event.crypto.MetadataSerializer;

@Component
/* loaded from: input_file:tech/kage/event/postgres/PostgresEventStore.class */
public class PostgresEventStore<K, V extends SpecificRecord> implements EventStore<K, V> {
    private static final String INSERT_EVENT_SQL = "    INSERT INTO events.%s (key, data, timestamp)\n    VALUES (:key, :data, :timestamp)\n";
    private static final String INSERT_EVENT_WITH_METADATA_SQL = "    INSERT INTO events.%s (key, data, metadata, timestamp)\n    VALUES (:key, :data, :metadata, :timestamp)\n";
    private final DatabaseClient databaseClient;
    private final Serializer<SpecificRecord> kafkaAvroSerializer;
    private final EventEncryptor eventEncryptor;

    @Configuration
    @Import({EventEncryptor.class})
    /* loaded from: input_file:tech/kage/event/postgres/PostgresEventStore$Config.class */
    static class Config {
        private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
        private static final String SCHEMA_REGISTRY_URL_NOT_SET_ERROR = "schema.registry.url must be set";
        private static final String VALUE_SUBJECT_NAME_STRATEGY_CONFIG = "value.subject.name.strategy";
        private static final String VALUE_SUBJECT_NAME_STRATEGY = "io.confluent.kafka.serializers.subject.RecordNameStrategy";
        private static final String KAFKA_AVRO_SERIALIZER_CLASS = "io.confluent.kafka.serializers.KafkaAvroSerializer";

        Config() {
        }

        @Bean
        Serializer<SpecificRecord> kafkaAvroSerializer(@Value("${schema.registry.url:#{null}}") String str, Optional<KafkaProperties> optional) {
            Map properties = optional.isPresent() ? optional.get().getProperties() : Map.of(SCHEMA_REGISTRY_URL_CONFIG, (String) Objects.requireNonNull(str, SCHEMA_REGISTRY_URL_NOT_SET_ERROR), VALUE_SUBJECT_NAME_STRATEGY_CONFIG, VALUE_SUBJECT_NAME_STRATEGY);
            Serializer<SpecificRecord> serializerInstance = getSerializerInstance();
            serializerInstance.configure(properties, false);
            return serializerInstance;
        }

        private Serializer<SpecificRecord> getSerializerInstance() {
            try {
                return (Serializer) Class.forName(KAFKA_AVRO_SERIALIZER_CLASS).getConstructor(new Class[0]).newInstance(new Object[0]);
            } catch (Exception e) {
                throw new IllegalArgumentException("Unable to instantiate serializer io.confluent.kafka.serializers.KafkaAvroSerializer", e);
            }
        }
    }

    PostgresEventStore(DatabaseClient databaseClient, Serializer<SpecificRecord> serializer, EventEncryptor eventEncryptor) {
        this.databaseClient = databaseClient;
        this.kafkaAvroSerializer = serializer;
        this.eventEncryptor = eventEncryptor;
    }

    public Mono<Event<K, V>> save(String str, Event<K, V> event) {
        return doSave(str, event, null);
    }

    public Mono<Event<K, V>> save(String str, Event<K, V> event, URI uri) {
        Objects.requireNonNull(uri, "encryptionKey must not be null");
        return doSave(str, event, uri);
    }

    private Mono<Event<K, V>> doSave(String str, Event<K, V> event, URI uri) {
        Objects.requireNonNull(str, "topic must not be null");
        Objects.requireNonNull(event, "event must not be null");
        if (event.metadata().containsKey("id")) {
            throw new IllegalArgumentException("id must not be set in metadata");
        }
        if (event.metadata().containsKey("kid")) {
            throw new IllegalArgumentException("kid must not be set in metadata");
        }
        return Mono.fromCallable(() -> {
            return this.kafkaAvroSerializer.serialize(str, event.payload());
        }).subscribeOn(Schedulers.boundedElastic()).flatMap(bArr -> {
            return uri != null ? Mono.fromCallable(() -> {
                return this.eventEncryptor.encrypt(bArr, event.key(), event.timestamp(), event.metadata(), uri);
            }) : Mono.just(bArr);
        }).flatMap(bArr2 -> {
            return this.databaseClient.sql((event.metadata().isEmpty() && uri == null) ? INSERT_EVENT_SQL.formatted(str) : INSERT_EVENT_WITH_METADATA_SQL.formatted(str)).bind("key", event.key()).bind("data", bArr2).bind("timestamp", event.timestamp().atOffset(ZoneOffset.UTC)).bindValues((event.metadata().isEmpty() && uri == null) ? Map.of() : Map.of("metadata", prepareMetadataColumn(event.metadata(), uri))).fetch().rowsUpdated();
        }).map(l -> {
            return event;
        });
    }

    private byte[] prepareMetadataColumn(Map<String, Object> map, URI uri) {
        HashMap hashMap = new HashMap(map);
        if (uri != null) {
            hashMap.put("kid", uri.toString().getBytes());
        }
        return MetadataSerializer.serialize(hashMap);
    }
}
