/*
 * Decompiled with CFR 0.152.
 */
package sql.tpce.tpcetxn;

import hydra.Log;
import hydra.RemoteTestModule;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import sql.SQLHelper;
import sql.tpce.TPCEBB;
import sql.tpce.TPCETest;
import sql.tpce.entity.TradeInfo;
import sql.tpce.tpcedef.TPCETxnInput;
import sql.tpce.tpcedef.TPCETxnOutput;
import sql.tpce.tpcedef.generator.MEE;
import sql.tpce.tpcedef.input.MarketFeedTxnInput;
import sql.tpce.tpcedef.output.MarketFeedTxnOutput;
import sql.tpce.tpcetxn.TPCETransaction;
import util.TestException;
import util.TestHelper;

public class TPCEMarketFeed
extends TPCETransaction {
    protected MarketFeedTxnInput mfTxnInput = null;
    protected MarketFeedTxnOutput mfTxnOutput = null;
    protected Connection conn = null;
    private int num_updated;
    private int send_len;
    private BigDecimal[] price_quote;
    private int[] trade_qty;
    private String[] symbol;
    private int max_feed_len;
    private String type_stop_loss;
    private String type_limit_sell;
    private String type_limit_buy;
    private String status_submitted;
    private static String updateLastTrade = "update LAST_TRADE set LT_PRICE = ?, LT_VOL = LT_VOL + ?, LT_DTS = ? where LT_S_SYMB = ?";
    private static String selectTradeRequestForUpdate = "select TR_T_ID, TR_BID_PRICE, TR_TT_ID, TR_QTY from TRADE_REQUEST -- GEMFIREXD-PROPERTIES index=i_tr_s_symb \n where TR_S_SYMB = ? and ((TR_TT_ID = ? and TR_BID_PRICE >= ?) or (TR_TT_ID = ? and TR_BID_PRICE <= ?) or (TR_TT_ID = ? and TR_BID_PRICE >= ?)) for update";
    private static String updateTrade = "update TRADE set T_DTS = ?, T_ST_ID = ? where T_ID = ?";
    private static String deleteTradeRequest = "delete from TRADE_REQUEST where TR_T_ID = ?";
    private static final String insertTradeHistory = "insert into TRADE_HISTORY(TH_T_ID, TH_DTS, TH_ST_ID) values (?, ?, ?)";

    @Override
    public TPCETxnOutput runTxn(TPCETxnInput txnInput, Connection conn) throws SQLException {
        this.mfTxnInput = (MarketFeedTxnInput)txnInput;
        this.mfTxnOutput = new MarketFeedTxnOutput();
        this.conn = conn;
        MEE mee = new MEE();
        this.price_quote = this.mfTxnInput.getPriceQuotes();
        this.symbol = this.mfTxnInput.getSymbol();
        this.trade_qty = this.mfTxnInput.getTradeQty();
        this.type_limit_buy = this.mfTxnInput.getLimitBuy();
        this.type_limit_sell = this.mfTxnInput.getLimitSell();
        this.type_stop_loss = this.mfTxnInput.getStopLoss();
        this.status_submitted = this.mfTxnInput.getStatusSubmitted();
        this.max_feed_len = this.symbol.length;
        if (logDML) {
            StringBuilder symbols = new StringBuilder();
            for (String symb : this.symbol) {
                symbols.append(symb);
            }
            Log.getLogWriter().info("this.symbol is assigned " + symbols.toString());
        }
        this.invokeFrame1(mee);
        if (this.num_updated < this.symbol.length) {
            this.mfTxnOutput.setStatus(-311);
            if (logDML) {
                throw new TestException("Update last_trade should work as no symbol was deleted in the test run but for " + this.symbol.length + " of unique symbols, " + "gfxd only updates " + this.num_updated + " in last_trade");
            }
        }
        this.mfTxnOutput.setNumUpdated(this.num_updated);
        this.mfTxnOutput.setSendLen(this.send_len);
        return this.mfTxnOutput;
    }

    protected void invokeFrame1(MEE mee) throws SQLException {
        int rows_updated = 0;
        PreparedStatement updateLastTradePs = this.conn.prepareStatement(updateLastTrade);
        PreparedStatement updateTradePs = this.conn.prepareStatement(updateTrade);
        PreparedStatement selectTrPs = this.conn.prepareStatement(selectTradeRequestForUpdate);
        PreparedStatement deleteTradeRequestPs = this.conn.prepareStatement(deleteTradeRequest);
        PreparedStatement insertTradeHistoryPs = this.conn.prepareStatement(insertTradeHistory);
        for (int i = 0; i < this.max_feed_len; ++i) {
            boolean retry = true;
            while (retry) {
                try {
                    int rows_updated_thisTicker = this.processTxnForSymbol(i, mee, updateLastTradePs, updateTradePs, selectTrPs, deleteTradeRequestPs, insertTradeHistoryPs);
                    retry = false;
                    rows_updated += rows_updated_thisTicker;
                }
                catch (SQLException se) {
                    if (se.getSQLState().equals("X0Z02")) {
                        if (!logDML) continue;
                        Log.getLogWriter().info("will retry this op due to conflict exception X0Z02");
                        continue;
                    }
                    SQLHelper.handleSQLException(se);
                }
            }
        }
        this.num_updated = rows_updated;
    }

    protected int processTxnForSymbol(int i, MEE mee, PreparedStatement updateLastTradePs, PreparedStatement updateTradePs, PreparedStatement selectTrPs, PreparedStatement deleteTradeRequestPs, PreparedStatement insertTradeHistoryPs) throws SQLException {
        ArrayList<TradeInfo> tradeRequest = new ArrayList<TradeInfo>();
        int rows_updated_thisTicker = 0;
        Timestamp now_dts = new Timestamp(System.currentTimeMillis());
        updateLastTradePs.setBigDecimal(1, this.price_quote[i]);
        updateLastTradePs.setInt(2, this.trade_qty[i]);
        updateLastTradePs.setTimestamp(3, now_dts);
        updateLastTradePs.setString(4, this.symbol[i]);
        int count = updateLastTradePs.executeUpdate();
        if (logDML) {
            Log.getLogWriter().info(updateLastTrade + " with LT_PRICE = " + this.price_quote[i] + " LT_VOL = LT_VOL + " + this.trade_qty[i] + " LT_DTS - " + now_dts + " for LT_S_SYMB = " + this.symbol[i]);
        }
        if (count != 1) {
            throw new TestException(updateLastTrade + " with LT_PRICE = " + this.price_quote[i] + " LT_VOL = LT_VOL + " + this.trade_qty[i] + " LT_DTS - " + now_dts + " for LT_S_SYMB = " + this.symbol[i] + " should update 1 row " + "but updated " + count + " row(s)");
        }
        rows_updated_thisTicker += count;
        this.processSelectTrForUpdate(i, updateTradePs, selectTrPs, now_dts, deleteTradeRequestPs, insertTradeHistoryPs, tradeRequest);
        this.conn.commit();
        if (TPCETest.logDML) {
            Log.getLogWriter().info("committed market_feed_txn");
        }
        this.send_len += tradeRequest.size();
        if (RemoteTestModule.getCurrentThread().getCurrentTask().getTaskTypeString().equalsIgnoreCase("INITTASK")) {
            for (TradeInfo tr : tradeRequest) {
                TPCEBB.getBB().addTradeId(tr.getTradeId());
                TPCEBB.getBB().getSharedCounters().increment(TPCEBB.TradeIdsInsertedInInitTask);
                Log.getLogWriter().info("TPCEBB adds " + tr.getTradeId());
            }
        } else {
            mee.submitTradeToMarket(this.conn, tradeRequest);
        }
        return rows_updated_thisTicker;
    }

    protected void processSelectTrForUpdate(int i, PreparedStatement updateTradePs, PreparedStatement selectTrPs, Timestamp now_dts, PreparedStatement deleteTradeRequestPs, PreparedStatement insertTradeHistoryPs, ArrayList<TradeInfo> tradeRequest) throws SQLException {
        try {
            selectTrPs.setString(1, this.symbol[i]);
            selectTrPs.setString(2, this.type_stop_loss);
            selectTrPs.setBigDecimal(3, this.price_quote[i]);
            selectTrPs.setString(4, this.type_limit_sell);
            selectTrPs.setBigDecimal(5, this.price_quote[i]);
            selectTrPs.setString(6, this.type_limit_buy);
            selectTrPs.setBigDecimal(7, this.price_quote[i]);
            ResultSet rs = selectTrPs.executeQuery();
            while (rs.next()) {
                long req_trade_id = rs.getLong("TR_T_ID");
                BigDecimal req_price_quote = rs.getBigDecimal("TR_BID_PRICE");
                String req_trade_type = rs.getString("TR_TT_ID");
                int req_trade_qty = rs.getInt("TR_QTY");
                this.updateTradeTxn(updateTradePs, now_dts, req_trade_id);
                this.deleteTradeRequestTxn(deleteTradeRequestPs, req_trade_id);
                this.insertTradeHistoryTxn(insertTradeHistoryPs, req_trade_id, now_dts);
                TradeInfo tr = new TradeInfo();
                tr.setPriceQuotes(req_price_quote);
                tr.setSymbol(this.symbol[i]);
                tr.setTradeId(req_trade_id);
                tr.setTradeQty(req_trade_qty);
                tr.setTradeType(req_trade_type);
                tradeRequest.add(tr);
            }
        }
        catch (SQLException se) {
            if (se.getSQLState().equals("X0Z02") && (!TPCETest.isClient || TPCETest.isClient && TPCETest.useSyncCommit)) {
                throw new TestException("Does not expect conflict exception here, as the the txn already hold the lock for the symbol updated in last_trade table, there should be no other txn could modify the same row for symbol " + this.symbol[i] + " in trade request table " + " the stack trace for the conflict exception is: " + TestHelper.getStackTrace((Throwable)se));
            }
            throw se;
        }
    }

    protected void updateTradeTxn(PreparedStatement updateTradePs, Timestamp now_dts, long req_trade_id) throws SQLException {
        try {
            updateTradePs.setTimestamp(1, now_dts);
            updateTradePs.setString(2, this.status_submitted);
            updateTradePs.setLong(3, req_trade_id);
            int count = updateTradePs.executeUpdate();
            if (logDML) {
                Log.getLogWriter().info(updateTrade + " with T_DTS = " + now_dts + " T_ST_ID = " + this.status_submitted + " for T_ID = " + req_trade_id);
            }
            if (count != 1) {
                throw new TestException(updateTrade + " with T_DTS = " + now_dts + " T_ST_ID = + " + this.status_submitted + " for T_ID = " + req_trade_id + " should update 1 row but updated " + count + " row(s)");
            }
        }
        catch (SQLException se) {
            if (se.getSQLState().equals("X0Z02") && (!TPCETest.isClient || TPCETest.isClient && TPCETest.useSyncCommit)) {
                throw new TestException("Does not expect conflict exception here, as the the trade table is only updated in market feed txn. No other txn should be able to modify the trade_request table with the same symbol, as the trade is inserted in trade order txn (if not committed, there should not be conflict) or trade is updated in trade result txn but only for status became submitted, or trade is updated in trade update txn (test will make sure update only on submitted status when txnInput is configured in trade update txn}.The stack trace for the conflict exception is: " + TestHelper.getStackTrace((Throwable)se));
            }
            throw se;
        }
    }

    protected void deleteTradeRequestTxn(PreparedStatement ps, long req_trade_id) throws SQLException {
        try {
            ps.setLong(1, req_trade_id);
            int count = ps.executeUpdate();
            if (logDML) {
                Log.getLogWriter().info(deleteTradeRequest + " for TR_T_ID = " + req_trade_id);
            }
            if (count != 1) {
                throw new TestException(deleteTradeRequest + " for TR_T_ID = " + req_trade_id + " should delete 1 row but deleted " + count + " row(s)");
            }
        }
        catch (SQLException se) {
            if (se.getSQLState().equals("X0Z02") && (!TPCETest.isClient || TPCETest.isClient && TPCETest.useSyncCommit)) {
                throw new TestException("Does not expect conflict exception here, as the lock should be held by the select for update statementThe stack trace for the conflict exception is: " + TestHelper.getStackTrace((Throwable)se));
            }
            throw se;
        }
    }

    protected void insertTradeHistoryTxn(PreparedStatement ps, long req_trade_id, Timestamp now_dts) throws SQLException {
        try {
            ps = this.conn.prepareStatement(insertTradeHistory);
            ps.setLong(1, req_trade_id);
            ps.setTimestamp(2, now_dts);
            ps.setString(3, this.status_submitted);
            int count = ps.executeUpdate();
            if (logDML) {
                Log.getLogWriter().info("insert into TRADE_HISTORY(TH_T_ID, TH_DTS, TH_ST_ID) values (?, ?, ?) with TH_T_ID = " + req_trade_id + " TH_DTS = " + now_dts + " TH_ST_ID = " + this.status_submitted);
            }
            if (count != 1) {
                throw new TestException("insert into TRADE_HISTORY(TH_T_ID, TH_DTS, TH_ST_ID) values (?, ?, ?) with TH_T_ID = " + req_trade_id + " TH_DTS = " + now_dts + " TH_ST_ID = " + this.status_submitted + " should inserts 1 row " + "but inserted " + count + " row(s)");
            }
        }
        catch (SQLException se) {
            if (se.getSQLState().equals("X0Z02")) {
                throw new TestException("Does not expect conflict exception here, as the lock should be held by the select for update statementThe stack trace for the conflict exception is: " + TestHelper.getStackTrace((Throwable)se));
            }
            throw se;
        }
    }
}

