package com.twitter.hraven.datasource;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AtomicDouble;
import com.twitter.hraven.Constants;
import com.twitter.hraven.Counter;
import com.twitter.hraven.CounterMap;
import com.twitter.hraven.Flow;
import com.twitter.hraven.FlowKey;
import com.twitter.hraven.HravenResponseMetrics;
import com.twitter.hraven.JobDesc;
import com.twitter.hraven.JobDetails;
import com.twitter.hraven.JobHistoryKeys;
import com.twitter.hraven.JobKey;
import com.twitter.hraven.QualifiedJobId;
import com.twitter.hraven.TaskDetails;
import com.twitter.hraven.TaskKey;
import com.twitter.hraven.util.ByteUtil;
import com.twitter.hraven.util.HadoopConfUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.util.Bytes;

/* loaded from: input_file:com/twitter/hraven/datasource/JobHistoryService.class */
public class JobHistoryService {
    /** Class-wide logger; it is never reassigned, so it is declared final. */
    private static final Log LOG = LogFactory.getLog(JobHistoryService.class);
    /** HBase connection handed in by the caller; tables are obtained from it per operation. */
    private final Connection hbaseConnection;
    /** Resolves a {@link QualifiedJobId} to the {@link JobKey} used in history row keys. */
    private final JobHistoryByIdService idService;
    /** Converts between {@link JobKey} instances and row-key bytes. */
    private final JobKeyConverter jobKeyConv = new JobKeyConverter();
    /** Converts between {@link TaskKey} instances and row-key bytes. */
    private final TaskKeyConverter taskKeyConv = new TaskKeyConverter();
    /** Cap applied to scanner caching on flow scans; read from "hbase.client.scanner.caching" (100 if unset). */
    private final int defaultScannerCaching;

    public JobHistoryService(Configuration configuration, Connection connection) throws IOException {
        this.hbaseConnection = connection;
        this.idService = new JobHistoryByIdService(connection);
        this.defaultScannerCaching = configuration.getInt("hbase.client.scanner.caching", 100);
    }

    /**
     * Convenience overload of {@link #getLatestFlow(String, String, String, boolean)} that
     * passes {@code false} for the boolean flag.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @return the first flow found, or {@code null} when there is none
     * @throws IOException if the underlying lookup fails
     */
    public Flow getLatestFlow(String str, String str2, String str3) throws IOException {
        final boolean populateTasks = false;
        return getLatestFlow(str, str2, str3, populateTasks);
    }

    /**
     * Returns the first flow of the series for the given key components, requesting at most
     * one flow and applying no version filter.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @param z forwarded to the flow-series lookup as its boolean flag
     * @return the first flow found, or {@code null} when the series is empty
     * @throws IOException if the underlying lookup fails
     */
    public Flow getLatestFlow(String str, String str2, String str3, boolean z) throws IOException {
        final List<Flow> candidates = getFlowSeries(str, str2, str3, null, z, 1);
        return candidates.isEmpty() ? null : candidates.get(0);
    }

    /**
     * Convenience overload of the six-argument {@code getFlowSeries}: no version filter
     * ({@code null}) and {@code false} for the boolean flag.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @param i maximum number of flows to return
     * @return the flows found, possibly empty
     * @throws IOException if the underlying lookup fails
     */
    public List<Flow> getFlowSeries(String str, String str2, String str3, int i) throws IOException {
        final String anyVersion = null;
        final boolean populateTasks = false;
        return getFlowSeries(str, str2, str3, anyVersion, populateTasks, i);
    }

    /**
     * Fetches the single flow whose row keys start with the given key components and run id.
     *
     * <p>The row-key prefix is built by joining the three string components, the encoded run
     * id and an empty trailing component with {@code Constants.SEP_BYTES}. The scan starts at
     * that prefix and stops at the first row that no longer matches it.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @param j run id, encoded with {@code FlowKey.encodeRunId} before being placed in the key
     * @param z when {@code true}, task details are loaded for the jobs of the returned flow
     * @return the matching flow, or {@code null} when no row matches
     * @throws IOException if the HBase scan fails
     */
    public Flow getFlow(String str, String str2, String str3, long j, boolean z) throws IOException {
        // The components must be passed as an array of byte arrays (byte[][]); the
        // decompiled form used a flat byte[] initializer, which does not compile.
        byte[] rowPrefix = ByteUtil.join(Constants.SEP_BYTES, new byte[][] {
            Bytes.toBytes(str),
            Bytes.toBytes(str2),
            Bytes.toBytes(str3),
            Bytes.toBytes(FlowKey.encodeRunId(j)),
            Constants.EMPTY_BYTES
        });
        LOG.info("Reading job_history rows start at " + Bytes.toStringBinary(rowPrefix));

        Scan scan = new Scan();
        scan.setStartRow(rowPrefix);
        scan.setFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));

        // Only one flow can match a full cluster/user/app/run prefix, so a limit of 1 suffices.
        List<Flow> flows = createFromResults(scan, z, 1);
        return flows.isEmpty() ? null : flows.get(0);
    }

    /**
     * Fetches the flow that contains the given job.
     *
     * <p>The job id is first resolved to a {@link JobKey} through the by-id service. The
     * key's cluster, user name, app id and encoded run id then form the row-key prefix that
     * is scanned.
     *
     * @param str first argument of the {@link QualifiedJobId} (presumably the cluster)
     * @param str2 second argument of the {@link QualifiedJobId} (presumably the job id string)
     * @param z when {@code true}, task details are loaded for the jobs of the returned flow
     * @return the flow containing the job, or {@code null} when the job id is unknown or no
     *     row matches
     * @throws IOException if the id lookup or the HBase scan fails
     */
    public Flow getFlowByJobID(String str, String str2, boolean z) throws IOException {
        JobKey jobKey = this.idService.getJobKeyById(new QualifiedJobId(str, str2));
        if (jobKey == null) {
            return null;
        }

        // The components must be passed as an array of byte arrays (byte[][]); the
        // decompiled form used a flat byte[] initializer, which does not compile.
        byte[] rowPrefix = ByteUtil.join(Constants.SEP_BYTES, new byte[][] {
            Bytes.toBytes(jobKey.getCluster()),
            Bytes.toBytes(jobKey.getUserName()),
            Bytes.toBytes(jobKey.getAppId()),
            Bytes.toBytes(jobKey.getEncodedRunId()),
            Constants.EMPTY_BYTES
        });
        LOG.info("Reading job_history rows start at " + Bytes.toStringBinary(rowPrefix));

        Scan scan = new Scan();
        scan.setStartRow(rowPrefix);
        scan.setFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));

        List<Flow> flows = createFromResults(scan, z, 1);
        return flows.isEmpty() ? null : flows.get(0);
    }

    /**
     * Builds a scan over all rows sharing the given row-key prefix.
     *
     * @param bArr row-key prefix; used both as the start row and as the prefix filter
     * @param i requested result limit; scanner caching is the smaller of this and the
     *     configured default
     * @param str version to match against the version column, or {@code null}/empty for no
     *     version filtering
     * @return the configured scan
     */
    private Scan createFlowScan(byte[] bArr, int i, String str) {
        final Scan flowScan = new Scan();
        flowScan.setStartRow(bArr);
        flowScan.setCaching(Math.min(i, this.defaultScannerCaching));

        // Stop the scan at the first row that no longer carries the prefix.
        final WhileMatchFilter prefixGuard = new WhileMatchFilter(new PrefixFilter(bArr));
        final boolean hasVersion = str != null && str.length() > 0;
        if (!hasVersion) {
            flowScan.setFilter(prefixGuard);
            return flowScan;
        }

        final FilterList allOf = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        allOf.addFilter(prefixGuard);
        allOf.addFilter(
            new SingleColumnValueFilter(
                Constants.INFO_FAM_BYTES,
                Constants.VERSION_COLUMN_BYTES,
                CompareFilter.CompareOp.EQUAL,
                Bytes.toBytes(str)));
        flowScan.setFilter(allOf);
        return flowScan;
    }

    /**
     * Returns up to {@code i} flows whose row keys start with {@code str!str2!str3!}.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @param str4 version to filter on, or {@code null}/empty for any version
     * @param z when {@code true}, task details are loaded for the jobs of the returned flows
     * @param i maximum number of flows to return
     * @return the flows found, possibly empty
     * @throws IOException if the HBase scan fails
     */
    public List<Flow> getFlowSeries(String str, String str2, String str3, String str4, boolean z, int i) throws IOException {
        final String rowPrefix = str + "!" + str2 + "!" + str3 + "!";
        final Scan flowScan = createFlowScan(Bytes.toBytes(rowPrefix), i, str4);
        return createFromResults(flowScan, z, i);
    }

    /**
     * Returns up to {@code i} flows whose row keys start with {@code str!str2!str3!},
     * optionally bounded by run ids.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @param str4 version to filter on, or {@code null}/empty for any version
     * @param z when {@code true}, task details are loaded for the jobs of the returned flows
     * @param j when non-zero, its encoded run id defines the scan's stop row
     * @param j2 when non-zero, its encoded run id defines the scan's start row
     * @param i maximum number of flows to return
     * @return the flows found, possibly empty
     * @throws IOException if the HBase scan fails
     */
    public List<Flow> getFlowSeries(String str, String str2, String str3, String str4, boolean z, long j, long j2, int i) throws IOException {
        final byte[] flowPrefix = Bytes.toBytes(str + "!" + str2 + "!" + str3 + "!");
        final Scan boundedScan = createFlowScan(flowPrefix, i, str4);

        if (j2 != 0) {
            final byte[] encodedStart = Bytes.toBytes(FlowKey.encodeRunId(j2));
            boundedScan.setStartRow(Bytes.add(flowPrefix, encodedStart, Constants.SEP_BYTES));
        }
        if (j != 0) {
            final byte[] encodedStop = Bytes.toBytes(FlowKey.encodeRunId(j));
            boundedScan.setStopRow(Bytes.add(flowPrefix, encodedStop, Constants.SEP_BYTES));
        }
        return createFromResults(boundedScan, z, i);
    }

    /**
     * Returns up to {@code i} flows for the {@code str!str2!str3!} prefix, never loading
     * task details.
     *
     * <p>The start row is {@code bArr} when given, otherwise the prefix plus the encoded
     * {@code j2} run id when {@code j2} is non-zero, otherwise the bare prefix. When
     * {@code j} is non-zero it defines the stop row; otherwise a prefix filter ends the scan.
     * A qualifier filter drops columns whose qualifier matches the regex below.
     *
     * @param str first row-key component (presumably the cluster; confirm against callers)
     * @param str2 second row-key component (presumably the user name)
     * @param str3 third row-key component (presumably the application id)
     * @param str4 version to filter on, or {@code null}/empty for any version
     * @param j when non-zero, its encoded run id defines the scan's stop row
     * @param j2 when non-zero (and {@code bArr} is {@code null}), its encoded run id defines
     *     the start row
     * @param i maximum number of flows to return
     * @param bArr explicit start row, or {@code null} to derive one
     * @return the flows found, possibly empty
     * @throws IOException if the HBase scan or its JSON rendering fails
     */
    public List<Flow> getFlowTimeSeriesStats(String str, String str2, String str3, String str4, long j, long j2, int i, byte[] bArr) throws IOException {
        final byte[] flowPrefix = Bytes.toBytes(str + "!" + str2 + "!" + str3 + "!");

        final byte[] startRow;
        if (bArr != null) {
            startRow = bArr;
        } else if (j2 != 0) {
            startRow = Bytes.add(flowPrefix, Bytes.toBytes(FlowKey.encodeRunId(j2)), Constants.SEP_BYTES);
        } else {
            startRow = flowPrefix;
        }

        final Scan statsScan = new Scan();
        statsScan.setStartRow(startRow);

        final FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (j == 0) {
            // No explicit stop row: end the scan when rows leave the flow prefix.
            filters.addFilter(new WhileMatchFilter(new PrefixFilter(flowPrefix)));
        } else {
            final byte[] encodedStop = Bytes.toBytes(FlowKey.encodeRunId(j));
            statsScan.setStopRow(Bytes.add(flowPrefix, encodedStop, Constants.ZERO_SINGLE_BYTE));
        }

        final boolean hasVersion = str4 != null && str4.length() > 0;
        if (hasVersion) {
            filters.addFilter(
                new SingleColumnValueFilter(
                    Constants.INFO_FAM_BYTES,
                    Constants.VERSION_COLUMN_BYTES,
                    CompareFilter.CompareOp.EQUAL,
                    Bytes.toBytes(str4)));
        }

        // Exclude qualifiers starting with "c!" unless they contain "queue".
        final RegexStringComparator excluded = new RegexStringComparator("^c\\!((?!queue).)*$");
        filters.addFilter(new QualifierFilter(CompareFilter.CompareOp.NOT_EQUAL, excluded));
        statsScan.setFilter(filters);

        LOG.info("scan : \n " + statsScan.toJSON() + " \n");
        return createFromResults(statsScan, false, i);
    }

    /**
     * Convenience overload of {@link #getJobByJobID(String, String, boolean)} that does not
     * load task details.
     *
     * @param str first argument of the {@link QualifiedJobId} (presumably the cluster)
     * @param str2 second argument of the {@link QualifiedJobId} (presumably the job id string)
     * @return the job details, or {@code null} when the job is not found
     * @throws IOException if the underlying lookup fails
     */
    public JobDetails getJobByJobID(String str, String str2) throws IOException {
        final boolean populateTasks = false;
        return getJobByJobID(str, str2, populateTasks);
    }

    /**
     * Wraps the two strings in a {@link QualifiedJobId} and delegates to
     * {@link #getJobByJobID(QualifiedJobId, boolean)}.
     *
     * @param str first argument of the {@link QualifiedJobId} (presumably the cluster)
     * @param str2 second argument of the {@link QualifiedJobId} (presumably the job id string)
     * @param z when {@code true}, task details are loaded for the job
     * @return the job details, or {@code null} when the job is not found
     * @throws IOException if the underlying lookup fails
     */
    public JobDetails getJobByJobID(String str, String str2, boolean z) throws IOException {
        final QualifiedJobId qualifiedId = new QualifiedJobId(str, str2);
        return getJobByJobID(qualifiedId, z);
    }

    /**
     * Loads the details of a single job from the history table.
     *
     * @param qualifiedJobId id resolved to a {@link JobKey} through the by-id service
     * @param z when {@code true}, task details are loaded for the job
     * @return the populated job details, or {@code null} when the id is unknown or the
     *     history row is missing or empty
     * @throws IOException if the id lookup or any HBase access fails
     */
    public JobDetails getJobByJobID(QualifiedJobId qualifiedJobId, boolean z) throws IOException {
        JobKey jobKey = this.idService.getJobKeyById(qualifiedJobId);
        if (jobKey == null) {
            return null;
        }

        byte[] rowKey = this.jobKeyConv.toBytes(jobKey);
        Result row;
        Table historyTable = this.hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TABLE));
        try {
            row = historyTable.get(new Get(rowKey));
        } finally {
            // Close in finally so the table is released even when the get fails.
            historyTable.close();
        }

        if (row == null || row.isEmpty()) {
            return null;
        }

        JobDetails details = new JobDetails(jobKey);
        details.populate(row);
        if (z) {
            populateTasks(details);
        }
        return details;
    }

    /**
     * Runs the scan against the history table and groups the returned job rows into flows.
     *
     * <p>A new flow is started whenever there is no current flow, the current flow does not
     * contain the row's job key, or the job key's run id is 0. Scanning stops as soon as a
     * new flow would exceed the limit.
     *
     * @param scan scan to execute against the history table
     * @param z when {@code true}, task details are loaded for every job after the scan
     * @param i maximum number of flows to return
     * @return the flows in scan order, possibly empty
     * @throws IOException if any HBase access fails
     */
    private List<Flow> createFromResults(Scan scan, boolean z, int i) throws IOException {
        List<Flow> flows = new ArrayList<Flow>();
        Stopwatch totalTimer = new Stopwatch().start();
        Stopwatch populateTimer = new Stopwatch();
        int rowCount = 0;
        long columnCount = 0;
        int jobCount = 0;
        // The result byte size is not measured; it stays 0 so the log line and the metric
        // keep their existing shape.
        final long resultSize = 0L;

        Table historyTable = this.hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TABLE));
        try {
            ResultScanner scanner = historyTable.getScanner(scan);
            try {
                Flow currentFlow = null;
                for (Result result : scanner) {
                    if (result == null || result.isEmpty()) {
                        continue;
                    }
                    rowCount++;
                    columnCount += result.size();
                    JobKey jobKey = this.jobKeyConv.fromBytes(result.getRow());

                    boolean startsNewFlow =
                        currentFlow == null || !currentFlow.contains(jobKey) || jobKey.getRunId() == 0;
                    if (startsNewFlow) {
                        if (flows.size() >= i) {
                            // Limit reached: this row belongs to a flow we must not return.
                            break;
                        }
                        currentFlow = new Flow(new FlowKey(jobKey));
                        flows.add(currentFlow);
                    }

                    populateTimer.start();
                    JobDetails job = new JobDetails(jobKey);
                    job.populate(result);
                    currentFlow.addJob(job);
                    jobCount++;
                    populateTimer.stop();
                }
            } finally {
                scanner.close();
            }
        } finally {
            // Release the table on every path, including scan failures.
            historyTable.close();
        }
        totalTimer.stop();

        double resultSizeMb = resultSize / 1048576.0d;
        LOG.info("Fetched from hbase " + rowCount + " rows, " + columnCount + " columns, " + flows.size()
            + " flows and " + jobCount + " jobs taking up " + resultSize + " bytes ( " + resultSizeMb
            + " atomic double: " + new AtomicDouble(resultSizeMb) + ") MB, in total time of " + totalTimer
            + " with  " + populateTimer + " spent inJobDetails & Flow population");
        HravenResponseMetrics.FLOW_HBASE_RESULT_SIZE_VALUE.set(resultSizeMb);

        if (z) {
            populateTasks(flows);
        }
        return flows;
    }

    /**
     * Loads task details for every job of the given flows with a single scan of the task
     * table.
     *
     * <p>The scan starts at the first job found in the flows and has no stop row. Jobs are
     * visited in list order while the scanner advances, so this presumably relies on the
     * flows and jobs being in the same order as the task-table row keys (confirm).
     *
     * @param list flows whose jobs should receive their tasks; {@code null} or empty is a
     *     no-op
     * @throws IOException if any HBase access fails
     */
    private void populateTasks(List<Flow> list) throws IOException {
        if (list == null || list.isEmpty()) {
            return;
        }

        // The first job of the first non-empty flow marks where the task scan begins.
        JobKey firstJobKey = null;
        for (Flow flow : list) {
            List<JobDetails> jobs = flow.getJobs();
            if (jobs != null && !jobs.isEmpty()) {
                firstJobKey = jobs.get(0).getJobKey();
                break;
            }
        }
        if (firstJobKey == null) {
            LOG.info("No start job found for flows");
            return;
        }

        byte[] startRow = Bytes.add(this.jobKeyConv.toBytes(firstJobKey), Constants.SEP_BYTES);
        Scan scan = new Scan();
        scan.setStartRow(startRow);
        scan.setCaching(500);

        Table taskTable = this.hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TASK_TABLE));
        try {
            ResultScanner scanner = taskTable.getScanner(scan);
            try {
                Result current = scanner.next();
                for (Flow flow : list) {
                    List<JobDetails> jobs = flow.getJobs();
                    if (jobs == null) {
                        // Same null tolerance as the start-job search above.
                        continue;
                    }
                    for (JobDetails job : jobs) {
                        while (current != null && !current.isEmpty()) {
                            TaskKey taskKey = this.taskKeyConv.fromBytes(current.getRow());
                            int comparison = job.getJobKey().compareTo(taskKey);
                            if (comparison < 0) {
                                // Task sorts after this job: keep it for the next job.
                                break;
                            }
                            if (comparison == 0) {
                                TaskDetails task = new TaskDetails(taskKey);
                                task.populate(current.getFamilyMap(Constants.INFO_FAM_BYTES));
                                job.addTask(task);
                            }
                            // comparison > 0: task sorts before this job, so skip it.
                            current = scanner.next();
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Added " + job.getTasks().size() + " tasks to job " + job.getJobKey().toString());
                        }
                    }
                }
            } finally {
                scanner.close();
            }
        } finally {
            // Release the table even if opening or closing the scanner fails.
            taskTable.close();
        }
    }

    /**
     * Loads all task details of a single job from the task table.
     *
     * <p>Iteration stops at the first {@code null} or empty result.
     *
     * @param jobDetails job to which the tasks are added
     * @throws IOException if any HBase access fails
     */
    private void populateTasks(JobDetails jobDetails) throws IOException {
        Table taskTable = this.hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TASK_TABLE));
        try {
            ResultScanner scanner = taskTable.getScanner(getTaskScan(jobDetails.getJobKey()));
            try {
                for (Result row : scanner) {
                    if (row == null || row.isEmpty()) {
                        break;
                    }
                    TaskDetails task = new TaskDetails(this.taskKeyConv.fromBytes(row.getRow()));
                    task.populate(row.getFamilyMap(Constants.INFO_FAM_BYTES));
                    jobDetails.addTask(task);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Added " + jobDetails.getTasks().size() + " tasks to job " + jobDetails.getJobKey().toString());
                }
            } finally {
                scanner.close();
            }
        } finally {
            // Release the table even if opening or closing the scanner fails.
            taskTable.close();
        }
    }

    /**
     * Builds a scan over the task rows of one job: rows whose key starts with the job's row
     * key followed by the separator.
     *
     * @param jobKey job whose tasks should be scanned
     * @return a scan with the prefix as start row and prefix filter, and caching set to 500
     */
    private Scan getTaskScan(JobKey jobKey) {
        final byte[] jobRow = this.jobKeyConv.toBytes(jobKey);
        final byte[] taskPrefix = Bytes.add(jobRow, Constants.SEP_BYTES);

        final Scan taskScan = new Scan();
        taskScan.setStartRow(taskPrefix);
        // End the scan at the first row outside this job's prefix.
        final PrefixFilter samePrefix = new PrefixFilter(taskPrefix);
        taskScan.setFilter(new WhileMatchFilter(samePrefix));
        taskScan.setCaching(500);
        return taskScan;
    }

    /**
     * Rebuilds a job configuration from the columns of a stored row.
     *
     * <p>Only qualifiers that start with the job-conf column prefix plus separator, and have
     * at least one byte after it, are used. The remainder of the qualifier is the property
     * name and the cell value is the property value.
     *
     * @param map qualifier-to-value map of one row
     * @return a configuration, created without default resources, holding the parsed
     *     properties
     */
    public static Configuration parseConfiguration(Map<byte[], byte[]> map) {
        final Configuration parsed = new Configuration(false);
        final byte[] confPrefix = Bytes.add(Constants.JOB_CONF_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES);

        for (Map.Entry<byte[], byte[]> column : map.entrySet()) {
            final byte[] qualifier = column.getKey();
            if (!(Bytes.startsWith(qualifier, confPrefix) && qualifier.length > confPrefix.length)) {
                continue;
            }
            final int nameLength = qualifier.length - confPrefix.length;
            final String propertyName = Bytes.toString(Bytes.tail(qualifier, nameLength));
            final String propertyValue = Bytes.toString(column.getValue());
            parsed.set(propertyName, propertyValue);
        }
        return parsed;
    }

    /**
     * Rebuilds a counter map from the columns of a stored row.
     *
     * <p>Only qualifiers that start with {@code bArr} plus separator, and have at least one
     * byte after it, are used. The remainder must split on the separator into exactly two
     * parts, passed to {@link Counter} as its first two arguments (presumably group and
     * counter name). The cell value is read as a long.
     *
     * @param bArr qualifier prefix identifying the counter columns to parse
     * @param map qualifier-to-value map of one row
     * @return the parsed counters, possibly empty
     * @throws IllegalArgumentException if a matching qualifier does not split into exactly
     *     two parts
     */
    public static CounterMap parseCounters(byte[] bArr, Map<byte[], byte[]> map) {
        final CounterMap counters = new CounterMap();
        final byte[] counterPrefix = Bytes.add(bArr, Constants.SEP_BYTES);

        for (Map.Entry<byte[], byte[]> column : map.entrySet()) {
            final byte[] qualifier = column.getKey();
            if (!(Bytes.startsWith(qualifier, counterPrefix) && qualifier.length > counterPrefix.length)) {
                continue;
            }
            final byte[] remainder = Bytes.tail(qualifier, qualifier.length - counterPrefix.length);
            final byte[][] parts = ByteUtil.split(remainder, Constants.SEP_BYTES);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed column qualifier for counter value: " + Bytes.toStringBinary(qualifier));
            }
            final String first = Bytes.toString(parts[0]);
            final String second = Bytes.toString(parts[1]);
            final long value = Bytes.toLong(column.getValue());
            counters.add(new Counter(first, second, value));
        }
        return counters;
    }

    /**
     * Adds the queue-name column to the given put.
     *
     * <p>The queue name comes from {@code HadoopConfUtil.getQueueName}. When it equals
     * {@code Constants.DEFAULT_VALUE_QUEUENAME} (ignoring case), the job's user name is
     * stored instead.
     *
     * @param configuration job configuration the queue name is read from
     * @param put put receiving the column
     * @param jobKey supplies the user name used as fallback
     * @param bArr qualifier prefix placed before {@code Constants.HRAVEN_QUEUE_BYTES}
     */
    static void setHravenQueueNamePut(Configuration configuration, Put put, JobKey jobKey, byte[] bArr) {
        final String configuredQueue = HadoopConfUtil.getQueueName(configuration);
        final String effectiveQueue = configuredQueue.equalsIgnoreCase(Constants.DEFAULT_VALUE_QUEUENAME)
            ? jobKey.getUserName()
            : configuredQueue;
        final byte[] qualifier = Bytes.add(bArr, Constants.HRAVEN_QUEUE_BYTES);
        put.addColumn(Constants.INFO_FAM_BYTES, qualifier, Bytes.toBytes(effectiveQueue));
    }

    /**
     * Builds the puts that store a job's descriptor and configuration.
     *
     * <p>A single put, keyed by the job's row key, receives the version column, the
     * framework column, one column per configuration entry (qualifier {@code "c!"} plus the
     * property name) and the queue-name column.
     *
     * @param jobDesc descriptor supplying the row key, version and framework
     * @param configuration job configuration whose entries are stored
     * @return a list containing the single put
     */
    public static List<Put> getHbasePuts(JobDesc jobDesc, Configuration configuration) {
        List<Put> puts = new LinkedList<Put>();
        JobKey jobKey = new JobKey(jobDesc);
        Put jobPut = new Put(new JobKeyConverter().toBytes(jobKey));
        jobPut.addColumn(Constants.INFO_FAM_BYTES, Constants.VERSION_COLUMN_BYTES, Bytes.toBytes(jobDesc.getVersion()));
        jobPut.addColumn(Constants.INFO_FAM_BYTES, Constants.FRAMEWORK_COLUMN_BYTES, Bytes.toBytes(jobDesc.getFramework().toString()));

        byte[] confPrefix = Bytes.toBytes("c!");
        // Configuration iterates as Map.Entry<String, String>, so no raw iterator or casts
        // are needed.
        for (Map.Entry<String, String> property : configuration) {
            jobPut.addColumn(
                Constants.INFO_FAM_BYTES,
                Bytes.add(confPrefix, Bytes.toBytes(property.getKey())),
                Bytes.toBytes(property.getValue()));
        }
        setHravenQueueNamePut(configuration, jobPut, jobKey, confPrefix);

        puts.add(jobPut);
        return puts;
    }

    /**
     * Deletes a job's history row and all of its task rows.
     *
     * <p>The history row is deleted first. The task table is then scanned for the job's
     * tasks, fetching only the task-id column with block caching disabled. Collection stops
     * with a warning at the first row whose task key does not equal the job key.
     *
     * @param jobKey job to remove
     * @return 1 (for the history row) plus the number of task rows collected for deletion
     * @throws IOException if any HBase access fails
     */
    public int removeJob(JobKey jobKey) throws IOException {
        byte[] jobRow = this.jobKeyConv.toBytes(jobKey);
        Table historyTable = this.hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TABLE));
        try {
            historyTable.delete(new Delete(jobRow));
        } finally {
            // Release the table even when the delete fails.
            historyTable.close();
        }

        Scan taskScan = getTaskScan(jobKey);
        taskScan.addColumn(Constants.INFO_FAM_BYTES, JobHistoryKeys.KEYS_TO_BYTES.get(JobHistoryKeys.TASKID));
        taskScan.setCacheBlocks(false);

        List<Delete> taskDeletes = new ArrayList<Delete>();
        Table taskTable = this.hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TASK_TABLE));
        try {
            ResultScanner scanner = taskTable.getScanner(taskScan);
            try {
                for (Result row : scanner) {
                    if (row == null || row.isEmpty()) {
                        continue;
                    }
                    byte[] taskRow = row.getRow();
                    if (!jobKey.equals(this.taskKeyConv.fromBytes(taskRow))) {
                        LOG.warn("Found task not in the current job " + Bytes.toStringBinary(taskRow));
                        break;
                    }
                    taskDeletes.add(new Delete(taskRow));
                }
            } finally {
                scanner.close();
            }

            // Count before deleting: HBase's batch delete may modify the list it is given
            // (confirm against the Table#delete(List) documentation for the version in use).
            int removed = 1 + taskDeletes.size();
            if (!taskDeletes.isEmpty()) {
                LOG.info("Deleting " + taskDeletes.size() + " tasks for job " + jobKey);
                taskTable.delete(taskDeletes);
            }
            return removed;
        } finally {
            // Release the table even if opening or closing the scanner fails.
            taskTable.close();
        }
    }
}
