package org.gecko.adapter.mqtt.tests;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.mqtt.MQTTContextBuilder;
import org.gecko.adapter.mqtt.QoS;
import org.gecko.moquette.broker.MQTTBroker;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.SimpleMessagingContextBuilder;
import org.gecko.osgi.messaging.annotations.RequireMQTTv3;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.service.cm.annotations.RequireConfigurationAdmin;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.test.common.annotation.config.WithFactoryConfigurations;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.cm.ConfigurationExtension;
import org.osgi.test.junit5.context.BundleContextExtension;
import org.osgi.test.junit5.service.ServiceExtension;

/**
 * Integration test that publishes a large number of retained MQTT messages and
 * then subscribes to them, forwarding every received message to a
 * {@code forward/...} topic.
 *
 * <p>The class-level factory configuration registers an {@code MQTTBroker}
 * configuration for {@code localhost:2183}; the test method additionally
 * registers two {@code MQTTService} configurations ("read" and "write") that
 * point at {@link #BROKER_URL}.
 */
@RequireMQTTv3
@Extensions({
    @ExtendWith({MockitoExtension.class}),
    @ExtendWith({ServiceExtension.class}),
    @ExtendWith({ConfigurationExtension.class}),
    @ExtendWith({BundleContextExtension.class})
})
@WithFactoryConfiguration(factoryPid = "MQTTBroker", location = "?", name = "broker", properties = {
    @Property(key = "HOST", value = {"localhost"}),
    @Property(key = "PORT", value = {"2183"})
})
@RequireConfigurationAdmin
public class MqttComponentRetainedTest {
    private static final Logger LOGGER = Logger.getLogger(MqttComponentRetainedTest.class.getName());

    /** Number of distinct topic prefixes that are published to and subscribed on. */
    private static final int TOPIC_COUNT = 4;
    /** Number of messages published below each topic prefix. */
    private static final int MESSAGE_COUNT = 2500;
    /** Total number of messages the subscriber side has to see for the test to pass. */
    private static final int EXPECTED_MESSAGES = TOPIC_COUNT * MESSAGE_COUNT;

    /** Common prefix of all topics used by this test. */
    private static final String TOPIC = "test.candelete";
    private static final String BROKER_URL = "tcp://localhost:2183";

    /** Maximum time to wait for a {@link MessagingService} to become available. */
    private static final long SERVICE_TIMEOUT_MS = 10000L;
    /** Maximum time to wait for all expected messages after subscribing. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 10L;
    /** Buffer size handed to the subscription's messaging context. */
    private static final int SUBSCRIBE_BUFFER_SIZE = 250000;

    /** The publisher pauses whenever the message index is a multiple of this value. */
    private static final int PUBLISH_PAUSE_INTERVAL = 100;
    /** Length of each publisher pause. */
    private static final long PUBLISH_PAUSE_MS = 200L;

    /**
     * Publishes {@link #MESSAGE_COUNT} retained messages under each of
     * {@link #TOPIC_COUNT} topic prefixes, then subscribes to every prefix with a
     * {@code #} wildcard and expects {@link #EXPECTED_MESSAGES} messages to arrive
     * within {@link #RECEIVE_TIMEOUT_SECONDS} seconds.
     *
     * @param mQTTBroker    injected broker service; not used in the body
     *                      (presumably requested so the broker is tracked by the test
     *                      extension - TODO confirm)
     * @param serviceAware  service tracker used to obtain the subscribing service
     * @param serviceAware2 service tracker used to obtain the publishing service
     * @throws Exception if a service does not show up, or publishing/subscribing fails
     */
    @WithFactoryConfigurations({
        @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {
            @Property(key = "username", value = {"demo"}),
            @Property(key = ".password", value = {"1234"}),
            @Property(key = "brokerUrl", value = {BROKER_URL})
        }),
        @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "write", properties = {
            @Property(key = "username", value = {"demo"}),
            @Property(key = ".password", value = {"1234"}),
            @Property(key = "brokerUrl", value = {BROKER_URL})
        })
    })
    @Test
    public void testForward(@InjectService(cardinality = 0) MQTTBroker mQTTBroker, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2) throws Exception {
        // NOTE(review): both ServiceAware parameters are injected without a filter, so
        // they may well resolve to the same MessagingService instance - confirm whether
        // the "read"/"write" configurations were meant to be targeted separately.
        MessagingService publisher = serviceAware2.waitForService(SERVICE_TIMEOUT_MS);
        for (int topicIndex = 0; topicIndex < TOPIC_COUNT; topicIndex++) {
            publish(publisher, TOPIC + topicIndex + "/");
        }

        // Subscriptions are only created after everything has been published.
        MessagingService subscriber = serviceAware.waitForService(SERVICE_TIMEOUT_MS);
        MessagingContext subscribeContext = SimpleMessagingContextBuilder.builder().withBuffer(SUBSCRIBE_BUFFER_SIZE).build();
        // Derived from the constants so the expectation cannot drift from what is published.
        CountDownLatch received = new CountDownLatch(EXPECTED_MESSAGES);
        for (int topicIndex = 0; topicIndex < TOPIC_COUNT; topicIndex++) {
            sub(subscriber, subscribeContext, TOPIC + topicIndex + "/#", received);
        }
        Assertions.assertTrue(received.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS), "Missing " + received.getCount() + " messages.");
    }

    /**
     * Publishes {@link #MESSAGE_COUNT} retained messages with QoS
     * {@code AT_LEAST_ONE}, each to its own topic below the given prefix.
     *
     * @param messagingService service used for publishing
     * @param str              topic prefix; a fixed 30-digit segment, a dash and the
     *                         message index are appended to it
     * @throws Exception if publishing fails or the throttling sleep is interrupted
     */
    private void publish(MessagingService messagingService, String str) throws Exception {
        MessagingContext retainedContext = new MQTTContextBuilder().retained().withQoS(QoS.AT_LEAST_ONE).build();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String topic = str + "123456789012345678901234567890-" + i;
            // Explicit charset: the payload must not depend on the platform default encoding.
            byte[] payload = ("123456789-" + i).getBytes(StandardCharsets.UTF_8);
            messagingService.publish(topic, ByteBuffer.wrap(payload), retainedContext);
            // Throttle the publisher: pause after every PUBLISH_PAUSE_INTERVAL-th message
            // (including the very first one, index 0).
            if (i % PUBLISH_PAUSE_INTERVAL == 0) {
                Thread.sleep(PUBLISH_PAUSE_MS);
            }
        }
    }

    /**
     * Subscribes to the given topic filter. Every received message is logged,
     * counted down on the latch and re-published under {@code "forward/" + topic}.
     *
     * @param messagingService service used both for subscribing and for forwarding
     * @param messagingContext context passed to the subscription
     * @param str              topic filter to subscribe to
     * @param countDownLatch   latch decremented once per received message
     * @throws Exception if the subscription cannot be established
     */
    private void sub(MessagingService messagingService, MessagingContext messagingContext, String str, CountDownLatch countDownLatch) throws Exception {
        messagingService.subscribe(str, messagingContext).forEach(message -> {
            LOGGER.log(Level.INFO, message.topic());
            // Count first so a failing forward does not hide a successfully received message.
            countDownLatch.countDown();
            try {
                messagingService.publish("forward/" + message.topic(), message.payload());
            } catch (Exception e) {
                // Forwarding is best effort here; log the failure and keep consuming.
                LOGGER.log(Level.SEVERE, e.getMessage(), e);
            }
        });
    }
}
