/*
 * Decompiled with CFR 0.152.
 */
package org.marketcetera.modules.cep.system;

import java.awt.BorderLayout;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.marketcetera.core.ExpectedTestFailure;
import org.marketcetera.core.notifications.Notification;
import org.marketcetera.event.AskEvent;
import org.marketcetera.event.BidEvent;
import org.marketcetera.event.Event;
import org.marketcetera.event.EventTestBase;
import org.marketcetera.event.LogEvent;
import org.marketcetera.event.MarketDataEvent;
import org.marketcetera.event.MarketstatEvent;
import org.marketcetera.event.TradeEvent;
import org.marketcetera.event.impl.LogEventBuilder;
import org.marketcetera.module.BlockingSinkDataListener;
import org.marketcetera.module.CopierModuleFactory;
import org.marketcetera.module.DataFlowID;
import org.marketcetera.module.DataFlowNotFoundException;
import org.marketcetera.module.DataRequest;
import org.marketcetera.module.IllegalRequestParameterValue;
import org.marketcetera.module.ModuleManager;
import org.marketcetera.module.ModuleNotFoundException;
import org.marketcetera.module.ModuleTestBase;
import org.marketcetera.module.ModuleURN;
import org.marketcetera.module.SinkDataListener;
import org.marketcetera.module.UnsupportedRequestParameterType;
import org.marketcetera.modules.cep.system.Messages;
import org.marketcetera.quickfix.CurrentFIXDataDictionary;
import org.marketcetera.quickfix.FIXDataDictionary;
import org.marketcetera.quickfix.FIXMessageFactory;
import org.marketcetera.quickfix.FIXMessageUtilTest;
import org.marketcetera.quickfix.FIXVersion;
import org.marketcetera.trade.BrokerID;
import org.marketcetera.trade.Equity;
import org.marketcetera.trade.ExecutionReport;
import org.marketcetera.trade.FIXOrder;
import org.marketcetera.trade.Factory;
import org.marketcetera.trade.Instrument;
import org.marketcetera.trade.OrderCancel;
import org.marketcetera.trade.OrderCancelReject;
import org.marketcetera.trade.OrderReplace;
import org.marketcetera.trade.OrderSingle;
import org.marketcetera.trade.Originator;
import org.marketcetera.trade.Suggestion;
import quickfix.Message;
import quickfix.StringField;
import quickfix.field.Symbol;
import quickfix.field.Text;

public abstract class CEPTestBase
extends ModuleTestBase {
    protected FIXDataDictionary fixDD;
    protected ModuleManager sManager;
    protected static BlockingSinkDataListener sSink;
    protected static Factory sFactory;
    protected List<Object> allSentEvents;
    protected BidEvent bid1;
    protected BidEvent bid2;
    protected AskEvent ask1;
    protected AskEvent ask2;
    protected TradeEvent trade1;
    protected TradeEvent trade2;
    protected LogEvent log1;
    protected LogEvent log2;
    protected MarketstatEvent mStat1;
    protected MarketstatEvent mStat2;
    protected Suggestion sug1;
    protected Suggestion sug2;
    protected Notification not1;
    protected Notification not2;
    protected OrderSingle os1;
    protected OrderSingle os2;
    protected OrderCancel oc1;
    protected OrderCancel oc2;
    protected OrderReplace or1;
    protected OrderReplace or2;
    protected OrderCancelReject ocr1;
    protected OrderCancelReject ocr2;
    protected FIXOrder fo1;
    protected FIXOrder fo2;
    protected ExecutionReport er1;
    protected ExecutionReport er2;
    protected Map<Integer, String> map1;
    protected Map<Integer, String> map2;

    protected abstract ModuleURN getModuleURN();

    @Before
    public void before() throws Exception {
        sSink = new BlockingSinkDataListener();
        this.sManager = new ModuleManager();
        this.sManager.init();
        this.sManager.addSinkListener((SinkDataListener)sSink);
        CurrentFIXDataDictionary.setCurrentFIXDataDictionary((FIXDataDictionary)new FIXDataDictionary(FIXVersion.FIX_SYSTEM.getDataDictionaryName()));
        this.ask1 = EventTestBase.generateEquityAskEvent((long)1L, (long)2L, (Equity)new Equity("ABC"), (String)"nyse", (BigDecimal)new BigDecimal("23"), (BigDecimal)new BigDecimal("23"));
        this.ask2 = EventTestBase.generateEquityAskEvent((long)1L, (long)2L, (Equity)new Equity("BIDU"), (String)"nyse", (BigDecimal)new BigDecimal("23"), (BigDecimal)new BigDecimal("23"));
        this.bid1 = EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("CSCO"), (String)"nyse", (BigDecimal)new BigDecimal("23"), (BigDecimal)new BigDecimal("23"));
        this.bid2 = EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("DELL"), (String)"nyse", (BigDecimal)new BigDecimal("23"), (BigDecimal)new BigDecimal("23"));
        this.trade1 = EventTestBase.generateEquityTradeEvent((long)1L, (long)2L, (Equity)new Equity("ECHO"), (String)"nyse", (BigDecimal)new BigDecimal("23"), (BigDecimal)new BigDecimal("23"));
        this.trade2 = EventTestBase.generateEquityTradeEvent((long)1L, (long)2L, (Equity)new Equity("FIGA"), (String)"nyse", (BigDecimal)new BigDecimal("23"), (BigDecimal)new BigDecimal("23"));
        this.log1 = (LogEvent)LogEventBuilder.debug().withMessage(Messages.PROVIDER_DESCRIPTION).create();
        this.log2 = (LogEvent)LogEventBuilder.error().withMessage(Messages.PROVIDER_DESCRIPTION).create();
        this.mStat1 = EventTestBase.generateEquityMarketstatEvent((Equity)new Equity("ABC"), (Date)new Date(), (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (Date)new Date(), (Date)new Date(), (Date)new Date(), (Date)new Date(), (String)"OYSE", (String)"HYSE", (String)"LYSE", (String)"CYSE");
        this.mStat2 = EventTestBase.generateEquityMarketstatEvent((Equity)new Equity("BIDU"), (Date)new Date(), (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (BigDecimal)BigDecimal.ONE, (Date)new Date(), (Date)new Date(), (Date)new Date(), (Date)new Date(), (String)"OYSE", (String)"HYSE", (String)"LYSE", (String)"CYSE");
        this.sug1 = Factory.getInstance().createOrderSingleSuggestion();
        this.sug1.setIdentifier("acura");
        this.sug2 = Factory.getInstance().createOrderSingleSuggestion();
        this.sug2.setIdentifier("integra");
        this.not1 = Notification.debug((String)"kathmandu", (String)"kathmandu", (String)((Object)((Object)this)).toString());
        this.not2 = Notification.debug((String)"pokhara", (String)"pokhara", (String)((Object)((Object)this)).toString());
        this.os1 = sFactory.createOrderSingle();
        this.os1.setBrokerID(new BrokerID("os1"));
        this.os2 = sFactory.createOrderSingle();
        this.os2.setBrokerID(new BrokerID("os2"));
        Message nos = FIXMessageUtilTest.createNOS((String)"LADA", (BigDecimal)BigDecimal.ZERO, (BigDecimal)BigDecimal.ZERO, (char)'a', (FIXMessageFactory)FIXVersion.FIX_SYSTEM.getMessageFactory());
        Message can1 = FIXVersion.FIX_SYSTEM.getMessageFactory().newCancelFromMessage(nos);
        this.oc1 = sFactory.createOrderCancel(can1, new BrokerID("dest1"));
        Message can2 = FIXVersion.FIX_SYSTEM.getMessageFactory().newCancelFromMessage(nos);
        can2.setField((StringField)new Symbol("ZAPO"));
        this.oc2 = sFactory.createOrderCancel(can2, new BrokerID("dest2"));
        Message cxr1 = FIXVersion.FIX_SYSTEM.getMessageFactory().newCancelReplaceFromMessage(nos);
        this.or1 = sFactory.createOrderReplace(cxr1, new BrokerID("lada"));
        Message cxr2 = FIXVersion.FIX_SYSTEM.getMessageFactory().newCancelReplaceFromMessage(nos);
        cxr2.setField((StringField)new Symbol("ZAPO"));
        this.or2 = sFactory.createOrderReplace(cxr2, new BrokerID("zapo"));
        nos = FIXMessageUtilTest.createNOS((String)"fixORDER", (BigDecimal)BigDecimal.ZERO, (BigDecimal)BigDecimal.ZERO, (char)'a', (FIXMessageFactory)FIXVersion.FIX_SYSTEM.getMessageFactory());
        this.fo1 = Factory.getInstance().createOrder(nos, new BrokerID("chuck"));
        this.fo2 = Factory.getInstance().createOrder(nos, new BrokerID("morgan"));
        Message rej1 = FIXVersion.FIX42.getMessageFactory().newOrderCancelReject();
        rej1.setField((StringField)new Text("GOOG"));
        this.ocr1 = sFactory.createOrderCancelReject(rej1, new BrokerID("dest"), Originator.Server, null, null);
        Message rej2 = FIXVersion.FIX42.getMessageFactory().newOrderCancelReject();
        rej2.setField((StringField)new Text("CSCO"));
        this.ocr2 = sFactory.createOrderCancelReject(rej2, new BrokerID("dest"), Originator.Server, null, null);
        Message er1_fix = FIXVersion.FIX42.getMessageFactory().newExecutionReport("orderid", "clOrdID", "execID", 'a', 'b', BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, (Instrument)new Equity("IFLI"), "acct", "text");
        this.er1 = sFactory.createExecutionReport(er1_fix, new BrokerID("dest1"), Originator.Server, null, null);
        Message er2_fix = FIXVersion.FIX42.getMessageFactory().newExecutionReport("orderid", "clOrdID", "execID", 'a', 'b', BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, (Instrument)new Equity("GOOG"), "acct", "text");
        this.er2 = sFactory.createExecutionReport(er2_fix, new BrokerID("dest2"), Originator.Server, null, null);
        this.map1 = new HashMap<Integer, String>();
        this.map1.put(0, "bob");
        this.map1.put(1, "bubba");
        this.map2 = new HashMap<Integer, String>();
        this.map2.put(3, "fred");
        this.map2.put(4, "fedya");
        this.allSentEvents = Arrays.asList(this.ask1, this.ask2, this.bid1, this.bid2, this.trade1, this.trade2, this.sug1, this.sug2, this.not1, this.not2, this.os1, 37, this.os2, this.oc1, this.oc2, this.or1, this.or2, this.ocr1, this.ocr2, this.fo1, this.fo2, this.er1, 42, this.er2, "pupkin", this.map1, this.map2, this.log1, this.log2, this.mStat1, this.mStat2);
    }

    @After
    public void after() throws Exception {
        this.sManager.removeSinkListener((SinkDataListener)sSink);
        this.sManager.stop();
    }

    @Test
    public void testInvalidDataRequestArgument() throws Exception {
        new ExpectedTestFailure(IllegalRequestParameterValue.class, org.marketcetera.module.Messages.ILLEGAL_REQ_PARM_VALUE.getText((Object)this.getModuleURN(), null)){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CEPTestBase.this.getModuleURN(), null)});
            }
        }.run();
    }

    @Test(timeout=120000L)
    public void testNonStringRequestParameter() throws Exception {
        new ExpectedTestFailure(UnsupportedRequestParameterType.class, org.marketcetera.module.Messages.UNSUPPORTED_REQ_PARM_TYPE.getText((Object)this.getModuleURN(), (Object)Integer.class.getName())){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Event[]{EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200"))}), new DataRequest(CEPTestBase.this.getModuleURN(), (Object)37)});
            }
        }.run();
    }

    protected abstract Class<?> getIncorrectQueryException();

    @Test(timeout=120000L)
    public void testIncorrectQuerySyntax() throws Exception {
        String query = "man, is this syntax incorrect or what??";
        new ExpectedTestFailure(this.getIncorrectQueryException()){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Event[]{EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200"))}), new DataRequest(CEPTestBase.this.getModuleURN(), (Object)"man, is this syntax incorrect or what??")});
            }
        }.run();
    }

    public abstract void testUnknownAlias() throws Exception;

    @Test(timeout=120000L)
    public void testValidJavaClass() throws Exception {
        DataFlowID flow1 = this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Event[]{EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200")), EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("100")), EventTestBase.generateEquityAskEvent((long)5L, (long)6L, (Equity)new Equity("JAVA"), (String)"NASDAQ", (BigDecimal)new BigDecimal("1.23"), (BigDecimal)new BigDecimal("300"))}), new DataRequest(this.getModuleURN(), (Object)("select * from " + BorderLayout.class.getName()))});
        this.sManager.cancel(flow1);
    }

    @Test(timeout=120000L)
    public void testUnmappedJavaObject() throws Exception {
        DataFlowID flow1 = this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Object[]{EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200")), EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("100")), 37, EventTestBase.generateEquityAskEvent((long)5L, (long)6L, (Equity)new Equity("JAVA"), (String)"NASDAQ", (BigDecimal)new BigDecimal("1.23"), (BigDecimal)new BigDecimal("300"))}), new DataRequest(this.getModuleURN(), (Object)("select * from " + Integer.class.getName()))});
        Assert.assertEquals((Object)37, (Object)sSink.getNextData());
        this.sManager.cancel(flow1);
    }

    @Test(timeout=120000L)
    public void testCancel() throws Exception {
        final DataFlowID flow1 = this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Event[]{EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200")), EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("100")), EventTestBase.generateEquityAskEvent((long)5L, (long)6L, (Equity)new Equity("JAVA"), (String)"NASDAQ", (BigDecimal)new BigDecimal("1.23"), (BigDecimal)new BigDecimal("300"))}), new DataRequest(this.getModuleURN(), (Object)("select * from " + BidEvent.class.getName()))});
        BidEvent theBid = (BidEvent)sSink.getNextData();
        Assert.assertEquals((String)"didnt' get bid event", (Object)"IBM", (Object)theBid.getInstrumentAsString());
        Assert.assertEquals((String)"didnt' get right size", (Object)new BigDecimal("85"), (Object)theBid.getPrice());
        Assert.assertEquals((String)"CEP sent out extra events", (long)1L, (long)this.sManager.getDataFlowInfo(flow1).getFlowSteps()[1].getNumEmitted());
        this.sManager.cancel(flow1);
        new ExpectedTestFailure(DataFlowNotFoundException.class, flow1.toString()){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.getDataFlowInfo(flow1);
            }
        }.run();
        final DataFlowID flow2 = this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Event[]{EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("GOOG"), (String)"NYSE", (BigDecimal)new BigDecimal("300"), (BigDecimal)new BigDecimal("100")), EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200")), EventTestBase.generateEquityAskEvent((long)5L, (long)6L, (Equity)new Equity("JAVA"), (String)"NASDAQ", (BigDecimal)new BigDecimal("1.23"), (BigDecimal)new BigDecimal("300"))}), new DataRequest(this.getModuleURN(), (Object)("select * from " + BidEvent.class.getName()))});
        theBid = (BidEvent)sSink.getNextData();
        Assert.assertEquals((String)"didnt' get bid event", (Object)"GOOG", (Object)theBid.getInstrumentAsString());
        Assert.assertEquals((String)"didnt' get right size", (Object)new BigDecimal("300"), (Object)theBid.getPrice());
        Assert.assertEquals((String)"CEP sent out extra events", (long)1L, (long)this.sManager.getDataFlowInfo(flow2).getFlowSteps()[1].getNumEmitted());
        this.sManager.cancel(flow2);
        new ExpectedTestFailure(DataFlowNotFoundException.class, flow2.toString()){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.getDataFlowInfo(flow2);
            }
        }.run();
        TradeEvent tradeEvent = EventTestBase.generateEquityTradeEvent((long)3L, (long)4L, (Equity)new Equity("IBM"), (String)"NYSE", (BigDecimal)new BigDecimal("85"), (BigDecimal)new BigDecimal("200"));
        final DataFlowID flow3 = this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, (Object)new Event[]{EventTestBase.generateEquityBidEvent((long)1L, (long)2L, (Equity)new Equity("ZOOG"), (String)"NYSE", (BigDecimal)new BigDecimal("300"), (BigDecimal)new BigDecimal("100")), EventTestBase.generateEquityAskEvent((long)5L, (long)6L, (Equity)new Equity("JAVA"), (String)"NASDAQ", (BigDecimal)new BigDecimal("1.23"), (BigDecimal)new BigDecimal("300")), tradeEvent}), new DataRequest(this.getModuleURN(), (Object)("select * from " + TradeEvent.class.getName()))});
        TradeEvent theTrade = (TradeEvent)sSink.getNextData();
        Assert.assertSame((String)"wrong event received", (Object)tradeEvent, (Object)theTrade);
        Assert.assertEquals((String)"didnt' get bid event", (Object)"IBM", (Object)theTrade.getInstrumentAsString());
        Assert.assertEquals((String)"didnt' get right size", (Object)new BigDecimal("85"), (Object)theTrade.getPrice());
        Assert.assertEquals((String)"CEP didn't receive all events", (long)3L, (long)this.sManager.getDataFlowInfo(flow3).getFlowSteps()[1].getNumReceived());
        Assert.assertEquals((String)"CEP sent out extra events", (long)1L, (long)this.sManager.getDataFlowInfo(flow3).getFlowSteps()[1].getNumEmitted());
        this.sManager.cancel(flow3);
        new ExpectedTestFailure(DataFlowNotFoundException.class, flow3.toString()){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.getDataFlowInfo(flow3);
            }
        }.run();
    }

    @Test(timeout=120000L)
    public void testAsk() throws Exception {
        this.flowTestHelperWrapper("ask", AskEvent.class.getName(), this.ask1, this.ask2);
    }

    public void testBid() throws Exception {
        this.flowTestHelperWrapper("bid", BidEvent.class.getName(), this.bid1, this.bid2);
    }

    @Test(timeout=120000L)
    public void testTrade() throws Exception {
        this.flowTestHelperWrapper("trade", TradeEvent.class.getName(), this.trade1, this.trade2);
    }

    @Test(timeout=120000L)
    public void testExecutionReport() throws Exception {
        this.flowTestHelperWrapper("report", ExecutionReport.class.getName(), this.er1, this.er2);
    }

    @Test(timeout=120000L)
    public void testOrderCancelReject() throws Exception {
        this.flowTestHelperWrapper("cancelReject", OrderCancelReject.class.getName(), this.ocr1, this.ocr2);
    }

    @Test(timeout=120000L)
    public void testOrderSingle() throws Exception {
        this.flowTestHelperWrapper("orderSingle", OrderSingle.class.getName(), this.os1, this.os2);
    }

    @Test(timeout=120000L)
    public void testOrderCancel() throws Exception {
        this.flowTestHelperWrapper("orderCancel", OrderCancel.class.getName(), this.oc1, this.oc2);
    }

    @Test(timeout=120000L)
    public void testOrderReplace() throws Exception {
        this.flowTestHelperWrapper("orderReplace", OrderReplace.class.getName(), this.or1, this.or2);
    }

    @Test(timeout=120000L)
    public void testFIXOrder() throws Exception {
        this.flowTestHelperWrapper("fixOrder", FIXOrder.class.getName(), this.fo1, this.fo2);
    }

    @Test(timeout=120000L)
    public void testNotification() throws Exception {
        this.flowTestHelperWrapper("notif", Notification.class.getName(), this.not1, this.not2);
    }

    @Test(timeout=120000L)
    public void testSuggestion() throws Exception {
        this.flowTestHelperWrapper("suggest", Suggestion.class.getName(), this.sug1, this.sug2);
    }

    @Test(timeout=120000L)
    public void testMap() throws Exception {
        this.flowTestHelperWrapper("map", Map.class.getName(), this.map1, this.map2);
    }

    @Test(timeout=120000L)
    public void testMarketData() throws Exception {
        this.flowTestHelperWrapper("mdata", MarketDataEvent.class.getName(), this.ask1, this.ask2, this.bid1, this.bid2, this.trade1, this.trade2);
    }

    @Test(timeout=120000L)
    public void testLog() throws Exception {
        this.flowTestHelperWrapper("log", LogEvent.class.getName(), this.log1, this.log2);
    }

    @Test(timeout=120000L)
    public void testMarketstatEvent() throws Exception {
        this.flowTestHelperWrapper("marketstat", MarketstatEvent.class.getName(), this.mStat1, this.mStat2);
    }

    protected void flowTestHelperWrapper(String expectedAlias, String expectedClass, Object ... expectedEvents) throws Exception {
        this.flowTestHelper(expectedAlias, expectedEvents);
        this.flowTestHelper(expectedClass, expectedEvents);
    }

    protected void flowTestHelper(String type, Object[] expectedEvents) throws Exception {
        new ExpectedTestFailure(ModuleNotFoundException.class){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.getModuleInfo(CEPTestBase.this.getModuleURN());
            }
        }.run();
        DataFlowID flowID = this.sManager.createDataFlow(new DataRequest[]{new DataRequest(CopierModuleFactory.INSTANCE_URN, this.allSentEvents), new DataRequest(this.getModuleURN(), (Object)("select * from " + type))});
        for (int i = 0; i < expectedEvents.length; ++i) {
            Object event = sSink.getNextData();
            if (expectedEvents[i] instanceof Map) {
                Object[] eventKeys = ((Map)event).keySet().toArray();
                Arrays.sort(eventKeys);
                Object[] expectedKeys = ((Map)expectedEvents[i]).keySet().toArray();
                Arrays.sort(expectedKeys);
                Assert.assertArrayEquals((String)"keys not equal", (Object[])eventKeys, (Object[])expectedKeys);
                for (Object expectedKey : expectedKeys) {
                    Assert.assertEquals((String)("value for key not the same" + expectedKey), ((Map)expectedEvents[i]).get(expectedKey), ((Map)event).get(expectedKey));
                }
                continue;
            }
            Assert.assertSame((String)("Wrong event received in[" + i + "] " + event), (Object)expectedEvents[i], (Object)event);
        }
        Assert.assertEquals((String)"CEP didn't send out right # of events", (long)expectedEvents.length, (long)this.sManager.getDataFlowInfo(flowID).getFlowSteps()[1].getNumEmitted());
        this.sManager.cancel(flowID);
        new ExpectedTestFailure(ModuleNotFoundException.class){

            protected void execute() throws Throwable {
                CEPTestBase.this.sManager.getModuleInfo(CEPTestBase.this.getModuleURN());
            }
        }.run();
    }
}

