/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.library.jdbcio.sources;

import io.continual.builder.Builder;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.library.jdbcio.DbConnection;
import io.continual.services.processor.library.jdbcio.common.DbConnector;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;

public class JdbcSource
extends DbConnector
implements Source {
    private final String fQuery;
    private final String fPipeline;
    private ArrayList<JSONObject> fRows = null;

    public JdbcSource(JSONObject config) throws Builder.BuildFailure {
        this(null, config);
    }

    public JdbcSource(ConfigLoadContext sc, JSONObject config) throws Builder.BuildFailure {
        this(JdbcSource.dbConnectionFromConfig(sc, config), sc, config);
    }

    public JdbcSource(DbConnection dbsrc, ConfigLoadContext sc, JSONObject config) throws Builder.BuildFailure {
        super(dbsrc, sc, config);
        this.fQuery = config.getString("query");
        this.fPipeline = config.getString("pipeline");
    }

    public boolean isEof() throws IOException {
        return this.fRows != null && this.fRows.size() == 0;
    }

    public MessageAndRouting getNextMessage(StreamProcessingContext spc, long waitAtMost, TimeUnit waitAtMostTimeUnits) throws IOException, InterruptedException {
        try {
            this.init();
            if (this.fRows != null && this.fRows.size() > 0) {
                JSONObject data = this.fRows.remove(0);
                return new MessageAndRouting(Message.adoptJsonAsMessage((JSONObject)data), this.fPipeline);
            }
        }
        catch (SQLException x) {
            spc.warn("Couldn't fetch JDBC records. " + x.getMessage());
        }
        return null;
    }

    public void markComplete(StreamProcessingContext spc, MessageAndRouting mr) {
    }

    public void requeue(MessageAndRouting msgAndRoute) {
    }

    public void init() throws SQLException {
        if (this.fRows != null) {
            return;
        }
        this.fRows = new ArrayList();
        try (Connection cc = this.getDb().getConnection();){
            PreparedStatement ps = cc.prepareStatement(this.fQuery);
            ResultSet rs = ps.executeQuery();
            ResultSetMetaData md = rs.getMetaData();
            while (rs.next()) {
                JSONObject msg = new JSONObject();
                for (int i = 1; i <= md.getColumnCount(); ++i) {
                    Object val;
                    String colName = md.getColumnName(i);
                    switch (md.getColumnType(i)) {
                        case -6: 
                        case -5: 
                        case 4: 
                        case 5: {
                            val = rs.getInt(i);
                            break;
                        }
                        case 6: 
                        case 7: 
                        case 8: {
                            val = rs.getDouble(i);
                            break;
                        }
                        default: {
                            val = rs.getString(i);
                        }
                    }
                    msg.put(colName, val);
                }
                this.fRows.add(msg);
            }
        }
    }

    public void close() {
    }
}

