package org.gecko.adapter.eventadmin.tests;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContext;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContextBuilder;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.SimpleMessagingContextBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.context.BundleContextExtension;
import org.osgi.test.junit5.service.ServiceExtension;
import org.osgi.util.pushstream.PushStream;

/**
 * Tests that subscribe to {@value #TOPIC} through a {@link MessagingService}, publish a batch of
 * messages (either directly through {@link EventAdmin} or through the messaging service) and wait
 * on a {@link CountDownLatch} until every message has been consumed.
 *
 * <p>Publishing and consuming happen on other threads than the test thread. An assertion error
 * thrown there would not fail the test by itself, so the first such failure is stored in an
 * {@link AtomicReference} and re-raised on the test thread by
 * {@link #awaitAll(CountDownLatch, long, AtomicReference)}.
 *
 * <p>Every test closes the streams it subscribed and shuts down the executors it created, so that
 * no subscription or thread outlives the test that created it.
 */
@Extensions({
    @ExtendWith({MockitoExtension.class}),
    @ExtendWith({ServiceExtension.class}),
    @ExtendWith({BundleContextExtension.class})
})
public class EventAdminMessagingAdapterTest {

    /** Topic all tests subscribe and publish to. */
    private static final String TOPIC = "test/topic";

    /** Every published payload is this prefix followed by the message index. */
    private static final String PAYLOAD_PREFIX = "test";

    /** Number of messages published by all tests except {@link #basicBackpressure}. */
    private static final int MESSAGE_COUNT = 100;

    @InjectService
    EventAdmin ea;

    /** One publish step; declared so that lambdas may throw the checked exception of {@code publish}. */
    @FunctionalInterface
    private interface Publisher {
        void publish(int index) throws Exception;
    }

    /**
     * Posts events directly via {@link EventAdmin} and expects them on a forked subscription of
     * the messaging service.
     */
    @Test
    public void basicTest(@InjectService ServiceAware<MessagingService> serviceAware) throws Exception {
        Assertions.assertFalse(serviceAware.isEmpty());
        MessagingService messagingService = serviceAware.getService();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
        ExecutorService workers = Executors.newCachedThreadPool();
        try (PushStream<Message> stream = messagingService.subscribe(TOPIC)) {
            logErrors(stream);
            stream.adjustBackPressure((message, backPressure) -> {
                System.out.println("bp: " + backPressure);
                return backPressure.longValue();
            }).fork(5, 0, workers).forEach(message -> guard(failure, () -> consume(message, latch)));
            publishAll(MESSAGE_COUNT,
                    index -> this.ea.postEvent(new Event(TOPIC, Collections.singletonMap("content", payload(index)))),
                    failure);
            awaitAll(latch, 10L, failure);
        } finally {
            // The stream is closed first (try-with-resources), then the fork workers are released.
            workers.shutdown();
        }
    }

    /** Publishes through the messaging service itself and consumes on a forked subscription. */
    @Test
    public void basicTestSendViaMessageAdapter(@InjectService ServiceAware<MessagingService> serviceAware) throws Exception {
        Assertions.assertFalse(serviceAware.isEmpty());
        MessagingService messagingService = serviceAware.getService();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
        ExecutorService workers = Executors.newCachedThreadPool();
        try (PushStream<Message> stream = messagingService.subscribe(TOPIC)) {
            logErrors(stream);
            stream.adjustBackPressure((message, backPressure) -> {
                System.out.println("bp: " + backPressure);
                return backPressure.longValue();
            }).fork(5, 0, workers).forEach(message -> guard(failure, () -> consume(message, latch)));
            publishAll(MESSAGE_COUNT, index -> messagingService.publish(TOPIC, payload(index)), failure);
            awaitAll(latch, 10L, failure);
        } finally {
            workers.shutdown();
        }
    }

    /**
     * Publishes with an {@link EventAdminMessagingContext} carrying a header and a content type
     * and checks that both are visible on the context of every received message.
     */
    @Test
    public void basicTestSendViaMessageAdapterWithContext(@InjectService ServiceAware<MessagingService> serviceAware) throws Exception {
        Assertions.assertFalse(serviceAware.isEmpty());
        MessagingService messagingService = serviceAware.getService();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
        try (PushStream<Message> stream = messagingService.subscribe(TOPIC)) {
            logErrors(stream);
            stream.forEach(message -> guard(failure, () -> {
                Assertions.assertEquals("test", message.getContext().getContentType());
                Assertions.assertTrue(message.getContext() instanceof EventAdminMessagingContext);
                Assertions.assertEquals("testheader", message.getContext().getHeaders().get("testheader"));
                consume(message, latch);
            }));
            MessagingContext build = EventAdminMessagingContextBuilder.builder()
                    .header("testheader", "testheader")
                    .contentType("test")
                    .build();
            publishAll(MESSAGE_COUNT, index -> messagingService.publish(TOPIC, payload(index), build), failure);
            awaitAll(latch, 10L, failure);
        }
    }

    /**
     * Publishes two messages to a forked subscription whose back pressure is always reported as
     * 5000 and allows 15 seconds for both to arrive.
     */
    @Test
    public void basicBackpressure(@InjectService ServiceAware<MessagingService> serviceAware) throws Exception {
        final int messageCount = 2;
        Assertions.assertFalse(serviceAware.isEmpty());
        MessagingService messagingService = serviceAware.getService();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(messageCount);
        ExecutorService workers = Executors.newCachedThreadPool();
        try (PushStream<Message> stream = messagingService.subscribe(TOPIC)) {
            logErrors(stream);
            stream.adjustBackPressure((message, backPressure) -> {
                System.out.println("bp: " + backPressure);
                return 5000L;
            }).fork(5, 0, workers).forEach(message -> guard(failure, () -> consume(message, latch)));
            MessagingContext build = SimpleMessagingContextBuilder.builder().contentType("test").build();
            publishAll(messageCount, index -> messagingService.publish(TOPIC, payload(index), build), failure);
            awaitAll(latch, 15L, failure);
        } finally {
            workers.shutdown();
        }
    }

    /**
     * Checks that the tracking count of {@link EventHandler} services is 1 while the subscription
     * is active and 2 after the stream has been closed.
     */
    @Test
    public void testCleanup(@InjectService ServiceAware<MessagingService> serviceAware, @InjectService(cardinality = 0) ServiceAware<EventHandler> serviceAware2) throws Exception {
        Assertions.assertFalse(serviceAware.isEmpty());
        MessagingService messagingService = serviceAware.getService();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        try (PushStream<Message> stream = messagingService.subscribe(TOPIC)) {
            CountDownLatch latch = countMessages(stream, failure);
            publishAll(MESSAGE_COUNT, index -> messagingService.publish(TOPIC, payload(index)), failure);
            awaitAll(latch, 10L, failure);
            Assertions.assertEquals(1, serviceAware2.getTrackingCount());
            // Explicit close is the behavior under test; the try-with-resources close that follows
            // only matters when an assertion above fails (NOTE(review): relies on close() being
            // safe to call twice - confirm against the PushStream implementation in use).
            stream.close();
            Assertions.assertEquals(2, serviceAware2.getTrackingCount());
        }
    }

    /**
     * Subscribes three times to the same topic and expects each subscription to receive every
     * published message. The {@link EventHandler} tracking count is expected to be 0 before the
     * first subscription and 1 once the third one has been requested.
     */
    @Test
    public void testCleanupMultiple(@InjectService ServiceAware<MessagingService> serviceAware, @InjectService(cardinality = 0) ServiceAware<EventHandler> serviceAware2) throws Exception {
        Assertions.assertFalse(serviceAware.isEmpty());
        MessagingService messagingService = serviceAware.getService();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Assertions.assertEquals(0, serviceAware2.getTrackingCount());
        // The subscriptions are nested (instead of one multi-resource try) to keep the original
        // interleaving of subscribe() and forEach() calls, which the tracking count check depends on.
        try (PushStream<Message> first = messagingService.subscribe(TOPIC)) {
            CountDownLatch firstLatch = countMessages(first, failure);
            try (PushStream<Message> second = messagingService.subscribe(TOPIC)) {
                CountDownLatch secondLatch = countMessages(second, failure);
                try (PushStream<Message> third = messagingService.subscribe(TOPIC)) {
                    Assertions.assertEquals(1, serviceAware2.getTrackingCount());
                    CountDownLatch thirdLatch = countMessages(third, failure);
                    publishAll(MESSAGE_COUNT, index -> messagingService.publish(TOPIC, payload(index)), failure);
                    awaitAll(firstLatch, 10L, failure);
                    awaitAll(secondLatch, 10L, failure);
                    awaitAll(thirdLatch, 10L, failure);
                }
            }
        }
    }

    /** Builds the payload for the message with the given index: the prefix followed by the index. */
    private static ByteBuffer payload(int index) {
        return ByteBuffer.wrap((PAYLOAD_PREFIX + index).getBytes(StandardCharsets.UTF_8));
    }

    /** Registers an error callback on the stream that prints the error message and stack trace. */
    private static void logErrors(PushStream<Message> stream) {
        stream.onError(error -> {
            System.err.println(error.getMessage());
            error.printStackTrace();
        });
    }

    /**
     * Registers error logging and a counting consumer on the stream.
     *
     * @return a latch initialised to {@link #MESSAGE_COUNT} that is counted down per valid message
     */
    private static CountDownLatch countMessages(PushStream<Message> stream, AtomicReference<Throwable> failure) {
        CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
        logErrors(stream);
        stream.forEach(message -> guard(failure, () -> consume(message, latch)));
        return latch;
    }

    /** Checks the payload prefix of a received message, counts the latch down and prints the content. */
    private static void consume(Message message, CountDownLatch latch) {
        String content = new String(message.payload().array(), StandardCharsets.UTF_8);
        Assertions.assertTrue(content.startsWith(PAYLOAD_PREFIX));
        latch.countDown();
        System.out.println("sub content: " + content + " ts: " + System.currentTimeMillis());
    }

    /**
     * Runs a check that executes outside the test thread. The first failure is remembered in
     * {@code failure}; the throwable is then rethrown so the calling stream still sees it.
     */
    private static void guard(AtomicReference<Throwable> failure, Runnable check) {
        try {
            check.run();
        } catch (RuntimeException | Error e) {
            failure.compareAndSet(null, e);
            throw e;
        }
    }

    /**
     * Invokes the publisher for the indices {@code 0..count-1} on a background thread. Publishing
     * stops at the first exception, which is stored in {@code failure}.
     */
    private static void publishAll(int count, Publisher publisher, AtomicReference<Throwable> failure) {
        ExecutorService sender = Executors.newSingleThreadExecutor();
        sender.execute(() -> {
            for (int index = 0; index < count; index++) {
                try {
                    publisher.publish(index);
                } catch (Exception e) {
                    failure.compareAndSet(null, e);
                    return;
                }
            }
        });
        // shutdown() does not cancel the task submitted above; it only lets the thread end afterwards.
        sender.shutdown();
    }

    /**
     * Waits for the latch and fails the test if a background failure was recorded or if the latch
     * did not reach zero in time. The recorded failure is checked first so the root cause is
     * reported rather than the resulting timeout.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private static void awaitAll(CountDownLatch latch, long timeoutSeconds, AtomicReference<Throwable> failure) throws InterruptedException {
        boolean completed = latch.await(timeoutSeconds, TimeUnit.SECONDS);
        Throwable cause = failure.get();
        if (cause != null) {
            Assertions.fail("Publishing or consuming a message failed", cause);
        }
        Assertions.assertTrue(completed, "Not all messages have been processed. Current count " + latch.getCount());
    }
}
