package io.druid.segment.data;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/druid/segment/data/IncrementalIndexTest.class */
public class IncrementalIndexTest {
    private final IndexCreator indexCreator;
    private static final AggregatorFactory[] defaultAggregatorFactories = {new CountAggregatorFactory("count")};

    /* loaded from: input_file:io/druid/segment/data/IncrementalIndexTest$IndexCreator.class */
    interface IndexCreator {
        IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactoryArr);
    }

    public IncrementalIndexTest(IndexCreator indexCreator) {
        this.indexCreator = indexCreator;
    }

    @Parameterized.Parameters
    public static Collection<?> constructorFeeder() throws IOException {
        return Arrays.asList(new Object[]{new IndexCreator() { // from class: io.druid.segment.data.IncrementalIndexTest.1
            @Override // io.druid.segment.data.IncrementalIndexTest.IndexCreator
            public IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactoryArr) {
                return IncrementalIndexTest.createIndex(true, aggregatorFactoryArr);
            }
        }}, new Object[]{new IndexCreator() { // from class: io.druid.segment.data.IncrementalIndexTest.2
            @Override // io.druid.segment.data.IncrementalIndexTest.IndexCreator
            public IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactoryArr) {
                return IncrementalIndexTest.createIndex(false, aggregatorFactoryArr);
            }
        }});
    }

    public static IncrementalIndex createIndex(boolean z, AggregatorFactory[] aggregatorFactoryArr) {
        if (null == aggregatorFactoryArr) {
            aggregatorFactoryArr = defaultAggregatorFactories;
        }
        return z ? new OffheapIncrementalIndex(0L, QueryGranularity.NONE, aggregatorFactoryArr, TestQueryRunners.pool, true, 104857600) : new OnheapIncrementalIndex(0L, QueryGranularity.NONE, aggregatorFactoryArr, 1000000);
    }

    public static void populateIndex(long j, IncrementalIndex incrementalIndex) throws IndexSizeExceededException {
        incrementalIndex.add(new MapBasedInputRow(j, Arrays.asList("dim1", "dim2"), ImmutableMap.of("dim1", "1", "dim2", "2")));
        incrementalIndex.add(new MapBasedInputRow(j, Arrays.asList("dim1", "dim2"), ImmutableMap.of("dim1", "3", "dim2", "4")));
    }

    public static MapBasedInputRow getRow(long j, int i, int i2) {
        ArrayList arrayList = new ArrayList(i2);
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (int i3 = 0; i3 < i2; i3++) {
            String format = String.format("Dim_%d", Integer.valueOf(i3));
            arrayList.add(format);
            builder.put(format, format + i);
        }
        return new MapBasedInputRow(j, arrayList, builder.build());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static MapBasedInputRow getLongRow(long j, int i, int i2) {
        ArrayList arrayList = new ArrayList(i2);
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (int i3 = 0; i3 < i2; i3++) {
            String format = String.format("Dim_%d", Integer.valueOf(i3));
            arrayList.add(format);
            builder.put(format, 1L);
        }
        return new MapBasedInputRow(j, arrayList, builder.build());
    }

    @Test
    public void testCaseSensitivity() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        IncrementalIndex createIndex = this.indexCreator.createIndex(defaultAggregatorFactories);
        populateIndex(currentTimeMillis, createIndex);
        Assert.assertEquals(Arrays.asList("dim1", "dim2"), createIndex.getDimensions());
        Assert.assertEquals(2L, createIndex.size());
        Iterator it = createIndex.iterator();
        Row row = (Row) it.next();
        Assert.assertEquals(currentTimeMillis, row.getTimestampFromEpoch());
        Assert.assertEquals(Arrays.asList("1"), row.getDimension("dim1"));
        Assert.assertEquals(Arrays.asList("2"), row.getDimension("dim2"));
        Row row2 = (Row) it.next();
        Assert.assertEquals(currentTimeMillis, row2.getTimestampFromEpoch());
        Assert.assertEquals(Arrays.asList("3"), row2.getDimension("dim1"));
        Assert.assertEquals(Arrays.asList("4"), row2.getDimension("dim2"));
    }

    @Test(timeout = 60000)
    public void testConcurrentAddRead() throws InterruptedException, ExecutionException {
        ArrayList arrayList = new ArrayList(6);
        arrayList.add(new CountAggregatorFactory("rows"));
        for (int i = 0; i < 5; i++) {
            arrayList.add(new LongSumAggregatorFactory(String.format("sumResult%s", Integer.valueOf(i)), String.format("Dim_%s", Integer.valueOf(i))));
            arrayList.add(new DoubleSumAggregatorFactory(String.format("doubleSumResult%s", Integer.valueOf(i)), String.format("Dim_%s", Integer.valueOf(i))));
        }
        final ArrayList arrayList2 = new ArrayList(6);
        arrayList2.add(new CountAggregatorFactory("rows"));
        for (int i2 = 0; i2 < 5; i2++) {
            arrayList2.add(new LongSumAggregatorFactory(String.format("sumResult%s", Integer.valueOf(i2)), String.format("sumResult%s", Integer.valueOf(i2))));
            arrayList2.add(new DoubleSumAggregatorFactory(String.format("doubleSumResult%s", Integer.valueOf(i2)), String.format("doubleSumResult%s", Integer.valueOf(i2))));
        }
        final IncrementalIndex createIndex = this.indexCreator.createIndex((AggregatorFactory[]) arrayList.toArray(new AggregatorFactory[5]));
        ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setDaemon(false).setNameFormat("index-executor-%d").setPriority(1).build()));
        ListeningExecutorService listeningDecorator2 = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setDaemon(false).setNameFormat("query-executor-%d").build()));
        final long currentTimeMillis = System.currentTimeMillis();
        final Interval interval = new Interval("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        final IncrementalIndexSegment incrementalIndexSegment = new IncrementalIndexSegment(createIndex, (String) null);
        final TimeseriesQueryRunnerFactory timeseriesQueryRunnerFactory = new TimeseriesQueryRunnerFactory(new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        for (int i3 = 0; i3 < 30; i3++) {
            linkedList.add(listeningDecorator.submit(new Runnable() { // from class: io.druid.segment.data.IncrementalIndexTest.3
                @Override // java.lang.Runnable
                public void run() {
                    atomicInteger.incrementAndGet();
                    for (int i4 = 0; i4 < 100; i4++) {
                        try {
                            atomicInteger2.incrementAndGet();
                            createIndex.add(IncrementalIndexTest.getLongRow(currentTimeMillis + i4, i4, 5));
                        } catch (IndexSizeExceededException e) {
                            throw Throwables.propagate(e);
                        }
                    }
                    atomicInteger.decrementAndGet();
                }
            }));
            linkedList2.add(listeningDecorator2.submit(new Runnable() { // from class: io.druid.segment.data.IncrementalIndexTest.4
                @Override // java.lang.Runnable
                public void run() {
                    Iterator it = ((LinkedList) Sequences.toList(new FinalizeResultsQueryRunner(timeseriesQueryRunnerFactory.createRunner(incrementalIndexSegment), timeseriesQueryRunnerFactory.getToolchest()).run(Druids.newTimeseriesQueryBuilder().dataSource("xxx").granularity(QueryGranularity.ALL).intervals(ImmutableList.of(interval)).aggregators(arrayList2).build(), new HashMap()), new LinkedList())).iterator();
                    while (it.hasNext()) {
                        Result result = (Result) it.next();
                        Integer valueOf = Integer.valueOf(atomicInteger2.get());
                        if (valueOf.intValue() > 0) {
                            Double doubleMetric = ((TimeseriesResultValue) result.getValue()).getDoubleMetric("doubleSumResult0");
                            Assert.assertTrue(String.format("%d >= %g >= 0 violated", valueOf, doubleMetric), doubleMetric.doubleValue() >= 0.0d && doubleMetric.doubleValue() <= ((double) valueOf.intValue()));
                        }
                    }
                    if (atomicInteger.get() > 0) {
                        atomicBoolean.set(true);
                    }
                }
            }));
        }
        ArrayList arrayList3 = new ArrayList(linkedList2.size() + linkedList.size());
        arrayList3.addAll(linkedList2);
        arrayList3.addAll(linkedList);
        Futures.allAsList(arrayList3).get();
        Assert.assertTrue("Did not hit concurrency, please try again", atomicBoolean.get());
        listeningDecorator2.shutdown();
        listeningDecorator.shutdown();
        for (Result result : Sequences.toList(new FinalizeResultsQueryRunner(timeseriesQueryRunnerFactory.createRunner(incrementalIndexSegment), timeseriesQueryRunnerFactory.getToolchest()).run(Druids.newTimeseriesQueryBuilder().dataSource("xxx").granularity(QueryGranularity.ALL).intervals(ImmutableList.of(interval)).aggregators(arrayList2).build(), new HashMap()), new LinkedList())) {
            Assert.assertEquals(100L, ((TimeseriesResultValue) result.getValue()).getLongMetric("rows").intValue());
            for (int i4 = 0; i4 < 5; i4++) {
                Assert.assertEquals(String.format("Failed long sum on dimension %d", Integer.valueOf(i4)), 3000L, ((TimeseriesResultValue) result.getValue()).getLongMetric(String.format("sumResult%s", Integer.valueOf(i4))).intValue());
                Assert.assertEquals(String.format("Failed double sum on dimension %d", Integer.valueOf(i4)), 3000L, ((TimeseriesResultValue) result.getValue()).getDoubleMetric(String.format("doubleSumResult%s", Integer.valueOf(i4))).intValue());
            }
        }
    }

    @Test
    public void testConcurrentAdd() throws Exception {
        final IncrementalIndex createIndex = this.indexCreator.createIndex(defaultAggregatorFactories);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        final long currentTimeMillis = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            newFixedThreadPool.submit(new Runnable() { // from class: io.druid.segment.data.IncrementalIndexTest.5
                @Override // java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 200; i2++) {
                        try {
                            createIndex.add(IncrementalIndexTest.getRow(currentTimeMillis + i2, i2, 5));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    countDownLatch.countDown();
                }
            });
        }
        Assert.assertTrue(countDownLatch.await(60L, TimeUnit.SECONDS));
        Assert.assertEquals(5L, createIndex.getDimensions().size());
        Assert.assertEquals(200L, createIndex.size());
        Iterator it = createIndex.iterator();
        int i2 = 0;
        while (it.hasNext()) {
            Row row = (Row) it.next();
            Assert.assertEquals(currentTimeMillis + i2, row.getTimestampFromEpoch());
            Assert.assertEquals(Float.valueOf(10.0f), Float.valueOf(row.getFloatMetric("count")));
            i2++;
        }
        Assert.assertEquals(200L, i2);
    }

    @Test
    public void testOffheapIndexIsFull() throws IndexSizeExceededException {
        OffheapIncrementalIndex offheapIncrementalIndex = new OffheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, TestQueryRunners.pool, true, 12582912);
        int i = 0;
        for (int i2 = 0; i2 < 500; i2++) {
            i = offheapIncrementalIndex.add(getRow(System.currentTimeMillis(), i2, 100));
            if (!offheapIncrementalIndex.canAppendRow()) {
                break;
            }
        }
        Assert.assertTrue("rowCount : " + i, i > 200 && i < 600);
    }
}
