/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.library.jdbcio.sinks;

import io.continual.builder.Builder;
import io.continual.metrics.MetricsCatalog;
import io.continual.metrics.metricTypes.Timer;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.library.jdbcio.DbConnection;
import io.continual.services.processor.library.jdbcio.common.DbConnector;
import io.continual.util.data.TypeConvertor;
import io.continual.util.data.json.JsonVisitor;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSink
extends DbConnector
implements Sink {
    private static final String kSetting_BufferSize = "postBuffer";
    private static final int kDefault_BufferSize = 64;
    private String fInsertStmt;
    private final ArrayList<ColInfo> fCols;
    private long fPendingCount;
    private long fRecordCount;
    private final int fBufferSize;
    private Connection fCurrentConnection;
    private PreparedStatement fPending;
    private static final Logger log = LoggerFactory.getLogger(JdbcSink.class);

    public JdbcSink() throws Builder.BuildFailure {
        this(new JSONObject());
    }

    public JdbcSink(JSONObject config) throws Builder.BuildFailure {
        this(null, config);
    }

    public JdbcSink(ConfigLoadContext sc, JSONObject config) throws Builder.BuildFailure {
        this(JdbcSink.dbConnectionFromConfig(sc, config), sc, config);
    }

    public JdbcSink(DbConnection dbsrc, ConfigLoadContext sc, JSONObject config) throws Builder.BuildFailure {
        super(dbsrc, sc, config);
        try {
            this.fCols = new ArrayList();
            JsonVisitor.forEachObjectIn((JSONArray)config.optJSONArray("columns"), (JsonVisitor.ArrayOfObjectVisitor)new JsonVisitor.ArrayOfObjectVisitor(){

                public boolean visit(JSONObject col) throws JSONException {
                    JdbcSink.this.fCols.add(new ColInfo(col));
                    return true;
                }
            });
            this.fRecordCount = 0L;
            this.fPendingCount = 0L;
            this.fBufferSize = config.optInt(kSetting_BufferSize, 64);
        }
        catch (JSONException e) {
            throw new Builder.BuildFailure((Throwable)e);
        }
    }

    public JdbcSink addColumn(String key, String value) {
        return this.addColumn(key, value, String.class);
    }

    public JdbcSink addColumn(String key, String value, Class<?> clazz) {
        return this.addColumn(key, value, clazz, null);
    }

    public JdbcSink addColumn(String key, String value, Class<?> clazz, String fmt) {
        this.fCols.add(new ColInfo(key, value, clazz, fmt, 1.0));
        return this;
    }

    public synchronized void init() {
        StringBuilder sb = new StringBuilder().append("REPLACE INTO ").append(this.getTable()).append(" (");
        StringBuilder valPart = new StringBuilder();
        boolean firstCol = true;
        for (ColInfo col : this.fCols) {
            if (!firstCol) {
                sb.append(",");
                valPart.append(",");
            }
            firstCol = false;
            sb.append(col.getKey());
            valPart.append("?");
        }
        sb.append(") VALUES (").append(valPart.toString()).append(")");
        this.fInsertStmt = sb.toString();
    }

    public synchronized void close() {
        this.flush();
        log.warn("JdbcSink closing; sent " + this.fRecordCount + " records.");
    }

    public synchronized void flush() {
        try {
            this.sendToDb();
        }
        catch (SQLException e) {
            log.warn(e.getMessage());
        }
    }

    public synchronized long getRecordsSent() {
        return this.fRecordCount;
    }

    public synchronized void process(MessageProcessingContext context) {
        block45: {
            MetricsCatalog mc = context.getStreamProcessingContext().getMetrics();
            try (MetricsCatalog.PathPopper pp = mc.push("JdbcSink");){
                if (this.fCurrentConnection == null) {
                    this.fCurrentConnection = this.getDb().getConnection();
                }
                if (this.fPending == null) {
                    this.fPending = this.fCurrentConnection.prepareStatement(this.fInsertStmt);
                }
                try (Timer.Context tc = mc.timer("insertPrep").time();){
                    int param = 1;
                    for (ColInfo ci : this.fCols) {
                        String val = context.evalExpression(ci.getExpr());
                        this.buildColumnValue(ci, val, param++);
                    }
                }
                this.fPending.addBatch();
                ++this.fPendingCount;
                log.debug("JdbcSink: {}/{} pending buffered", (Object)this.fPendingCount, (Object)this.fBufferSize);
                if (this.fPendingCount % (long)this.fBufferSize != 0L) break block45;
                tc = mc.timer("sendToDb").time();
                var6_10 = null;
                try {
                    this.sendToDb();
                }
                catch (Throwable throwable) {
                    var6_10 = throwable;
                    throw throwable;
                }
                finally {
                    if (tc != null) {
                        if (var6_10 != null) {
                            try {
                                tc.close();
                            }
                            catch (Throwable throwable) {
                                var6_10.addSuppressed(throwable);
                            }
                        } else {
                            tc.close();
                        }
                    }
                }
            }
            catch (SQLException e) {
                log.warn("While executing a transaction, a SQL Exception: " + e.getMessage());
                if (this.fCurrentConnection != null) {
                    try {
                        this.fCurrentConnection.close();
                    }
                    catch (SQLException e1) {
                        log.warn("While closing a connection (during an exception), a SQL Exception: " + e.getMessage());
                    }
                    this.fCurrentConnection = null;
                }
                context.getStreamProcessingContext().fail(e.getMessage());
            }
        }
    }

    private void sendToDb() throws SQLException {
        if (this.fPending != null) {
            try {
                this.fPending.executeBatch();
                this.fPending.close();
                this.fPending = null;
            }
            catch (SQLException x) {
                log.warn(x.getMessage(), (Throwable)x);
                if (this.fCurrentConnection != null) {
                    this.fCurrentConnection.close();
                    this.fCurrentConnection = null;
                }
                throw x;
            }
        }
        this.fRecordCount += this.fPendingCount;
        log.info("Posted {} records, total {}.", (Object)this.fPendingCount, (Object)this.fRecordCount);
        this.fPendingCount = 0L;
    }

    private void buildColumnValue(ColInfo ci, String val, int param) throws SQLException {
        block19: {
            try {
                Class<?> targetClass = ci.getTargetClass();
                if (targetClass.equals(Integer.class)) {
                    int valToUse = val == null || val.length() == 0 ? 0 : Integer.parseInt(val);
                    this.fPending.setInt(param, valToUse);
                    break block19;
                }
                if (targetClass.equals(Long.class)) {
                    this.fPending.setLong(param, Long.parseLong(val));
                    break block19;
                }
                if (targetClass.equals(Double.class)) {
                    double valToUse = 0.0;
                    if (val.trim().length() == 0) {
                        valToUse = 0.0;
                    } else {
                        double d = Double.parseDouble(val);
                        double r = ci.getRounding();
                        if (r != 1.0) {
                            d = (double)Math.round(d * r) / r;
                        }
                        valToUse = !Double.isFinite(d) ? -1.0 : d;
                    }
                    this.fPending.setDouble(param, valToUse);
                    break block19;
                }
                if (targetClass.equals(Boolean.class)) {
                    this.fPending.setString(param, TypeConvertor.convertToBooleanBroad((String)val) ? "Y" : "N");
                    break block19;
                }
                if (targetClass.equals(java.util.Date.class)) {
                    Date d = null;
                    String fmt = ci.getFormat();
                    if (fmt != null && fmt.equals("#")) {
                        try {
                            long dateval = Long.parseLong(val);
                            d = new Date(dateval);
                        }
                        catch (NumberFormatException dateval) {}
                    } else {
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat(fmt);
                            java.util.Date dd = sdf.parse(val);
                            d = new Date(dd.getTime());
                        }
                        catch (ParseException parseException) {
                            // empty catch block
                        }
                    }
                    this.fPending.setDate(param, d);
                    break block19;
                }
                if (targetClass.equals(Timestamp.class)) {
                    this.fPending.setTimestamp(param, new Timestamp(Long.parseLong(val)));
                } else if (val != null) {
                    this.fPending.setString(param, val);
                }
            }
            catch (NumberFormatException e) {
                this.fPending.setString(param, val);
            }
        }
    }

    private static class ColInfo {
        private final String fKey;
        private final String fExpr;
        private final Class<?> fClass;
        private final String fFormat;
        private final double fRounding;

        public ColInfo(String key, String expr, Class<?> clazz, String fmt, double rounding) {
            this.fKey = key;
            this.fExpr = expr;
            this.fClass = clazz;
            this.fFormat = fmt;
            this.fRounding = rounding;
        }

        public ColInfo(JSONObject data) {
            this(data.getString("key"), data.optString("expr", "${" + data.getString("key") + "}"), ColInfo.classFrom(data.optString("type", null)), data.optString("format", null), data.optDouble("rounding", 1.0));
        }

        public String toString() {
            return this.fKey + " from " + this.fExpr + " as " + this.fClass.getSimpleName();
        }

        public String getKey() {
            return this.fKey;
        }

        public String getExpr() {
            return this.fExpr;
        }

        public Class<?> getTargetClass() {
            return this.fClass;
        }

        public String getFormat() {
            return this.fFormat;
        }

        public double getRounding() {
            return this.fRounding;
        }

        private static Class<?> classFrom(String text) {
            if (text == null || text.length() == 0) {
                return String.class;
            }
            if ((text = text.trim().toLowerCase()).startsWith("int")) {
                return Integer.class;
            }
            if (text.startsWith("long")) {
                return Long.class;
            }
            if (text.startsWith("double")) {
                return Double.class;
            }
            if (text.startsWith("bool")) {
                return Boolean.class;
            }
            if (text.startsWith("date")) {
                return java.util.Date.class;
            }
            if (text.startsWith("timestamp")) {
                return Timestamp.class;
            }
            return String.class;
        }
    }
}

