package com.twitter.hraven.datasource;

import com.twitter.hraven.Flow;
import com.twitter.hraven.FlowKey;
import com.twitter.hraven.FlowQueueKey;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:com/twitter/hraven/datasource/TestFlowQueueService.class */
public class TestFlowQueueService {
    private static Log LOG = LogFactory.getLog(TestFlowQueueService.class);
    private static HBaseTestingUtility UTIL;
    private static final String TEST_CLUSTER = "test@test";
    private static final String TEST_USER = "testuser";

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        UTIL = new HBaseTestingUtility();
        UTIL.startMiniCluster();
        HRavenTestUtil.createFlowQueueTable(UTIL);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        UTIL.shutdownMiniCluster();
    }

    @Test
    public void testFlowQueueReadWrite() throws Exception {
        FlowQueueService flowQueueService = new FlowQueueService(UTIL.getConfiguration());
        FlowQueueKey flowQueueKey = new FlowQueueKey(TEST_CLUSTER, Flow.Status.RUNNING, System.currentTimeMillis(), "flow1");
        Flow flow = new Flow((FlowKey) null);
        flow.setJobGraphJSON("{}");
        flow.setFlowName("flow1");
        flow.setUserName(TEST_USER);
        flow.setProgress(10);
        flowQueueService.updateFlow(flowQueueKey, flow);
        FlowQueueKey flowQueueKey2 = new FlowQueueKey(TEST_CLUSTER, Flow.Status.RUNNING, System.currentTimeMillis(), "flow2");
        Flow flow2 = new Flow((FlowKey) null);
        flow2.setJobGraphJSON("{}");
        flow2.setFlowName("flow2");
        flow2.setUserName(TEST_USER);
        flow2.setProgress(20);
        flowQueueService.updateFlow(flowQueueKey2, flow2);
        Flow flowFromQueue = flowQueueService.getFlowFromQueue(flowQueueKey.getCluster(), flowQueueKey.getTimestamp(), flowQueueKey.getFlowId());
        Assert.assertNotNull(flowFromQueue);
        assertFlowEquals(flowQueueKey, flow, flowFromQueue);
        List flowsForStatus = flowQueueService.getFlowsForStatus(TEST_CLUSTER, Flow.Status.RUNNING, 10);
        Assert.assertNotNull(flowsForStatus);
        Assert.assertEquals(2L, flowsForStatus.size());
        assertFlowEquals(flowQueueKey, flow, (Flow) flowsForStatus.get(1));
        assertFlowEquals(flowQueueKey2, flow2, (Flow) flowsForStatus.get(0));
        FlowQueueKey flowQueueKey3 = new FlowQueueKey(flowQueueKey.getCluster(), Flow.Status.SUCCEEDED, flowQueueKey.getTimestamp(), flowQueueKey.getFlowId());
        flowQueueService.moveFlow(flowQueueKey, flowQueueKey3);
        FlowQueueKey flowQueueKey4 = new FlowQueueKey(flowQueueKey2.getCluster(), Flow.Status.SUCCEEDED, flowQueueKey2.getTimestamp(), flowQueueKey2.getFlowId());
        flowQueueService.moveFlow(flowQueueKey2, flowQueueKey4);
        List flowsForStatus2 = flowQueueService.getFlowsForStatus(TEST_CLUSTER, Flow.Status.SUCCEEDED, 10);
        Assert.assertNotNull(flowsForStatus2);
        Assert.assertEquals(2L, flowsForStatus2.size());
        assertFlowEquals(flowQueueKey3, flow, (Flow) flowsForStatus2.get(1));
        assertFlowEquals(flowQueueKey4, flow2, (Flow) flowsForStatus2.get(0));
    }

    protected void assertFlowEquals(FlowQueueKey flowQueueKey, Flow flow, Flow flow2) {
        Assert.assertNotNull(flow2.getQueueKey());
        LOG.info("Expected queue key is " + flowQueueKey);
        LOG.info("Result queue key is " + flow2.getQueueKey());
        Assert.assertTrue(flowQueueKey.equals(flow2.getQueueKey()));
        Assert.assertEquals(flow.getJobGraphJSON(), flow2.getJobGraphJSON());
        Assert.assertEquals(flow.getFlowName(), flow2.getFlowName());
        Assert.assertEquals(flow.getUserName(), flow2.getUserName());
        Assert.assertEquals(flow.getProgress(), flow2.getProgress());
    }
}
