package me.ahoo.wow.tck.command;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.SetsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.Charsets;
import me.ahoo.wow.api.command.CommandMessage;
import me.ahoo.wow.api.messaging.Header;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.command.CommandBus;
import me.ahoo.wow.command.CommandGateway;
import me.ahoo.wow.command.CommandResult;
import me.ahoo.wow.command.DefaultCommandGateway;
import me.ahoo.wow.command.ServerCommandExchange;
import me.ahoo.wow.command.SimpleCommandMessageKt;
import me.ahoo.wow.command.validation.NoOpValidator;
import me.ahoo.wow.command.wait.CommandStage;
import me.ahoo.wow.command.wait.SimpleCommandWaitEndpoint;
import me.ahoo.wow.command.wait.SimpleWaitSignal;
import me.ahoo.wow.command.wait.SimpleWaitStrategyRegistrar;
import me.ahoo.wow.configuration.MetadataSearcherKt;
import me.ahoo.wow.id.GlobalIdGenerator;
import me.ahoo.wow.infra.idempotency.BloomFilterIdempotencyChecker;
import me.ahoo.wow.infra.idempotency.IdempotencyChecker;
import me.ahoo.wow.metrics.Metrics;
import me.ahoo.wow.tck.eventsourcing.MockDomainEventStreamsKt;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kotlin.test.StepVerifierExtensionsKt;
import reactor.test.StepVerifier;

/* compiled from: CommandGatewaySpec.kt */
@Metadata(mv = {1, 8, MockDomainEventStreamsKt.DEFAULT_AGGREGATE_VERSION}, k = 1, xi = 48, d1 = {"��<\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u0002\n\u0002\b\u0006\b&\u0018��2\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\b\u0010\u001b\u001a\u00020\u0004H$J\b\u0010\u001c\u001a\u00020\u001dH\u0007J\b\u0010\u001e\u001a\u00020\u001dH\u0007J\b\u0010\u001f\u001a\u00020\u001dH\u0007J\b\u0010 \u001a\u00020\u001dH\u0007J\b\u0010!\u001a\u00020\u001dH\u0007J\b\u0010\"\u001a\u00020\u001dH\u0007R\u001a\u0010\u0003\u001a\u00020\u0004X\u0084.¢\u0006\u000e\n��\u001a\u0004\b\u0005\u0010\u0006\"\u0004\b\u0007\u0010\bR\u001a\u0010\t\u001a\u00020\nX\u0084.¢\u0006\u000e\n��\u001a\u0004\b\u000b\u0010\f\"\u0004\b\r\u0010\u000eR\u0014\u0010\u000f\u001a\u00020\u0010X\u0084\u0004¢\u0006\b\n��\u001a\u0004\b\u0011\u0010\u0012R\u0014\u0010\u0013\u001a\u00020\u0014X\u0084\u0004¢\u0006\b\n��\u001a\u0004\b\u0015\u0010\u0016R\u0014\u0010\u0017\u001a\u00020\u0018X\u0084\u0004¢\u0006\b\n��\u001a\u0004\b\u0019\u0010\u001a¨\u0006#"}, d2 = {"Lme/ahoo/wow/tck/command/CommandGatewaySpec;", "", "()V", "commandBus", "Lme/ahoo/wow/command/CommandBus;", "getCommandBus", "()Lme/ahoo/wow/command/CommandBus;", "setCommandBus", "(Lme/ahoo/wow/command/CommandBus;)V", "commandGateway", "Lme/ahoo/wow/command/CommandGateway;", "getCommandGateway", "()Lme/ahoo/wow/command/CommandGateway;", "setCommandGateway", "(Lme/ahoo/wow/command/CommandGateway;)V", "idempotencyChecker", "Lme/ahoo/wow/infra/idempotency/IdempotencyChecker;", "getIdempotencyChecker", "()Lme/ahoo/wow/infra/idempotency/IdempotencyChecker;", "namedAggregate", "Lme/ahoo/wow/api/modeling/NamedAggregate;", "getNamedAggregate", "()Lme/ahoo/wow/api/modeling/NamedAggregate;", "waitStrategyRegistrar", "Lme/ahoo/wow/command/wait/SimpleWaitStrategyRegistrar;", "getWaitStrategyRegistrar", "()Lme/ahoo/wow/command/wait/SimpleWaitStrategyRegistrar;", "createCommandBus", "receive", "", "send", "sendGivenDuplicate", "sendGivenTimeout", "sendThenWaitingForAggregate", "setup", "wow-tck"})
/* loaded from: input_file:me/ahoo/wow/tck/command/CommandGatewaySpec.class */
public abstract class CommandGatewaySpec {

    @NotNull
    private final NamedAggregate namedAggregate = MetadataSearcherKt.asRequiredNamedAggregate(MockSendCommand.class);

    @NotNull
    private final SimpleWaitStrategyRegistrar waitStrategyRegistrar = SimpleWaitStrategyRegistrar.INSTANCE;

    @NotNull
    private final IdempotencyChecker idempotencyChecker;
    protected CommandBus commandBus;
    protected CommandGateway commandGateway;

    public CommandGatewaySpec() {
        Duration ofSeconds = Duration.ofSeconds(1L);
        Intrinsics.checkNotNullExpressionValue(ofSeconds, "ofSeconds(1)");
        this.idempotencyChecker = new BloomFilterIdempotencyChecker(ofSeconds, new Function0<BloomFilter<String>>() { // from class: me.ahoo.wow.tck.command.CommandGatewaySpec$idempotencyChecker$1
            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final BloomFilter<String> m3invoke() {
                BloomFilter<String> create = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1000000);
                Intrinsics.checkNotNullExpressionValue(create, "create(Funnels.stringFun…Charsets.UTF_8), 1000000)");
                return create;
            }
        });
    }

    @NotNull
    protected final NamedAggregate getNamedAggregate() {
        return this.namedAggregate;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @NotNull
    public final SimpleWaitStrategyRegistrar getWaitStrategyRegistrar() {
        return this.waitStrategyRegistrar;
    }

    @NotNull
    protected final IdempotencyChecker getIdempotencyChecker() {
        return this.idempotencyChecker;
    }

    @NotNull
    protected final CommandBus getCommandBus() {
        CommandBus commandBus = this.commandBus;
        if (commandBus != null) {
            return commandBus;
        }
        Intrinsics.throwUninitializedPropertyAccessException("commandBus");
        return null;
    }

    protected final void setCommandBus(@NotNull CommandBus commandBus) {
        Intrinsics.checkNotNullParameter(commandBus, "<set-?>");
        this.commandBus = commandBus;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @NotNull
    public final CommandGateway getCommandGateway() {
        CommandGateway commandGateway = this.commandGateway;
        if (commandGateway != null) {
            return commandGateway;
        }
        Intrinsics.throwUninitializedPropertyAccessException("commandGateway");
        return null;
    }

    protected final void setCommandGateway(@NotNull CommandGateway commandGateway) {
        Intrinsics.checkNotNullParameter(commandGateway, "<set-?>");
        this.commandGateway = commandGateway;
    }

    @NotNull
    protected abstract CommandBus createCommandBus();

    @BeforeEach
    public final void setup() {
        setCommandBus(Metrics.INSTANCE.metrizable(createCommandBus()));
        setCommandGateway((CommandGateway) new DefaultCommandGateway(new SimpleCommandWaitEndpoint(""), getCommandBus(), this.idempotencyChecker, this.waitStrategyRegistrar, NoOpValidator.INSTANCE));
    }

    @Test
    public final void send() {
        Schedulers.single().schedule(() -> {
            send$lambda$0(r1);
        });
        CommandGateway commandGateway = getCommandGateway();
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "GlobalIdGenerator.generateAsString()");
        StepVerifierExtensionsKt.test(commandGateway.sendAndWaitForSent(SimpleCommandMessageKt.asCommandMessage$default(new MockSendCommand(generateAsString), (String) null, (String) null, (String) null, (String) null, (Integer) null, (NamedAggregate) null, (Header) null, 0L, 255, (Object) null))).expectNextCount(1L).verifyComplete();
    }

    @Test
    public final void sendGivenTimeout() {
        CommandGateway commandGateway = getCommandGateway();
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "GlobalIdGenerator.generateAsString()");
        Mono timeout = commandGateway.sendAndWaitForProcessed(SimpleCommandMessageKt.asCommandMessage$default(new MockSendCommand(generateAsString), (String) null, (String) null, (String) null, (String) null, (Integer) null, (NamedAggregate) null, (Header) null, 0L, 255, (Object) null)).timeout(Duration.ofMillis(100L));
        Intrinsics.checkNotNullExpressionValue(timeout, "commandGateway.sendAndWa…t(Duration.ofMillis(100))");
        StepVerifierExtensionsKt.test(timeout).verifyTimeout(Duration.ofMillis(150L));
    }

    @Test
    public final void sendGivenDuplicate() {
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "GlobalIdGenerator.generateAsString()");
        CommandMessage asCommandMessage$default = SimpleCommandMessageKt.asCommandMessage$default(new MockSendCommand(generateAsString), (String) null, (String) null, (String) null, (String) null, (Integer) null, (NamedAggregate) null, (Header) null, 0L, 255, (Object) null);
        Schedulers.single().schedule(() -> {
            sendGivenDuplicate$lambda$1(r1);
        });
        StepVerifierExtensionsKt.test(getCommandGateway().sendAndWaitForSent(asCommandMessage$default)).expectNextCount(1L).verifyComplete();
        StepVerifier.FirstStep test = StepVerifierExtensionsKt.test(getCommandGateway().sendAndWaitForSent(asCommandMessage$default));
        CommandGatewaySpec$sendGivenDuplicate$2 commandGatewaySpec$sendGivenDuplicate$2 = new Function1<CommandResult, Unit>() { // from class: me.ahoo.wow.tck.command.CommandGatewaySpec$sendGivenDuplicate$2
            public final void invoke(CommandResult commandResult) {
                MatcherAssert.assertThat(commandResult.getStage(), Matchers.equalTo(CommandStage.SENT));
                MatcherAssert.assertThat(commandResult.getErrorCode(), Matchers.equalTo("WOW-CMD-409"));
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((CommandResult) obj);
                return Unit.INSTANCE;
            }
        };
        test.consumeNextWith((v1) -> {
            sendGivenDuplicate$lambda$2(r1, v1);
        }).verifyComplete();
    }

    @Test
    public final void sendThenWaitingForAggregate() {
        Schedulers.single().schedule(() -> {
            sendThenWaitingForAggregate$lambda$4(r1);
        });
        CommandGateway commandGateway = getCommandGateway();
        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(generateAsString, "GlobalIdGenerator.generateAsString()");
        StepVerifierExtensionsKt.test(commandGateway.sendAndWaitForProcessed(SimpleCommandMessageKt.asCommandMessage$default(new MockSendCommand(generateAsString), (String) null, (String) null, (String) null, (String) null, (Integer) null, (NamedAggregate) null, (Header) null, 0L, 255, (Object) null))).expectNextCount(1L).verifyComplete();
    }

    @Test
    public final void receive() {
        StepVerifier.FirstStep test = StepVerifierExtensionsKt.test(getCommandGateway().receive(SetsKt.setOf(this.namedAggregate)));
        Function1<Subscription, Unit> function1 = new Function1<Subscription, Unit>() { // from class: me.ahoo.wow.tck.command.CommandGatewaySpec$receive$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final void invoke(Subscription subscription) {
                Scheduler single = Schedulers.single();
                CommandGatewaySpec commandGatewaySpec = CommandGatewaySpec.this;
                single.schedule(() -> {
                    invoke$lambda$1(r1);
                }, 10L, TimeUnit.MILLISECONDS);
            }

            private static final Publisher invoke$lambda$1$lambda$0(Function1 function12, Object obj) {
                Intrinsics.checkNotNullParameter(function12, "$tmp0");
                return (Publisher) function12.invoke(obj);
            }

            private static final void invoke$lambda$1(final CommandGatewaySpec commandGatewaySpec) {
                Intrinsics.checkNotNullParameter(commandGatewaySpec, "this$0");
                Flux range = Flux.range(0, 10);
                Function1<Integer, Publisher<? extends CommandResult>> function12 = new Function1<Integer, Publisher<? extends CommandResult>>() { // from class: me.ahoo.wow.tck.command.CommandGatewaySpec$receive$1$1$1
                    /* JADX INFO: Access modifiers changed from: package-private */
                    {
                        super(1);
                    }

                    public final Publisher<? extends CommandResult> invoke(Integer num) {
                        CommandGateway commandGateway = CommandGatewaySpec.this.getCommandGateway();
                        String generateAsString = GlobalIdGenerator.INSTANCE.generateAsString();
                        Intrinsics.checkNotNullExpressionValue(generateAsString, "GlobalIdGenerator.generateAsString()");
                        return commandGateway.sendAndWaitForSent(SimpleCommandMessageKt.asCommandMessage$default(new MockSendCommand(generateAsString), (String) null, (String) null, (String) null, (String) null, (Integer) null, (NamedAggregate) null, (Header) null, 0L, 255, (Object) null));
                    }
                };
                range.flatMap((v1) -> {
                    return invoke$lambda$1$lambda$0(r1, v1);
                }).subscribe();
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((Subscription) obj);
                return Unit.INSTANCE;
            }
        };
        test.consumeSubscriptionWith((v1) -> {
            receive$lambda$5(r1, v1);
        }).expectNextCount(10L).verifyTimeout(Duration.ofSeconds(2L));
    }

    private static final void send$lambda$0(CommandGatewaySpec commandGatewaySpec) {
        Intrinsics.checkNotNullParameter(commandGatewaySpec, "this$0");
        commandGatewaySpec.getCommandGateway().receive(SetsKt.setOf(commandGatewaySpec.namedAggregate)).subscribe();
    }

    private static final void sendGivenDuplicate$lambda$1(CommandGatewaySpec commandGatewaySpec) {
        Intrinsics.checkNotNullParameter(commandGatewaySpec, "this$0");
        commandGatewaySpec.getCommandBus().receive(SetsKt.setOf(commandGatewaySpec.namedAggregate)).subscribe();
    }

    private static final void sendGivenDuplicate$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(obj);
    }

    private static final void sendThenWaitingForAggregate$lambda$4$lambda$3(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(obj);
    }

    private static final void sendThenWaitingForAggregate$lambda$4(final CommandGatewaySpec commandGatewaySpec) {
        Intrinsics.checkNotNullParameter(commandGatewaySpec, "this$0");
        Flux receive = commandGatewaySpec.getCommandGateway().receive(SetsKt.setOf(commandGatewaySpec.namedAggregate));
        Function1<ServerCommandExchange<Object>, Unit> function1 = new Function1<ServerCommandExchange<Object>, Unit>() { // from class: me.ahoo.wow.tck.command.CommandGatewaySpec$sendThenWaitingForAggregate$1$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final void invoke(ServerCommandExchange<Object> serverCommandExchange) {
                Scheduler boundedElastic = Schedulers.boundedElastic();
                CommandGatewaySpec commandGatewaySpec2 = CommandGatewaySpec.this;
                boundedElastic.schedule(() -> {
                    invoke$lambda$0(r1, r2);
                }, 10L, TimeUnit.MILLISECONDS);
            }

            private static final void invoke$lambda$0(CommandGatewaySpec commandGatewaySpec2, ServerCommandExchange serverCommandExchange) {
                Intrinsics.checkNotNullParameter(commandGatewaySpec2, "this$0");
                commandGatewaySpec2.getWaitStrategyRegistrar().next(new SimpleWaitSignal(serverCommandExchange.getMessage().getCommandId(), CommandStage.PROCESSED, false, (String) null, (String) null, 28, (DefaultConstructorMarker) null));
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((ServerCommandExchange<Object>) obj);
                return Unit.INSTANCE;
            }
        };
        receive.doOnNext((v1) -> {
            sendThenWaitingForAggregate$lambda$4$lambda$3(r1, v1);
        }).subscribe();
    }

    private static final void receive$lambda$5(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(obj);
    }
}
