package org.pentaho.di.trans.streaming.common;

import io.reactivex.Flowable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.SubtransExecutor;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/pentaho/di/trans/streaming/common/FixedTimeStreamWindowTest.class */
public class FixedTimeStreamWindowTest {

    @Mock
    private SubtransExecutor subtransExecutor;

    /* loaded from: input_file:org/pentaho/di/trans/streaming/common/FixedTimeStreamWindowTest$BufferThread.class */
    private class BufferThread implements Runnable {
        private FixedTimeStreamWindow window;
        private Flowable flowable;
        private Result mockResult;

        public BufferThread(FixedTimeStreamWindow fixedTimeStreamWindow, Flowable flowable, Result result) {
            this.window = fixedTimeStreamWindow;
            this.flowable = flowable;
            this.mockResult = result;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.window.buffer(this.flowable).forEach(obj -> {
                Assert.assertEquals(this.mockResult, obj);
            });
        }
    }

    @Test
    public void emptyResultShouldNotThrowException() throws KettleException {
        Mockito.when(this.subtransExecutor.execute((List) Matchers.any())).thenReturn(Optional.empty());
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta(new ValueMetaString("field"));
        new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 2, 1).buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result -> {
        });
    }

    @Test
    public void resultsComeBackToParent() throws KettleException {
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta(new ValueMetaString("field"));
        Result result = new Result();
        result.setRows(Arrays.asList(new RowMetaAndData(rowMeta, new Object[]{"queen"}), new RowMetaAndData(rowMeta, new Object[]{"king"})));
        Mockito.when(this.subtransExecutor.execute((List) Matchers.any())).thenReturn(Optional.of(result));
        new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 2, 1).buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result2 -> {
            Assert.assertEquals(result, result2);
        });
    }

    @Test
    public void abortedSubtransThrowsAnError() throws KettleException {
        Result result = new Result();
        result.setNrErrors(1L);
        Mockito.when(this.subtransExecutor.execute((List) Matchers.any())).thenReturn(Optional.of(result));
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta(new ValueMetaString("field"));
        try {
            new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 2, 1).buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result2 -> {
            });
        } catch (Exception e) {
            Assert.assertEquals(BaseMessages.getString(BaseStreamStep.class, "FixedTimeStreamWindow.SubtransFailed", new String[0]), e.getCause().getMessage().trim());
        }
    }

    @Test
    public void testSharedStreamingBatchPoolInternalState() throws Exception {
        System.setProperty("SHARED_STREAMING_BATCH_POOL_SIZE", "5");
        FixedTimeStreamWindow fixedTimeStreamWindow = new FixedTimeStreamWindow(this.subtransExecutor, new RowMeta(), 0L, 2, 1);
        Field declaredField = fixedTimeStreamWindow.getClass().getDeclaredField("sharedStreamingBatchPool");
        declaredField.setAccessible(true);
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) declaredField.get(fixedTimeStreamWindow);
        Assert.assertTrue(threadPoolExecutor.getCorePoolSize() == 5);
        System.setProperty("SHARED_STREAMING_BATCH_POOL_SIZE", "10");
        FixedTimeStreamWindow fixedTimeStreamWindow2 = new FixedTimeStreamWindow(this.subtransExecutor, new RowMeta(), 0L, 2, 1);
        Field declaredField2 = fixedTimeStreamWindow2.getClass().getDeclaredField("sharedStreamingBatchPool");
        declaredField2.setAccessible(true);
        ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) declaredField2.get(fixedTimeStreamWindow2);
        Assert.assertTrue(threadPoolExecutor2.getCorePoolSize() == 10);
        Assert.assertEquals(threadPoolExecutor, threadPoolExecutor2);
    }

    @Test
    public void testSharedStreamingBatchPoolExecution() throws Exception {
        ArrayList arrayList = new ArrayList();
        System.setProperty("SHARED_STREAMING_BATCH_POOL_SIZE", "1");
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta(new ValueMetaString("field"));
        Result result = new Result();
        result.setRows(Arrays.asList(new RowMetaAndData(rowMeta, new Object[]{"queen"}), new RowMetaAndData(rowMeta, new Object[]{"king"})));
        FixedTimeStreamWindow fixedTimeStreamWindow = new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 10, 10);
        FixedTimeStreamWindow fixedTimeStreamWindow2 = new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 10, 10);
        Flowable fromIterable = Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")));
        Field declaredField = fixedTimeStreamWindow.getClass().getDeclaredField("sharedStreamingBatchPool");
        declaredField.setAccessible(true);
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) declaredField.get(fixedTimeStreamWindow);
        Mockito.when(this.subtransExecutor.execute((List) Matchers.any())).thenAnswer(invocationOnMock -> {
            if (threadPoolExecutor.getActiveCount() != 1) {
                arrayList.add("Error: Active count should have been 1 at all times. Current value: " + threadPoolExecutor.getActiveCount());
            }
            return Optional.of(result);
        });
        new Thread(new BufferThread(fixedTimeStreamWindow, fromIterable, result)).start();
        new Thread(new BufferThread(fixedTimeStreamWindow2, fromIterable, result)).start();
        Thread.sleep(10000L);
        Assert.assertEquals(0L, arrayList.size());
    }

    @Test
    public void supportsPostProcessing() throws KettleException {
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta(new ValueMetaString("field"));
        Result result = new Result();
        result.setRows(Arrays.asList(new RowMetaAndData(rowMeta, new Object[]{"queen"}), new RowMetaAndData(rowMeta, new Object[]{"king"})));
        Mockito.when(this.subtransExecutor.execute((List) Matchers.any())).thenReturn(Optional.of(result));
        AtomicInteger atomicInteger = new AtomicInteger();
        new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 2, 1, entry -> {
            atomicInteger.set(((List) ((List) entry.getKey()).get(0)).size());
        }).buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result2 -> {
            Assert.assertEquals(result, result2);
        });
        Assert.assertEquals(2L, atomicInteger.get());
    }

    @Test
    public void emptyResultsNotPostProcessed() throws KettleException {
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta(new ValueMetaString("field"));
        Result result = new Result();
        result.setRows(Arrays.asList(new RowMetaAndData(rowMeta, new Object[]{"queen"}), new RowMetaAndData(rowMeta, new Object[]{"king"})));
        Mockito.when(this.subtransExecutor.execute((List) Matchers.any())).thenReturn(Optional.empty());
        AtomicInteger atomicInteger = new AtomicInteger();
        new FixedTimeStreamWindow(this.subtransExecutor, rowMeta, 0L, 2, 1, entry -> {
            atomicInteger.set(((List) ((List) entry.getKey()).get(0)).size());
        }).buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result2 -> {
            Assert.assertEquals(result, result2);
        });
        Assert.assertEquals(0L, atomicInteger.get());
    }
}
