/*
 * Decompiled with CFR 0.152.
 */
package sql.hdfs;

import com.pivotal.gemfirexd.hadoop.mapred.Key;
import com.pivotal.gemfirexd.hadoop.mapred.Row;
import com.pivotal.gemfirexd.hadoop.mapred.RowInputFormat;
import com.pivotal.gemfirexd.hadoop.mapred.RowOutputFormat;
import com.pivotal.gemfirexd.internal.engine.GfxdDataSerializable;
import hydra.Log;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VerifyHdfsDataUsingMR
extends Configured
implements Tool {
    /**
     * Configures and submits the "hdfsMapReduce" job.
     *
     * <p>The job reads through {@link RowInputFormat}, maps with
     * {@link HdfsDataMapper}, reduces with {@link HdfsDataReducer} and writes
     * through {@link RowOutputFormat} to the fixed output table
     * {@code TRADE.HDFS_CUSTOMERS}.
     *
     * @param args exactly three leading positional arguments are used:
     *             {@code args[0]} is stored under {@code gfxd.input.homedir},
     *             {@code args[1]} under {@code gfxd.output.dburl} and
     *             {@code args[2]} under {@code gfxd.input.tablename}
     * @return {@code 0} once {@code JobClient.runJob} returns normally, or
     *         {@code 2} when fewer than three arguments were supplied
     * @throws Exception whatever job configuration or job execution throws
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            System.err.println("Usage: VerifyHdfsDataUsingMR <hdfsHomeDir> <url> <tableName>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        GfxdDataSerializable.initTypes();
        JobConf conf = new JobConf(this.getConf());
        conf.setJobName("hdfsMapReduce");
        String hdfsHomeDir = args[0];
        String url = args[1];
        String tableName = args[2];
        System.out.println("VerifyHdfsData.run() invoked with  hdfsHomeDir = " + hdfsHomeDir + " url = " + url + " tableName = " + tableName);

        // Input side.
        conf.set("gfxd.input.homedir", hdfsHomeDir);
        conf.set("gfxd.input.tablename", tableName);
        conf.setBoolean("gfxd.input.checkpointmode", false);
        conf.setInputFormat(RowInputFormat.class);
        conf.setMapperClass(HdfsDataMapper.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(MyRow.class);

        // Output side.
        conf.setReducerClass(HdfsDataReducer.class);
        conf.set("gfxd.output.tablename", "TRADE.HDFS_CUSTOMERS");
        conf.set("gfxd.output.dburl", url);
        conf.setOutputFormat(RowOutputFormat.class);
        conf.setOutputKeyClass(Key.class);
        conf.setOutputValueClass(DataObject.class);

        // Echo the effective settings as read back from the configuration.
        StringBuilder summary = new StringBuilder();
        summary.append("HOME_DIR = ").append(conf.get("gfxd.input.homedir")).append(" ");
        summary.append("INPUT_TABLE = ").append(conf.get("gfxd.input.tablename")).append(" ");
        summary.append("OUTPUT_TABLE = ").append(conf.get("gfxd.output.tablename")).append(" ");
        summary.append("OUTPUT_URL = ").append(conf.get("gfxd.output.dburl")).append(" ");
        System.out.println("VerifyHdfsData running with the following conf: " + summary);

        // The output path is named after the current time in milliseconds.
        FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
        JobClient.runJob(conf);
        return 0;
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} and
     * exits the JVM with the tool's return code.
     *
     * @param args command-line arguments, passed to {@code ToolRunner.run}
     * @throws Exception whatever {@code ToolRunner.run} throws
     */
    public static void main(String[] args) throws Exception {
        // Arrays.toString prints the argument values; concatenating the array
        // itself would only print its type and identity hash.
        System.out.println("VerifyHdfsData.main() invoked with " + java.util.Arrays.toString(args));
        int rc = ToolRunner.run(new VerifyHdfsDataUsingMR(), args);
        System.exit(rc);
    }

    /**
     * Plain mutable holder for the four values the reducer emits per record:
     * {@code cid}, {@code cname}, {@code addr} and {@code tid}.
     */
    public static class DataObject {
        int cid;
        int tid;
        String cname;
        String addr;

        /**
         * @param cid   value for the {@code cid} field
         * @param cname value for the {@code cname} field (may be {@code null})
         * @param addr  value for the {@code addr} field (may be {@code null})
         * @param tid   value for the {@code tid} field
         */
        public DataObject(int cid, String cname, String addr, int tid) {
            this.cid = cid;
            this.cname = cname;
            this.addr = addr;
            this.tid = tid;
        }

        public int getCid() {
            return this.cid;
        }

        public void setCid(int cid) {
            this.cid = cid;
        }

        public int getTid() {
            return this.tid;
        }

        public void setTid(int tid) {
            this.tid = tid;
        }

        public String getCname() {
            return this.cname;
        }

        public void setCname(String cname) {
            this.cname = cname;
        }

        public String getAddr() {
            return this.addr;
        }

        public void setAddr(String addr) {
            this.addr = addr;
        }

        /**
         * Returns the field values in readable form, so that log statements
         * which stringify this object show its content instead of an
         * identity hash.
         */
        @Override
        public String toString() {
            return "DataObject[cid=" + this.cid
                    + ", cname=" + this.cname
                    + ", addr=" + this.addr
                    + ", tid=" + this.tid + "]";
        }
    }

    public static class MyRow
    implements Writable {
        private int cid;
        private int tid;
        private String addr;
        private String cname;

        public MyRow(int cid, String cname, String addr, int tid) {
            this.cid = cid;
            this.cname = cname;
            this.addr = addr;
            this.tid = tid;
        }

        public int getCid() {
            return this.cid;
        }

        public void setCid(int cid) {
            this.cid = cid;
        }

        public int getTid() {
            return this.tid;
        }

        public void setTid(int tid) {
            this.tid = tid;
        }

        public String getAddr() {
            return this.addr;
        }

        public void setAddr(String addr) {
            this.addr = addr;
        }

        public String getCname() {
            return this.cname;
        }

        public void setCname(String cname) {
            this.cname = cname;
        }

        public void write(DataOutput out) throws IOException {
            out.writeInt(this.cid);
            out.writeChars(this.cname);
            out.writeChars(this.addr);
            out.writeInt(this.tid);
        }

        public void readFields(DataInput in) throws IOException {
            this.cid = in.readInt();
            this.cname = in.readLine();
            this.addr = in.readLine();
            this.tid = in.readInt();
        }
    }

    public static class HdfsDataReducer
    extends MapReduceBase
    implements Reducer<Text, MyRow, Key, DataObject> {
        public void reduce(Text key, Iterator<MyRow> values, OutputCollector<Key, DataObject> output, Reporter reporter) throws IOException {
            Log.getLogWriter().info("i am in reducer");
            while (values.hasNext()) {
                MyRow myRow = values.next();
                int cid = myRow.getCid();
                int tid = myRow.getTid();
                String addr = myRow.getAddr();
                String cname = myRow.getCname();
                Log.getLogWriter().info("reducer processing record for " + myRow.toString());
                DataObject o = new DataObject(cid, cname, addr, tid);
                Log.getLogWriter().info("reducer writing record " + o.toString());
                output.collect((Object)new Key(), (Object)o);
            }
            Log.getLogWriter().info("i am in out from reducer");
        }
    }

    public static class HdfsDataMapper
    extends MapReduceBase
    implements Mapper<Key, Row, Text, MyRow> {
        public void map(Key key, Row value, OutputCollector<Text, MyRow> output, Reporter reporter) throws IOException {
            String tableName = null;
            try {
                ResultSet rs = value.getRowAsResultSet();
                tableName = rs.getMetaData().getTableName(1);
                Log.getLogWriter().info("i am in a mapper and table Name is " + tableName);
                int cid = rs.getInt("cid");
                String cname = rs.getString("cust_name");
                String addr = rs.getString("addr");
                int tid = rs.getInt("tid");
                Log.getLogWriter().info("mapper procesing record from " + tableName + ": " + cid + ": " + cname + ": " + addr + ": " + tid);
                Text myKey = new Text(Integer.toString(cid));
                MyRow myRow = new MyRow(cid, cname, addr, tid);
                Log.getLogWriter().info("MAPPER writing intermediate record " + myRow.toString());
                output.collect((Object)myKey, (Object)myRow);
            }
            catch (SQLException se) {
                System.err.println("Error logging result set" + se);
            }
        }
    }
}

