package com.twitter.hraven.etl;

import com.twitter.hraven.Constants;
import com.twitter.hraven.datasource.ProcessingException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.SequenceFile;

/* loaded from: input_file:com/twitter/hraven/etl/ProcessRecordService.class */
/**
 * Reads and writes {@link ProcessRecord} rows in the HBase table identified by
 * {@code Constants.JOB_FILE_PROCESS_TABLE_BYTES}, and creates/moves the process files
 * that those records refer to on the configured {@link FileSystem}.
 *
 * <p>Instances hold an open {@link HTable}; call {@link #close()} when done.
 */
public class ProcessRecordService {
    /** Pattern shared by the public formatter and the per-call formatters used internally. */
    private static final String MILLISECOND_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    /** Time zone id in which day boundaries are computed. */
    private static final String UTC_ZONE_ID = "UTC";

    /**
     * UTC formatter with millisecond precision, kept public for existing callers.
     *
     * <p>{@link SimpleDateFormat} is not thread-safe, so callers sharing this instance across
     * threads must synchronize externally. This class no longer parses with it.
     */
    public static final SimpleDateFormat MILLISECOND_TIMSTAMP_FORMAT = new SimpleDateFormat(MILLISECOND_TIMESTAMP_PATTERN);

    private static final Log LOG = LogFactory.getLog(ProcessRecordService.class);

    static {
        MILLISECOND_TIMSTAMP_FORMAT.setTimeZone(TimeZone.getTimeZone(UTC_ZONE_ID));
    }

    private final ProcessRecordKeyConverter keyConv = new ProcessRecordKeyConverter();
    private final HTable processRecordTable;
    private final Configuration myHBaseConf;
    private final FileSystem fs;

    /**
     * Opens the process record table and resolves the file system for the given configuration.
     *
     * @param configuration configuration used both for the HBase table and for
     *                      {@link FileSystem#get(Configuration)}
     * @throws IOException if the file system or the table cannot be obtained
     */
    public ProcessRecordService(Configuration configuration) throws IOException {
        this.myHBaseConf = configuration;
        // Resolve the file system before opening the table: if this call fails there is
        // no open HTable left behind that nobody can close.
        this.fs = FileSystem.get(configuration);
        this.processRecordTable = new HTable(configuration, Constants.JOB_FILE_PROCESS_TABLE_BYTES);
    }

    /**
     * Writes every column of the given record (min modification time, processed file count,
     * process file, state code, min and max job id) in a single {@link Put}.
     *
     * @param processRecord record to persist; its key determines the row
     * @throws IOException if the put fails
     */
    public void writeJobRecord(ProcessRecord processRecord) throws IOException {
        Put put = new Put(this.keyConv.toBytes(processRecord.getKey()));
        put.add(Constants.INFO_FAM_BYTES, Constants.MIN_MOD_TIME_MILLIS_COLUMN_BYTES, Bytes.toBytes(processRecord.getMinModificationTimeMillis()));
        put.add(Constants.INFO_FAM_BYTES, Constants.PROCESSED_JOB_FILES_COLUMN_BYTES, Bytes.toBytes(processRecord.getProcessedJobFiles()));
        put.add(Constants.INFO_FAM_BYTES, Constants.PROCESS_FILE_COLUMN_BYTES, Bytes.toBytes(processRecord.getProcessFile()));
        put.add(Constants.INFO_FAM_BYTES, Constants.PROCESSING_STATE_COLUMN_BYTES, Bytes.toBytes(processRecord.getProcessState().getCode()));
        put.add(Constants.INFO_FAM_BYTES, Constants.MIN_JOB_ID_COLUMN_BYTES, Bytes.toBytes(processRecord.getMinJobId()));
        put.add(Constants.INFO_FAM_BYTES, Constants.MAX_JOB_ID_COLUMN_BYTES, Bytes.toBytes(processRecord.getMaxJobId()));
        this.processRecordTable.put(put);
    }

    /**
     * Same as {@link #getLastSuccessfulProcessRecord(String, String)} without a process file filter.
     *
     * @param cluster cluster component of the process record key
     * @return the first matching record, or {@code null} if there is none
     * @throws IOException if the scan fails
     */
    public ProcessRecord getLastSuccessfulProcessRecord(String cluster) throws IOException {
        return getLastSuccessfulProcessRecord(cluster, null);
    }

    /**
     * Returns the first record in scan order whose state is anything other than
     * {@code ProcessState.CREATED}.
     *
     * @param cluster              cluster component of the process record key
     * @param processFileSubstring if non-empty, only records whose process file contains this
     *                             substring are considered; {@code null} or empty disables the filter
     * @return the first matching record, or {@code null} if there is none
     * @throws IOException if the scan fails
     */
    public ProcessRecord getLastSuccessfulProcessRecord(String cluster, String processFileSubstring) throws IOException {
        List<ProcessRecord> processRecords = getProcessRecords(cluster, CompareFilter.CompareOp.NOT_EQUAL, ProcessState.CREATED, 1, processFileSubstring);
        return processRecords.isEmpty() ? null : processRecords.get(0);
    }

    /**
     * Returns up to {@code maxCount} records for the cluster regardless of processing state.
     *
     * @param cluster              cluster component of the process record key
     * @param maxCount             maximum number of records to return; values {@code <= 0} yield an empty list
     * @param processFileSubstring optional substring the process file must contain
     * @return the matching records in scan order, never {@code null}
     * @throws IOException if the scan fails
     */
    public List<ProcessRecord> getProcessRecords(String cluster, int maxCount, String processFileSubstring) throws IOException {
        return getProcessRecords(cluster, CompareFilter.CompareOp.NO_OP, null, maxCount, processFileSubstring);
    }

    /**
     * Returns up to {@code maxCount} records for the cluster whose state equals {@code processState}.
     *
     * @param cluster              cluster component of the process record key
     * @param processState         state the records must be in
     * @param maxCount             maximum number of records to return; values {@code <= 0} yield an empty list
     * @param processFileSubstring optional substring the process file must contain
     * @return the matching records in scan order, never {@code null}
     * @throws IOException if the scan fails
     */
    public List<ProcessRecord> getProcessRecords(String cluster, ProcessState processState, int maxCount, String processFileSubstring) throws IOException {
        return getProcessRecords(cluster, CompareFilter.CompareOp.EQUAL, processState, maxCount, processFileSubstring);
    }

    /**
     * Scans the rows between the keys built from {@code (cluster, Long.MAX_VALUE)} and
     * {@code (cluster, 0)} and converts up to {@code maxCount} of them into records.
     *
     * <p>NOTE(review): the start/stop keys suggest the key converter orders rows newest-first;
     * confirm against {@code ProcessRecordKeyConverter}.
     *
     * @param cluster              cluster component of the process record key
     * @param compareOp            comparison applied to the state column;
     *                             {@code CompareOp.NO_OP} disables the state filter
     * @param processState         state to compare against; must be non-null unless
     *                             {@code compareOp} is {@code NO_OP}
     * @param maxCount             maximum number of records to return; values {@code <= 0} yield an empty list
     * @param processFileSubstring optional substring the process file must contain;
     *                             {@code null} or empty disables the filter
     * @return the matching records in scan order, never {@code null}
     * @throws IOException          if the scan fails
     * @throws NullPointerException if a state filter is requested without a state
     */
    public List<ProcessRecord> getProcessRecords(String cluster, CompareFilter.CompareOp compareOp, ProcessState processState, int maxCount, String processFileSubstring) throws IOException {
        boolean filterOnState = !CompareFilter.CompareOp.NO_OP.equals(compareOp);
        if (filterOnState && processState == null) {
            // Fail up front with a usable message instead of an anonymous NPE further down.
            throw new NullPointerException("processState must not be null when compareOp is " + compareOp);
        }

        Scan scan = new Scan();
        scan.setStartRow(this.keyConv.toBytes(new ProcessRecordKey(cluster, Long.MAX_VALUE)));
        scan.setStopRow(this.keyConv.toBytes(new ProcessRecordKey(cluster, 0L)));
        scan.addColumn(Constants.INFO_FAM_BYTES, Constants.MIN_MOD_TIME_MILLIS_COLUMN_BYTES);
        scan.addColumn(Constants.INFO_FAM_BYTES, Constants.PROCESSED_JOB_FILES_COLUMN_BYTES);
        scan.addColumn(Constants.INFO_FAM_BYTES, Constants.PROCESS_FILE_COLUMN_BYTES);
        scan.addColumn(Constants.INFO_FAM_BYTES, Constants.PROCESSING_STATE_COLUMN_BYTES);
        scan.addColumn(Constants.INFO_FAM_BYTES, Constants.MIN_JOB_ID_COLUMN_BYTES);
        scan.addColumn(Constants.INFO_FAM_BYTES, Constants.MAX_JOB_ID_COLUMN_BYTES);
        scan.setMaxVersions(1);

        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (filterOnState) {
            filterList.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES, Constants.PROCESSING_STATE_COLUMN_BYTES, compareOp, Bytes.toBytes(processState.getCode())));
        }
        if (processFileSubstring != null && processFileSubstring.length() > 0) {
            filterList.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES, Constants.PROCESS_FILE_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL, new SubstringComparator(processFileSubstring)));
        }
        if (filterList.getFilters().size() > 0) {
            scan.setFilter(filterList);
        }

        ResultScanner scanner = this.processRecordTable.getScanner(scan);
        try {
            return createFromResults(scanner, maxCount);
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }
    }

    /**
     * Converts at most {@code maxCount} scan results into process records.
     *
     * @param resultScanner scanner to read from; may be {@code null}
     * @param maxCount      maximum number of records to produce
     * @return the converted records; empty if {@code maxCount <= 0} or the scanner is {@code null}
     * @throws IllegalStateException if a row lacks one of the required columns
     */
    private List<ProcessRecord> createFromResults(ResultScanner resultScanner, int maxCount) {
        if (maxCount <= 0 || resultScanner == null) {
            return new ArrayList<ProcessRecord>(0);
        }
        List<ProcessRecord> records = new ArrayList<ProcessRecord>();
        for (Result result : resultScanner) {
            records.add(toProcessRecord(result));
            if (records.size() >= maxCount) {
                break;
            }
        }
        LOG.info("Returning " + records.size() + " process records");
        return records;
    }

    /** Builds one record from a row; min/max job id columns are optional, the rest are required. */
    private ProcessRecord toProcessRecord(Result result) {
        ProcessRecordKey key = this.keyConv.m16fromBytes(result.getRow());
        long minModificationTimeMillis = Bytes.toLong(requiredValue(result, Constants.MIN_MOD_TIME_MILLIS_COLUMN_BYTES));
        int processedJobFiles = Bytes.toInt(requiredValue(result, Constants.PROCESSED_JOB_FILES_COLUMN_BYTES));
        String processFile = Bytes.toString(requiredValue(result, Constants.PROCESS_FILE_COLUMN_BYTES));
        ProcessState processState = ProcessState.getProcessState(Bytes.toInt(requiredValue(result, Constants.PROCESSING_STATE_COLUMN_BYTES)));
        String minJobId = optionalString(result, Constants.MIN_JOB_ID_COLUMN_BYTES);
        String maxJobId = optionalString(result, Constants.MAX_JOB_ID_COLUMN_BYTES);
        // The key's timestamp is passed in the constructor slot that setProcessState fills
        // with getMaxModificationTimeMillis().
        return new ProcessRecord(key.getCluster(), processState, minModificationTimeMillis, key.getTimestamp(), processedJobFiles, processFile, minJobId, maxJobId);
    }

    /** Returns the latest value of a required info-family column, or fails naming the row and column. */
    private static byte[] requiredValue(Result result, byte[] qualifier) {
        KeyValue keyValue = result.getColumnLatest(Constants.INFO_FAM_BYTES, qualifier);
        if (keyValue == null) {
            throw new IllegalStateException("Process record row " + Bytes.toStringBinary(result.getRow())
                    + " is missing required column " + Bytes.toStringBinary(qualifier));
        }
        return keyValue.getValue();
    }

    /** Returns the latest value of an info-family column as a string, or {@code null} if the column is absent. */
    private static String optionalString(Result result, byte[] qualifier) {
        KeyValue keyValue = result.getColumnLatest(Constants.INFO_FAM_BYTES, qualifier);
        return keyValue == null ? null : Bytes.toString(keyValue.getValue());
    }

    /**
     * Writes only the state column of the record's row and returns a copy of the record
     * carrying the new state.
     *
     * @param processRecord record whose row is updated
     * @param processState  new state to store
     * @return a new record equal to {@code processRecord} except for its state
     * @throws IOException if the put fails
     */
    public ProcessRecord setProcessState(ProcessRecord processRecord, ProcessState processState) throws IOException {
        Put put = new Put(this.keyConv.toBytes(processRecord.getKey()));
        put.add(Constants.INFO_FAM_BYTES, Constants.PROCESSING_STATE_COLUMN_BYTES, Bytes.toBytes(processState.getCode()));
        this.processRecordTable.put(put);
        return new ProcessRecord(processRecord.getCluster(), processState, processRecord.getMinModificationTimeMillis(), processRecord.getMaxModificationTimeMillis(), processRecord.getProcessedJobFiles(), processRecord.getProcessFile(), processRecord.getMinJobId(), processRecord.getMaxJobId());
    }

    /**
     * Returns the epoch millis of 23:59:59.999 UTC on the given day.
     *
     * @param year  year, formatted as in {@code yyyy}
     * @param month month, formatted as in {@code MM}
     * @param day   day of month, formatted as in {@code dd}
     * @return epoch milliseconds of the last millisecond of that day in UTC
     * @throws IllegalArgumentException if the assembled timestamp cannot be parsed
     */
    long getEndOfDayMillis(String year, String month, String day) {
        return parseUtcMillis(year + "-" + month + "-" + day + " 23:59:59.999");
    }

    /**
     * Returns the epoch millis of 00:00:00.000 UTC on the given day.
     *
     * @param year  year, formatted as in {@code yyyy}
     * @param month month, formatted as in {@code MM}
     * @param day   day of month, formatted as in {@code dd}
     * @return epoch milliseconds of the first millisecond of that day in UTC
     * @throws IllegalArgumentException if the assembled timestamp cannot be parsed
     */
    long getStartOfDayMillis(String year, String month, String day) {
        return parseUtcMillis(year + "-" + month + "-" + day + " 00:00:00.000");
    }

    /**
     * Parses a {@code yyyy-MM-dd HH:mm:ss.SSS} timestamp as UTC.
     *
     * <p>A fresh formatter is created per call because {@link SimpleDateFormat} keeps mutable
     * parse state and the shared public instance may be in use by another thread.
     */
    private static long parseUtcMillis(String timestamp) {
        SimpleDateFormat format = new SimpleDateFormat(MILLISECOND_TIMESTAMP_PATTERN);
        format.setTimeZone(TimeZone.getTimeZone(UTC_ZONE_ID));
        try {
            return format.parse(timestamp).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse: " + timestamp, e);
        }
    }

    /**
     * Closes the underlying process record table.
     *
     * @throws IOException if closing the table fails
     */
    public void close() throws IOException {
        // The table is assigned unconditionally in the constructor, so it is never null here.
        this.processRecordTable.close();
    }

    /**
     * Builds the path {@code /tmp/hraven-<name>-<timestamp>-<suffix>} for a new process file.
     *
     * <p>NOTE(review): {@code Constants.TIMESTAMP_FORMAT} is used without synchronization here;
     * if it is a shared {@link SimpleDateFormat}, concurrent callers could corrupt the timestamp.
     *
     * @param name   label embedded in the file name after all non-word characters are removed;
     *               {@code null} is treated as empty (presumably the cluster name — confirm with callers)
     * @param suffix number appended as the last component of the file name
     * @return the path of the initial process file
     */
    public Path getInitialProcessFile(String name, int suffix) {
        String sanitizedName = name != null ? name.replaceAll("\\W+", "") : "";
        String timestamp = Constants.TIMESTAMP_FORMAT.format(new Date(System.currentTimeMillis()));
        return new Path(new Path("/tmp"), "hraven-" + sanitizedName + "-" + timestamp + "-" + suffix);
    }

    /**
     * Creates a sequence file writer keyed by {@code JobFile} with {@link FileStatus} values.
     *
     * @param path location of the sequence file to create
     * @return an open writer; the caller is responsible for closing it
     * @throws IOException if the writer cannot be created
     */
    public SequenceFile.Writer createProcessFileWriter(Path path) throws IOException {
        return SequenceFile.createWriter(this.fs, this.myHBaseConf, path, JobFile.class, FileStatus.class);
    }

    /**
     * Renames a process file into the target directory, keeping its file name.
     *
     * @param path  current location of the process file
     * @param path2 directory to move the file into
     * @return the new location of the file
     * @throws IOException         if the rename call itself fails
     * @throws ProcessingException if the file system reports that the rename did not succeed
     */
    public Path moveProcessFile(Path path, Path path2) throws IOException {
        Path target = new Path(path2, path.getName());
        if (this.fs.rename(path, target)) {
            return target;
        }
        throw new ProcessingException("Unable to move processing file " + path + " to " + target);
    }
}
