/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.sink.sheet;

import io.floodplain.sink.sheet.SheetSink;
import io.floodplain.sink.sheet.UpdateTuple;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SheetSinkTask
extends SinkTask {
    public static final String SPREADSHEETID = "spreadsheetId";
    public static final String COLUMNS = "columns";
    public static final String TOPIC = "topic";
    public static final String STARTROW = "startRow";
    public static final String STARTCOLUMN = "startColumn";
    private static final Logger logger = LoggerFactory.getLogger(SheetSinkTask.class);
    public String[] columns;
    private SheetSink sheetSink;
    private String spreadsheetId;
    private int startRow;
    private String startColumn;
    private final AtomicLong totalInTransaction = new AtomicLong(0L);

    public String version() {
        return "0.1";
    }

    public void start(Map<String, String> props) {
        logger.info("Starting Sheet connector: {}", props);
        this.spreadsheetId = props.get(SPREADSHEETID);
        this.columns = props.get(COLUMNS).split(",");
        this.startRow = Optional.of(props.get(STARTROW)).map(Integer::parseInt).orElse(1);
        this.startColumn = Optional.of(props.get(STARTCOLUMN)).orElse("A");
        try {
            this.sheetSink = new SheetSink();
        }
        catch (IOException | GeneralSecurityException e) {
            throw new RuntimeException("Problem starting sheet sink connector task", e);
        }
    }

    public SheetSink getSheetSink() {
        return this.sheetSink;
    }

    public void put(Collection<SinkRecord> records) {
        List<UpdateTuple> tuples = this.extractTuples(records);
        long now = System.currentTimeMillis();
        try {
            this.sheetSink.updateRangeWithBatch(this.spreadsheetId, tuples);
        }
        catch (IOException e1) {
            logger.error("Error: ", (Throwable)e1);
        }
        long elapsed = System.currentTimeMillis() - now;
        logger.info("Update took: {} total: {}", (Object)elapsed, (Object)this.totalInTransaction.addAndGet(elapsed));
    }

    private List<UpdateTuple> extractTuples(Collection<SinkRecord> records) {
        LinkedHashMap<Integer, UpdateTuple> result = new LinkedHashMap<Integer, UpdateTuple>();
        logger.info("Inserting {} records", (Object)records.size());
        for (SinkRecord sinkRecord : records) {
            Map toplevel = (Map)sinkRecord.value();
            if (toplevel == null) {
                logger.info("Ignoring delete of key: {}", sinkRecord.key());
                continue;
            }
            Integer row = (Integer)toplevel.get("_row");
            if (row == null) {
                throw new IllegalArgumentException("Invalid message for Google Sheets: Every message should have an int or long field named: '_row', marking the row where it should be inserted ");
            }
            List<List<Object>> res = this.sheetSink.extractRow(toplevel, this.columns);
            int currentRow = row + this.startRow;
            UpdateTuple ut = new UpdateTuple(this.startColumn + currentRow, res);
            result.put(currentRow, ut);
        }
        return new ArrayList<UpdateTuple>(result.values());
    }

    public void stop() {
    }
}

