package io.kareldb.kafka;

import io.kareldb.avro.AvroSchema;
import io.kareldb.avro.AvroUtils;
import io.kareldb.kafka.serialization.KafkaSchemaKeySerde;
import io.kareldb.kafka.serialization.KafkaSchemaValueSerde;
import io.kareldb.schema.ColumnDef;
import io.kareldb.schema.ColumnStrategy;
import io.kareldb.schema.RelDef;
import io.kareldb.schema.Schema;
import io.kcache.Cache;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.KeyValueIterator;
import io.kcache.utils.Caches;
import io.kcache.utils.InMemoryCache;
import io.kcache.utils.Streams;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.avro.SchemaValidationException;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.ddl.SqlAlterTableExtension;
import org.apache.calcite.util.Pair;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kareldb/kafka/KafkaSchema.class */
public class KafkaSchema extends Schema {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSchema.class);
    public static final int MIN_VERSION = 1;
    public static final int MAX_VERSION = Integer.MAX_VALUE;
    private final Map<String, Table> tableMap = new ConcurrentHashMap();
    private Cache<KafkaSchemaKey, KafkaSchemaValue> schemaMap;

    /* loaded from: input_file:io/kareldb/kafka/KafkaSchema$Action.class */
    public enum Action {
        CREATE,
        ALTER,
        DROP
    }

    /* loaded from: input_file:io/kareldb/kafka/KafkaSchema$TableUpdateHandler.class */
    private class TableUpdateHandler implements CacheUpdateHandler<KafkaSchemaKey, KafkaSchemaValue> {
        private TableUpdateHandler() {
        }

        public void handleUpdate(KafkaSchemaKey kafkaSchemaKey, KafkaSchemaValue kafkaSchemaValue, KafkaSchemaValue kafkaSchemaValue2, TopicPartition topicPartition, long j, long j2) {
            String tableName = kafkaSchemaKey.getTableName();
            switch (kafkaSchemaValue.getAction()) {
                case CREATE:
                    org.apache.avro.Schema parseSchema = AvroUtils.parseSchema(kafkaSchemaValue.getSchema());
                    RelDef rowType = AvroSchema.toRowType(parseSchema);
                    HashMap hashMap = new HashMap(KafkaSchema.this.getConfigs());
                    hashMap.put("avroSchema", parseSchema);
                    hashMap.put("epoch", kafkaSchemaValue.getEpoch());
                    KafkaTable kafkaTable = new KafkaTable(KafkaSchema.this, parseSchema.getName(), rowType);
                    kafkaTable.configure(hashMap);
                    KafkaSchema.this.tableMap.put(tableName, kafkaTable);
                    return;
                case ALTER:
                    ((io.kareldb.schema.Table) KafkaSchema.this.tableMap.get(tableName)).setRelDef(AvroSchema.toRowType(AvroUtils.parseSchema(kafkaSchemaValue.getSchema())));
                    return;
                case DROP:
                    io.kareldb.schema.Table table = (io.kareldb.schema.Table) KafkaSchema.this.tableMap.get(tableName);
                    if (table != null) {
                        try {
                            table.close();
                            KafkaSchema.this.tableMap.remove(tableName);
                            return;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    return;
                default:
                    return;
            }
        }
    }

    @Override // io.kareldb.schema.Schema
    public Map<String, Table> getTableMap() {
        return this.tableMap;
    }

    @Override // io.kareldb.schema.Schema
    public void configure(Map<String, ?> map) {
        super.configure(map);
        HashMap hashMap = new HashMap(map);
        String str = (String) hashMap.getOrDefault("kafkacache.group.id", "kareldb-1");
        hashMap.put("kafkacache.topic", "_tables");
        hashMap.put("kafkacache.group.id", str);
        hashMap.put("kafkacache.client.id", str + "-_tables");
        this.schemaMap = Caches.concurrentCache(new KafkaCache(new KafkaCacheConfig(hashMap), new KafkaSchemaKeySerde(), new KafkaSchemaValueSerde(), new TableUpdateHandler(), new InMemoryCache()));
    }

    @Override // io.kareldb.schema.Schema
    public void init() {
        this.schemaMap.init();
        CompletableFuture.allOf((CompletableFuture[]) this.tableMap.values().stream().map(table -> {
            io.kareldb.schema.Table table = (io.kareldb.schema.Table) table;
            Objects.requireNonNull(table);
            return CompletableFuture.runAsync(table::init);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
    }

    @Override // io.kareldb.schema.Schema
    public void sync() {
        this.schemaMap.sync();
        CompletableFuture.allOf((CompletableFuture[]) this.tableMap.values().stream().map(table -> {
            io.kareldb.schema.Table table = (io.kareldb.schema.Table) table;
            Objects.requireNonNull(table);
            return CompletableFuture.runAsync(table::sync);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
    }

    public KafkaSchemaValue getSchemaValue(String str, int i) {
        return (KafkaSchemaValue) this.schemaMap.get(new KafkaSchemaKey(str, i));
    }

    public List<KafkaSchemaValue> getLatestSchemaValuesDescending(String str) {
        KafkaSchemaKey kafkaSchemaKey = new KafkaSchemaKey(str, 1);
        KeyValueIterator range = this.schemaMap.descendingCache().range(new KafkaSchemaKey(str, MAX_VERSION), false, kafkaSchemaKey, true);
        try {
            List<KafkaSchemaValue> list = (List) Streams.streamOf(range).map(keyValue -> {
                return (KafkaSchemaValue) keyValue.value;
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                if (range != null) {
                    range.close();
                }
                return list;
            }
            int intValue = list.get(0).getEpoch().intValue();
            List<KafkaSchemaValue> list2 = (List) list.stream().filter(kafkaSchemaValue -> {
                return kafkaSchemaValue.getEpoch().intValue() == intValue;
            }).collect(Collectors.toList());
            if (range != null) {
                range.close();
            }
            return list2;
        } catch (Throwable th) {
            if (range != null) {
                try {
                    range.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public KafkaSchemaValue getLatestSchemaValue(String str) {
        KeyValueIterator range = this.schemaMap.range(new KafkaSchemaKey(str, 1), true, new KafkaSchemaKey(str, MAX_VERSION), false);
        try {
            KafkaSchemaValue kafkaSchemaValue = (KafkaSchemaValue) Streams.streamOf(range).reduce((keyValue, keyValue2) -> {
                return keyValue2;
            }).map(keyValue3 -> {
                return (KafkaSchemaValue) keyValue3.value;
            }).orElse(null);
            if (range != null) {
                range.close();
            }
            return kafkaSchemaValue;
        } catch (Throwable th) {
            if (range != null) {
                try {
                    range.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.kareldb.schema.Schema
    public io.kareldb.schema.Table createTable(String str, Map<String, Object> map, RelDef relDef) {
        KafkaSchemaValue latestSchemaValue = getLatestSchemaValue(str);
        if (latestSchemaValue != null && latestSchemaValue.getAction() != Action.DROP) {
            throw new IllegalStateException("Table " + str + " already exists");
        }
        int intValue = latestSchemaValue != null ? latestSchemaValue.getVersion().intValue() + 1 : 1;
        this.schemaMap.put(new KafkaSchemaKey(str, intValue), new KafkaSchemaValue(str, Integer.valueOf(intValue), AvroSchema.toAvroSchema(str, relDef).toString(), Action.CREATE, Integer.valueOf(intValue)));
        this.schemaMap.flush();
        io.kareldb.schema.Table table = this.tableMap.get(str);
        table.init();
        return table;
    }

    @Override // io.kareldb.schema.Schema
    public void alterTable(String str, List<SqlAlterTableExtension.Action> list, RelDef relDef) {
        if (relDef.getKeyFields().size() > 0) {
            throw new IllegalArgumentException("Key fields cannot be altered");
        }
        KafkaSchemaValue latestSchemaValue = getLatestSchemaValue(str);
        if (latestSchemaValue == null || latestSchemaValue.getAction() == Action.DROP) {
            throw new IllegalStateException("Table " + str + " does not exist");
        }
        int intValue = latestSchemaValue.getVersion().intValue();
        Pair<LinkedHashMap<String, ColumnDef>, List<String>> columnDefs = AvroSchema.toColumnDefs(AvroUtils.parseSchema(latestSchemaValue.getSchema()));
        LinkedHashMap linkedHashMap = (LinkedHashMap) columnDefs.left;
        List list2 = (List) columnDefs.right;
        for (Ord ord : Ord.zip(Schema.toColumnDefs(relDef.getRowType()).entrySet())) {
            int i = ord.i;
            Map.Entry entry = (Map.Entry) ord.e;
            ColumnDef columnDef = (ColumnDef) entry.getValue();
            SqlAlterTableExtension.Action action = list.get(i);
            ColumnStrategy columnStrategy = relDef.getStrategies().get(i);
            if (action == SqlAlterTableExtension.Action.DROP) {
                linkedHashMap.remove(entry.getKey());
            } else {
                linkedHashMap.put((String) entry.getKey(), new ColumnDef(columnDef.getColumnType(), columnStrategy, columnDef.getPrecision(), columnDef.getScale()));
            }
        }
        org.apache.avro.Schema avroSchema = AvroSchema.toAvroSchema(str, Schema.toRowType(linkedHashMap, list2));
        try {
            AvroUtils.checkCompatibility(avroSchema, (List) getLatestSchemaValuesDescending(str).stream().map(kafkaSchemaValue -> {
                return AvroUtils.parseSchema(kafkaSchemaValue.getSchema());
            }).collect(Collectors.toList()));
            this.schemaMap.put(new KafkaSchemaKey(str, intValue + 1), new KafkaSchemaValue(str, Integer.valueOf(intValue + 1), avroSchema.toString(), Action.ALTER, latestSchemaValue.getEpoch()));
            this.schemaMap.flush();
        } catch (SchemaValidationException e) {
            LOG.error(e.getMessage());
            throw new IllegalArgumentException((Throwable) e);
        }
    }

    @Override // io.kareldb.schema.Schema
    public boolean dropTable(String str) {
        KafkaSchemaValue latestSchemaValue = getLatestSchemaValue(str);
        if (latestSchemaValue == null || latestSchemaValue.getAction() == Action.DROP) {
            return false;
        }
        boolean z = this.tableMap.get(str) != null;
        int intValue = latestSchemaValue.getVersion().intValue();
        this.schemaMap.put(new KafkaSchemaKey(str, intValue + 1), new KafkaSchemaValue(str, Integer.valueOf(intValue + 1), null, Action.DROP, latestSchemaValue.getEpoch()));
        this.schemaMap.flush();
        return z;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Iterator<Table> it = this.tableMap.values().iterator();
        while (it.hasNext()) {
            ((Table) it.next()).close();
        }
        this.schemaMap.close();
    }
}
