package net.quasardb.kafka.sink;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import net.quasardb.kafka.common.ConnectorUtils;
import net.quasardb.kafka.common.TableRegistry;
import net.quasardb.kafka.common.resolver.Resolver;
import net.quasardb.kafka.common.writer.RecordWriter;
import net.quasardb.qdb.Session;
import net.quasardb.qdb.ts.Table;
import net.quasardb.qdb.ts.Writer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/quasardb/kafka/sink/QdbSinkTask.class */
public class QdbSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(QdbSinkTask.class);
    private static Session _keepAlive = new Session();
    private Session session;
    private Writer writer;
    private TableRegistry tableRegistry;
    private RecordWriter recordWriter;
    private Resolver<String> tableResolver;
    private Resolver<String> skeletonTableResolver;
    private Resolver<List<String>> tableTagsResolver;

    public String version() {
        return new QdbSinkConnector().version();
    }

    public void start(Map<String, String> map) {
        if (this.writer != null) {
            throw new RuntimeException("can only start a task once");
        }
        this.tableRegistry = new TableRegistry();
        Map parse = new QdbSinkConnector().config().parse(map);
        this.session = ConnectorUtils.connect(parse);
        this.tableResolver = ConnectorUtils.createTableResolver(parse);
        this.skeletonTableResolver = ConnectorUtils.createSkeletonTableResolver(parse);
        this.tableTagsResolver = ConnectorUtils.createTableTagsResolver(parse);
        this.recordWriter = ConnectorUtils.createRecordWriter(parse);
        log.info("Started QdbSinkTask");
    }

    public void stop() {
        log.info("Stopping QdbSinkTask");
        try {
            if (this.writer != null) {
                this.writer.close();
                this.writer = null;
            }
            if (this.session != null) {
                this.session.close();
                this.session = null;
            }
            this.tableRegistry = null;
            this.tableResolver = null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Table createTable(String str, SinkRecord sinkRecord) throws DataException {
        if (this.skeletonTableResolver == null) {
            throw new DataException("Table '" + str + "' not found, and no skeleton table configuration for creation, aborting");
        }
        Table table = new Table(this.session, this.skeletonTableResolver.resolve(sinkRecord));
        log.info("creating copy of skeleton table '" + table.getName() + "' into target table '" + str + "'");
        Table create = Table.create(this.session, str, table);
        if (this.tableTagsResolver != null) {
            List<String> resolve = this.tableTagsResolver.resolve(sinkRecord);
            log.debug("attaching tags " + resolve.toString() + " to table " + str);
            Table.attachTags(this.session, str, resolve);
        }
        return create;
    }

    private Table addTableToRegistry(String str, SinkRecord sinkRecord) throws DataException {
        if (str == null) {
            throw new DataException("Invalid table name provided: " + str);
        }
        log.info("Adding table to registry: " + str);
        Table put = this.tableRegistry.put(this.session, str);
        if (put == null) {
            put = this.tableRegistry.put(createTable(str, sinkRecord));
        }
        if (this.writer == null) {
            log.debug("Initializing Writer");
            this.writer = Writer.builder(this.session).asyncPush().dropDuplicates().build();
        }
        return put;
    }

    public void put(Collection<SinkRecord> collection) {
        for (SinkRecord sinkRecord : collection) {
            String resolve = this.tableResolver.resolve(sinkRecord);
            Table table = this.tableRegistry.get(resolve);
            if (table == null) {
                table = addTableToRegistry(resolve, sinkRecord);
            }
            this.recordWriter.write(this.writer, table, sinkRecord);
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            if (this.writer != null) {
                log.info("Flush request received, flushing writer..");
                this.writer.flush();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
