package me.ahoo.wow.tck.messaging;

import java.time.Duration;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.SetsKt;
import kotlin.jdk7.AutoCloseableKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import me.ahoo.wow.api.Identifier;
import me.ahoo.wow.api.messaging.Message;
import me.ahoo.wow.api.messaging.TopicKindCapable;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.id.GlobalIdGenerator;
import me.ahoo.wow.infra.Decorator;
import me.ahoo.wow.messaging.MessageBus;
import me.ahoo.wow.messaging.ReceiverGroupKt;
import me.ahoo.wow.messaging.handler.MessageExchange;
import me.ahoo.wow.metrics.Metrics;
import me.ahoo.wow.tck.event.MockDomainEventStreamsKt;
import me.ahoo.wow.tck.eventsourcing.EventStoreSpec;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kotlin.test.StepVerifierExtensionsKt;
import reactor.test.StepVerifier;

/* compiled from: MessageBusSpec.kt */
@Metadata(mv = {EventStoreSpec.DEFAULT_PARALLELISM, MockDomainEventStreamsKt.DEFAULT_AGGREGATE_VERSION, MockDomainEventStreamsKt.DEFAULT_AGGREGATE_VERSION}, k = 1, xi = 48, d1 = {"��X\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0004\b&\u0018�� &*\u0010\b��\u0010\u0001*\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0002*\u0012\b\u0001\u0010\u0003*\f\u0012\u0002\b\u0003\u0012\u0004\u0012\u0002H\u00010\u0004*\u0014\b\u0002\u0010\u0005*\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00030\u00062\u00020\u0007:\u0001&B\u0007¢\u0006\u0004\b\b\u0010\tJ\r\u0010\u000e\u001a\u00028\u0002H$¢\u0006\u0002\u0010\u000fJ\r\u0010\u0010\u001a\u00028��H$¢\u0006\u0002\u0010\u0011J&\u0010\u0012\u001a\b\u0012\u0004\u0012\u00028\u00010\u0013*\b\u0012\u0004\u0012\u00028\u00010\u00132\f\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00160\u0015H\u0014J!\u0010\u0017\u001a\u00020\u00182\u0017\u0010\u0019\u001a\u0013\u0012\u0004\u0012\u00028\u0002\u0012\u0004\u0012\u00020\u00180\u001a¢\u0006\u0002\b\u001bH\u0016J\b\u0010\u001c\u001a\u00020\u0018H\u0007J\b\u0010\u001d\u001a\u00020\u0018H\u0007J\b\u0010\u001e\u001a\u00020\u0018H\u0007J%\u0010\u001f\u001a\b\u0012\u0004\u0012\u00020\u00160 2\u0006\u0010!\u001a\u00028\u00022\b\b\u0002\u0010\"\u001a\u00020#H\u0002¢\u0006\u0002\u0010$J\b\u0010%\u001a\u00020\u0018H\u0007R\u0012\u0010\n\u001a\u00020\u000bX¦\u0004¢\u0006\u0006\u001a\u0004\b\f\u0010\r¨\u0006'"}, d2 = {"Lme/ahoo/wow/tck/messaging/MessageBusSpec;", "M", "Lme/ahoo/wow/api/messaging/Message;", "E", "Lme/ahoo/wow/messaging/handler/MessageExchange;", "BUS", "Lme/ahoo/wow/messaging/MessageBus;", "Lme/ahoo/wow/api/messaging/TopicKindCapable;", "<init>", "()V", "namedAggregate", "Lme/ahoo/wow/api/modeling/NamedAggregate;", "getNamedAggregate", "()Lme/ahoo/wow/api/modeling/NamedAggregate;", "createMessageBus", "()Lme/ahoo/wow/messaging/MessageBus;", "createMessage", "()Lme/ahoo/wow/api/messaging/Message;", "onReceive", "Lreactor/core/publisher/Flux;", "onReady", "Lreactor/core/publisher/Sinks$Empty;", "Ljava/lang/Void;", "verify", "", "block", "Lkotlin/Function1;", "Lkotlin/ExtensionFunctionType;", "send", "receive", "sendPerformance", "sendLoop", "Lreactor/core/publisher/Mono;", "messageBus", "maxCount", "", "(Lme/ahoo/wow/messaging/MessageBus;I)Lreactor/core/publisher/Mono;", "receivePerformance", "Companion", "wow-tck"})
/* loaded from: input_file:me/ahoo/wow/tck/messaging/MessageBusSpec.class */
public abstract class MessageBusSpec<M extends Message<?, ?>, E extends MessageExchange<?, ? extends M>, BUS extends MessageBus<M, E>> implements TopicKindCapable {

    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger log = LoggerFactory.getLogger(MessageBusSpec.class);

    /* compiled from: MessageBusSpec.kt */
    @Metadata(mv = {EventStoreSpec.DEFAULT_PARALLELISM, MockDomainEventStreamsKt.DEFAULT_AGGREGATE_VERSION, MockDomainEventStreamsKt.DEFAULT_AGGREGATE_VERSION}, k = 1, xi = 48, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0086\u0003\u0018��2\u00020\u0001B\t\b\u0002¢\u0006\u0004\b\u0002\u0010\u0003R\u0018\u0010\u0004\u001a\n \u0006*\u0004\u0018\u00010\u00050\u0005X\u0082\u0004¢\u0006\u0004\n\u0002\u0010\u0007¨\u0006\b"}, d2 = {"Lme/ahoo/wow/tck/messaging/MessageBusSpec$Companion;", "", "<init>", "()V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "Lorg/slf4j/Logger;", "wow-tck"})
    /* loaded from: input_file:me/ahoo/wow/tck/messaging/MessageBusSpec$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    @NotNull
    public abstract NamedAggregate getNamedAggregate();

    @NotNull
    protected abstract BUS createMessageBus();

    @NotNull
    protected abstract M createMessage();

    @NotNull
    protected Flux<E> onReceive(@NotNull Flux<E> flux, @NotNull Sinks.Empty<Void> empty) {
        Intrinsics.checkNotNullParameter(flux, "<this>");
        Intrinsics.checkNotNullParameter(empty, "onReady");
        empty.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
        return flux;
    }

    public void verify(@NotNull Function1<? super BUS, Unit> function1) {
        Intrinsics.checkNotNullParameter(function1, "block");
        MessageBus messageBus = (AutoCloseable) Metrics.INSTANCE.metrizable(createMessageBus());
        Throwable th = null;
        try {
            try {
                MessageBus messageBus2 = messageBus;
                if (Decorator.Companion.getOriginalDelegate(messageBus2) instanceof TopicKindCapable) {
                    Object originalDelegate = Decorator.Companion.getOriginalDelegate(messageBus2);
                    Intrinsics.checkNotNull(originalDelegate, "null cannot be cast to non-null type me.ahoo.wow.api.messaging.TopicKindCapable");
                    MatcherAssert.assertThat(((TopicKindCapable) originalDelegate).getTopicKind(), Matchers.equalTo(getTopicKind()));
                }
                function1.invoke(messageBus2);
                Unit unit = Unit.INSTANCE;
                AutoCloseableKt.closeFinally(messageBus, (Throwable) null);
            } finally {
            }
        } catch (Throwable th2) {
            AutoCloseableKt.closeFinally(messageBus, th);
            throw th2;
        }
    }

    @Test
    public final void send() {
        verify((v1) -> {
            return send$lambda$5(r1, v1);
        });
    }

    @Test
    public final void receive() {
        verify((v1) -> {
            return receive$lambda$10(r1, v1);
        });
    }

    @Test
    public final void sendPerformance() {
        verify((v1) -> {
            return sendPerformance$lambda$13(r1, v1);
        });
    }

    private final Mono<Void> sendLoop(BUS bus, int i) {
        Flux range = Flux.range(0, i);
        Function1 function1 = (v2) -> {
            return sendLoop$lambda$14(r1, r2, v2);
        };
        Mono<Void> then = range.flatMap((v1) -> {
            return sendLoop$lambda$15(r1, v1);
        }).then();
        Intrinsics.checkNotNullExpressionValue(then, "then(...)");
        return then;
    }

    static /* synthetic */ Mono sendLoop$default(MessageBusSpec messageBusSpec, MessageBus messageBus, int i, int i2, Object obj) {
        if (obj != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: sendLoop");
        }
        if ((i2 & 2) != 0) {
            i = 1000;
        }
        return messageBusSpec.sendLoop(messageBus, i);
    }

    @Test
    @DisabledIfEnvironmentVariable(named = "CI", matches = ".*")
    public final void receivePerformance() {
        verify((v1) -> {
            return receivePerformance$lambda$18(r1, v1);
        });
    }

    private static final Unit send$lambda$5$lambda$1(Sinks.Empty empty, MessageBus messageBus, Message message, Subscription subscription) {
        empty.asMono().then(messageBus.send(message)).delaySubscription(Duration.ofMillis(1000L)).subscribe();
        return Unit.INSTANCE;
    }

    private static final void send$lambda$5$lambda$2(Function1 function1, Object obj) {
        function1.invoke(obj);
    }

    private static final Unit send$lambda$5$lambda$3(Message message, MessageExchange messageExchange) {
        MatcherAssert.assertThat(messageExchange.getMessage().getId(), Matchers.equalTo(((Identifier) message).getId()));
        return Unit.INSTANCE;
    }

    private static final void send$lambda$5$lambda$4(Function1 function1, Object obj) {
        function1.invoke(obj);
    }

    private static final Unit send$lambda$5(MessageBusSpec messageBusSpec, MessageBus messageBus) {
        Intrinsics.checkNotNullParameter(messageBus, "$this$verify");
        Sinks.Empty<Void> empty = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(empty, "empty(...)");
        Message createMessage = messageBusSpec.createMessage();
        Flux receive = messageBus.receive(SetsKt.setOf(messageBusSpec.getNamedAggregate()));
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "generateAsString(...)");
        Flux<E> onReceive = messageBusSpec.onReceive(ReceiverGroupKt.writeReceiverGroup(receive, generateAsString), empty);
        Function1 function1 = (v3) -> {
            return send$lambda$5$lambda$1(r1, r2, r3, v3);
        };
        Flux doOnSubscribe = onReceive.doOnSubscribe((v1) -> {
            send$lambda$5$lambda$2(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(doOnSubscribe, "doOnSubscribe(...)");
        StepVerifier.FirstStep test = StepVerifierExtensionsKt.test(doOnSubscribe);
        Function1 function12 = (v1) -> {
            return send$lambda$5$lambda$3(r1, v1);
        };
        test.consumeNextWith((v1) -> {
            send$lambda$5$lambda$4(r1, v1);
        }).thenCancel().verify();
        return Unit.INSTANCE;
    }

    private static final Publisher receive$lambda$10$lambda$8$lambda$6(MessageBusSpec messageBusSpec, MessageBus messageBus, Integer num) {
        return messageBus.send(messageBusSpec.createMessage());
    }

    private static final Publisher receive$lambda$10$lambda$8$lambda$7(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Unit receive$lambda$10$lambda$8(Sinks.Empty empty, MessageBusSpec messageBusSpec, MessageBus messageBus, Subscription subscription) {
        Flux range = Flux.range(0, 10);
        Function1 function1 = (v2) -> {
            return receive$lambda$10$lambda$8$lambda$6(r1, r2, v2);
        };
        Publisher flatMap = range.flatMap((v1) -> {
            return receive$lambda$10$lambda$8$lambda$7(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(flatMap, "flatMap(...)");
        empty.asMono().thenMany(flatMap).delaySubscription(Duration.ofMillis(1000L)).subscribe();
        return Unit.INSTANCE;
    }

    private static final void receive$lambda$10$lambda$9(Function1 function1, Object obj) {
        function1.invoke(obj);
    }

    private static final Unit receive$lambda$10(MessageBusSpec messageBusSpec, MessageBus messageBus) {
        Intrinsics.checkNotNullParameter(messageBus, "$this$verify");
        Sinks.Empty<Void> empty = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(empty, "empty(...)");
        Flux receive = messageBus.receive(SetsKt.setOf(messageBusSpec.getNamedAggregate()));
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "generateAsString(...)");
        Flux<E> onReceive = messageBusSpec.onReceive(ReceiverGroupKt.writeReceiverGroup(receive, generateAsString), empty);
        Function1 function1 = (v3) -> {
            return receive$lambda$10$lambda$8(r1, r2, r3, v3);
        };
        Flux doOnSubscribe = onReceive.doOnSubscribe((v1) -> {
            receive$lambda$10$lambda$9(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(doOnSubscribe, "doOnSubscribe(...)");
        StepVerifierExtensionsKt.test(doOnSubscribe).expectNextCount(10L).thenCancel().verify();
        return Unit.INSTANCE;
    }

    private static final Unit sendPerformance$lambda$13$lambda$11(MessageBusSpec messageBusSpec, MessageBus messageBus, Subscription subscription) {
        Duration verifyComplete = StepVerifierExtensionsKt.test(sendLoop$default(messageBusSpec, messageBus, 0, 2, null)).verifyComplete();
        Intrinsics.checkNotNullExpressionValue(verifyComplete, "verifyComplete(...)");
        log.info("[" + messageBus.getClass().getSimpleName() + "] sendPerformance - duration:{}", verifyComplete);
        return Unit.INSTANCE;
    }

    private static final void sendPerformance$lambda$13$lambda$12(Function1 function1, Object obj) {
        function1.invoke(obj);
    }

    private static final Unit sendPerformance$lambda$13(MessageBusSpec messageBusSpec, MessageBus messageBus) {
        Intrinsics.checkNotNullParameter(messageBus, "$this$verify");
        Sinks.Empty<Void> empty = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(empty, "empty(...)");
        Flux receive = messageBus.receive(SetsKt.setOf(messageBusSpec.getNamedAggregate()));
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "generateAsString(...)");
        Flux<E> onReceive = messageBusSpec.onReceive(ReceiverGroupKt.writeReceiverGroup(receive, generateAsString), empty);
        Function1 function1 = (v2) -> {
            return sendPerformance$lambda$13$lambda$11(r1, r2, v2);
        };
        Flux doOnSubscribe = onReceive.doOnSubscribe((v1) -> {
            sendPerformance$lambda$13$lambda$12(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(doOnSubscribe, "doOnSubscribe(...)");
        StepVerifierExtensionsKt.test(doOnSubscribe).thenCancel().verify();
        return Unit.INSTANCE;
    }

    private static final Publisher sendLoop$lambda$14(MessageBusSpec messageBusSpec, MessageBus messageBus, Integer num) {
        return messageBus.send(messageBusSpec.createMessage());
    }

    private static final Publisher sendLoop$lambda$15(Function1 function1, Object obj) {
        return (Publisher) function1.invoke(obj);
    }

    private static final Unit receivePerformance$lambda$18$lambda$16(MessageBusSpec messageBusSpec, MessageBus messageBus, long j, Sinks.Empty empty, Subscription subscription) {
        empty.asMono().thenMany(messageBusSpec.sendLoop(messageBus, (int) j)).delaySubscription(Duration.ofMillis(1000L)).subscribe();
        return Unit.INSTANCE;
    }

    private static final void receivePerformance$lambda$18$lambda$17(Function1 function1, Object obj) {
        function1.invoke(obj);
    }

    private static final Unit receivePerformance$lambda$18(MessageBusSpec messageBusSpec, MessageBus messageBus) {
        Intrinsics.checkNotNullParameter(messageBus, "$this$verify");
        long j = 1000;
        Sinks.Empty<Void> empty = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(empty, "empty(...)");
        Flux receive = messageBus.receive(SetsKt.setOf(messageBusSpec.getNamedAggregate()));
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "generateAsString(...)");
        Flux<E> onReceive = messageBusSpec.onReceive(ReceiverGroupKt.writeReceiverGroup(receive, generateAsString), empty);
        Function1 function1 = (v4) -> {
            return receivePerformance$lambda$18$lambda$16(r1, r2, r3, r4, v4);
        };
        Flux doOnSubscribe = onReceive.doOnSubscribe((v1) -> {
            receivePerformance$lambda$18$lambda$17(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(doOnSubscribe, "doOnSubscribe(...)");
        Duration verify = StepVerifierExtensionsKt.test(doOnSubscribe).expectNextCount(1000L).thenCancel().verify();
        Intrinsics.checkNotNullExpressionValue(verify, "verify(...)");
        log.info("[" + messageBus.getClass().getSimpleName() + "] receivePerformance - duration:{}", verify);
        return Unit.INSTANCE;
    }
}
