package kalix.javasdk.testkit.impl;

import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props$;
import akka.stream.BoundedSourceQueue;
import akka.stream.Materializer$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.testkit.TestProbe$;
import java.util.concurrent.ConcurrentHashMap;
import kalix.eventing.EventDestination;
import kalix.eventing.EventSource;
import kalix.javasdk.impl.MessageCodec;
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.impl.EventingTestKitImpl;
import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand;
import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult;
import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult$;
import kalix.testkit.protocol.eventing_test_backend.EventStreamOutCommand;
import kalix.testkit.protocol.eventing_test_backend.EventStreamOutResult;
import kalix.testkit.protocol.eventing_test_backend.EventingTestKitService;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCommand;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCreate;
import kalix.testkit.protocol.eventing_test_backend.SourceElem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: EventingTestKitImpl.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005]f\u0001\u0002\r\u001a\u0005\tB\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\tq\u0001\u0011)\u0019!C\u0001s!Aq\t\u0001B\u0001B\u0003%!\b\u0003\u0005I\u0001\t\u0005\r\u0011\"\u0001J\u0011!q\u0005A!a\u0001\n\u0003y\u0005\u0002C+\u0001\u0005\u0003\u0005\u000b\u0015\u0002&\t\u0011Y\u0003!\u0011!Q\u0001\n]CQ\u0001\u0018\u0001\u0005\u0002uCq\u0001\u001a\u0001C\u0002\u0013%Q\r\u0003\u0004o\u0001\u0001\u0006IA\u001a\u0005\b_\u0002\u0011\r\u0011b\u0003q\u0011\u0019\t\b\u0001)A\u0005a!9!\u000f\u0001b\u0001\n\u0017\u0019\bB\u0002>\u0001A\u0003%A\u000fC\u0004|\u0001\t\u0007I\u0011\u0002?\t\u000f\u0005=\u0001\u0001)A\u0005{\"9\u0011\u0011\u0003\u0001\u0005B\u0005M\u0001bBA\u001a\u0001\u0011%\u0011Q\u0007\u0004\u0007\u0003s\u0001!!a\u000f\t\rq\u001bB\u0011AA+\u0011\u001d\tYf\u0005C!\u0003;Bq!!\u001e\u0014\t\u0003\n9\bC\u0004\u0002\"N!\t%a)\u0003/\u00153XM\u001c;j]\u001e$Vm\u001d;TKJ4\u0018nY3J[Bd'B\u0001\u000e\u001c\u0003\u0011IW\u000e\u001d7\u000b\u0005qi\u0012a\u0002;fgR\\\u0017\u000e\u001e\u0006\u0003=}\tqA[1wCN$7NC\u0001!\u0003\u0015Y\u0017\r\\5y\u0007\u0001\u00192\u0001A\u0012,!\t!\u0013&D\u0001&\u0015\t1s%\u0001\u0003mC:<'\"\u0001\u0015\u0002\t)\fg/Y\u0005\u0003U\u0015\u0012aa\u00142kK\u000e$\bC\u0001\u0017.\u001b\u0005Y\u0012B\u0001\u0018\u001c\u0005=)e/\u001a8uS:<G+Z:u\u0017&$\u0018AB:zgR,W\u000e\u0005\u00022m5\t!G\u0003\u00024i\u0005)\u0011m\u0019;pe*\tQ'\u0001\u0003bW.\f\u0017BA\u001c3\u0005-\t5\r^8s'f\u001cH/Z7\u0002\t!|7\u000f^\u000b\u0002uA\u00111\b\u0012\b\u0003y\t\u0003\"!\u0010!\u000e\u0003yR!aP\u0011\u0002\rq\u0012xn\u001c;?\u0015\u0005\t\u0015!B:dC2\f\u0017BA\"A\u0003\u0019\u0001&/\u001a3fM&\u0011QI\u0012\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\r\u0003\u0015!\u00025pgR\u0004\u0013\u0001\u00029peR,\u0012A\u0013\t\u0003\u00172k\u0011\u0001Q\u0005\u0003\u001b\u0002\u00131!\u00138u\u0003!\u0001xN\u001d;`I\u0015\fHC\u0001)T!\tY\u0015+\u0003\u0002S\u0001\n!QK\\5u\u0011\u001d!V!!AA\u0002)\u000b1\u0001\u001f\u00132\u
0003\u0015\u0001xN\u001d;!\u0003\u0015\u0019w\u000eZ3d!\tA&,D\u0001Z\u0015\tQR$\u0003\u0002\\3\naQ*Z:tC\u001e,7i\u001c3fG\u00061A(\u001b8jiz\"RA\u00181bE\u000e\u0004\"a\u0018\u0001\u000e\u0003eAQa\f\u0005A\u0002ABQ\u0001\u000f\u0005A\u0002iBQ\u0001\u0013\u0005A\u0002)CQA\u0016\u0005A\u0002]\u000b1\u0001\\8h+\u00051\u0007CA4m\u001b\u0005A'BA5k\u0003\u0015\u0019HN\u001a\u001bk\u0015\u0005Y\u0017aA8sO&\u0011Q\u000e\u001b\u0002\u0007\u0019><w-\u001a:\u0002\t1|w\rI\u0001\u0004gf\u001cX#\u0001\u0019\u0002\tML8\u000fI\u0001\u0003K\u000e,\u0012\u0001\u001e\t\u0003kbl\u0011A\u001e\u0006\u0003o\u0002\u000b!bY8oGV\u0014(/\u001a8u\u0013\tIhO\u0001\rFq\u0016\u001cW\u000f^5p]\u000e{g\u000e^3yi\u0016CXmY;u_J\f1!Z2!\u0003\u0019!x\u000e]5dgV\tQ\u0010\u0005\u0004\u007f\u0003\u000bQ\u0014\u0011B\u0007\u0002\u007f*\u0019q/!\u0001\u000b\u0007\u0005\rq%\u0001\u0003vi&d\u0017bAA\u0004\u007f\n\t2i\u001c8dkJ\u0014XM\u001c;ICNDW*\u00199\u0011\u0007}\u000bY!C\u0002\u0002\u000ee\u0011\u0011\u0002V8qS\u000eLU\u000e\u001d7\u0002\u000fQ|\u0007/[2tA\u0005Aq-\u001a;U_BL7\r\u0006\u0003\u0002\u0016\u0005=\u0002\u0003BA\f\u0003SqA!!\u0007\u0002&9!\u00111DA\u0012\u001d\u0011\ti\"!\t\u000f\u0007u\ny\"C\u0001!\u0013\tqr$\u0003\u0002\u001d;%\u0019\u0011qE\u000e\u0002\u001f\u00153XM\u001c;j]\u001e$Vm\u001d;LSRLA!a\u000b\u0002.\t)Ak\u001c9jG*\u0019\u0011qE\u000e\t\r\u0005E\u0012\u00031\u0001;\u0003\u0015!x\u000e]5d\u000319W\r\u001e+pa&\u001c\u0017*\u001c9m)\u0011\tI!a\u000e\t\r\u0005E\"\u00031\u0001;\u0005-\u0019VM\u001d<jG\u0016LU\u000e\u001d7\u0014\u000bM\ti$a\u0011\u0011\u0007-\u000by$C\u0002\u0002B\u0001\u0013a!\u00118z%\u00164\u0007\u0003BA#\u0003#j!!a\u0012\u000b\t\u0005%\u00131J\u0001\u0016KZ,g\u000e^5oO~#Xm\u001d;`E\u0006\u001c7.\u001a8e\u0015\u0011\ti%a\u0014\u0002\u0011A\u0014x\u000e^8d_2T!\u0001H\u0010\n\t\u0005M\u0013q\t\u0002\u0017\u000bZ,g\u000e^5oOR+7\u000f^&jiN+'O^5dKR\u0011\u0011q\u000b\t\u0004\u00033\u001aR\"\u0001\u0001\u0002\u0015\u0015l\u0017\u000e^*j]\u001edW\r\u0006\u0003\u0002`\u0005
-\u0004#B;\u0002b\u0005\u0015\u0014bAA2m\n1a)\u001e;ve\u0016\u0004B!!\u0012\u0002h%!\u0011\u0011NA$\u0005A)U.\u001b;TS:<G.\u001a*fgVdG\u000fC\u0004\u0002nU\u0001\r!a\u001c\u0002\u0005%t\u0007\u0003BA#\u0003cJA!a\u001d\u0002H\t\tR)\\5u'&tw\r\\3D_6l\u0017M\u001c3\u0002\u0013I,hnU8ve\u000e,G\u0003BA=\u0003/\u0003\u0002\"a\u001f\u0002\u0006\u0006%\u0015qR\u0007\u0003\u0003{RA!a \u0002\u0002\u0006A1oY1mC\u0012\u001cHNC\u0002\u0002\u0004R\naa\u001d;sK\u0006l\u0017\u0002BAD\u0003{\u0012aaU8ve\u000e,\u0007\u0003BA#\u0003\u0017KA!!$\u0002H\tQ1k\\;sG\u0016,E.Z7\u0011\t\u0005E\u00151S\u0007\u0002i%\u0019\u0011Q\u0013\u001b\u0003\u000f9{G/V:fI\"9\u0011Q\u000e\fA\u0002\u0005e\u0005\u0003CA>\u0003\u000b\u000bY*a$\u0011\t\u0005\u0015\u0013QT\u0005\u0005\u0003?\u000b9E\u0001\tSk:\u001cv.\u001e:dK\u000e{W.\\1oI\u0006qQM^3oiN#(/Z1n\u001fV$H\u0003BAS\u0003[\u0003\u0002\"a\u001f\u0002\u0006\u0006\u001d\u0016q\u0012\t\u0005\u0003\u000b\nI+\u0003\u0003\u0002,\u0006\u001d#\u0001F#wK:$8\u000b\u001e:fC6|U\u000f\u001e*fgVdG\u000fC\u0004\u0002n]\u0001\r!a,\u0011\u0011\u0005m\u0014QQAY\u0003\u001f\u0003B!!\u0012\u00024&!\u0011QWA$\u0005U)e/\u001a8u'R\u0014X-Y7PkR\u001cu.\\7b]\u0012\u0004")
/* loaded from: input_file:kalix/javasdk/testkit/impl/EventingTestServiceImpl.class */
/**
 * {@link EventingTestKit} implementation that keeps one {@link TopicImpl} per topic name and
 * exposes, through the inner {@link ServiceImpl}, the {@link EventingTestKitService} endpoints
 * that feed those topics.
 *
 * <p>The members whose names contain {@code $} are the accessors generated by the Scala compiler
 * for members shared with the inner class; their names are kept unchanged so that existing
 * callers keep resolving them.
 */
public final class EventingTestServiceImpl implements EventingTestKit {
    private final ActorSystem system;
    private final String host;
    private int port;
    private final MessageCodec codec;
    private final ActorSystem kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys;
    private final Logger kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log = LoggerFactory.getLogger(EventingTestServiceImpl.class);
    // Assigned in the constructor: it is derived from the actor system, and a field initializer
    // would run before the constructor body has stored that system (null dereference).
    private final ExecutionContextExecutor kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec;
    private final ConcurrentHashMap<String, TopicImpl> topics = new ConcurrentHashMap<>();

    /* compiled from: EventingTestKitImpl.scala */
    /* loaded from: input_file:kalix/javasdk/testkit/impl/EventingTestServiceImpl$ServiceImpl.class */
    /**
     * {@link EventingTestKitService} endpoints backed by the topics of the enclosing
     * {@link EventingTestServiceImpl}.
     */
    public final class ServiceImpl implements EventingTestKitService {
        private final /* synthetic */ EventingTestServiceImpl $outer;

        /**
         * Forwards the command to the destination probe of the topic named by its destination.
         * A command without destination is only logged as a warning.
         *
         * @param emitSingleCommand the received command
         * @return an already completed future holding a default {@link EmitSingleResult}
         */
        @Override // kalix.testkit.protocol.eventing_test_backend.EventingTestKitService
        public Future<EmitSingleResult> emitSingle(EmitSingleCommand emitSingleCommand) {
            this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().debug("Receiving message from test broker: [{}]", emitSingleCommand);
            emitSingleCommand.destination().foreach(eventDestination -> {
                $anonfun$emitSingle$1(this, emitSingleCommand, eventDestination);
                return BoxedUnit.UNIT;
            });
            if (emitSingleCommand.destination().isEmpty()) {
                this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().warn("Received a message without destination, ignoring. {}", emitSingleCommand);
            }
            return Future$.MODULE$.successful(new EmitSingleResult(EmitSingleResult$.MODULE$.apply$default$1()));
        }

        /**
         * Consumes the incoming command stream and returns the out-source of the probe created by
         * its first {@code Create} command.
         *
         * <p>The commands are folded over an optional probe: a {@code Create} carrying a source,
         * seen while no probe exists, creates a {@link EventingTestKitImpl.RunningSourceProbe},
         * registers it on the topic of that source and completes the promise the returned source
         * waits on; an {@code Ack} seen once a probe exists is only logged. Any other combination
         * fails the fold with a {@link MatchError}.
         *
         * @param source the incoming command stream
         * @return a source that emits what the created probe's out-source emits
         */
        @Override // kalix.testkit.protocol.eventing_test_backend.EventingTestKitService
        public Source<SourceElem, NotUsed> runSource(Source<RunSourceCommand, NotUsed> source) {
            this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().debug("Reading topic from test broker - runSource request started: {}", source);
            // NOTE(review): this promise is completed only by the Create branch below; if the
            // command stream ends or fails before a Create arrives, the returned source never
            // receives its inner source.
            Promise<EventingTestKitImpl.RunningSourceProbe> probePromise = Promise$.MODULE$.apply();
            source.watchTermination((notUsed, inDone) -> {
                $anonfun$runSource$1(this, notUsed, inDone);
                return BoxedUnit.UNIT;
            }).runWith(Sink$.MODULE$.fold(None$.MODULE$, (state, command) -> {
                // Kept as a pair only so the MatchError message renders as in the original.
                Tuple2<Object, Object> foldInput = new Tuple2<>(state, command);
                Option<?> current = (Option<?>) foldInput._1();
                RunSourceCommand runSourceCommand = (RunSourceCommand) foldInput._2();
                RunSourceCommand.Command payload = runSourceCommand == null ? null : runSourceCommand.command();

                // Case 1: no probe yet and a Create command that carries a source.
                if (None$.MODULE$.equals(current) && payload instanceof RunSourceCommand.Command.Create) {
                    RunSourceCreate create = ((RunSourceCommand.Command.Create) payload).m1128value();
                    if (create != null) {
                        String serviceName = create.serviceName();
                        Option<?> maybeSource = create.source();
                        if (maybeSource instanceof Some) {
                            EventSource eventSource = (EventSource) ((Some<?>) maybeSource).value();
                            this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().debug("runSource request got initial create command for service name [{}], source: [{}]", serviceName, eventSource);
                            Tuple2<?, ?> queueAndSource = Source$.MODULE$.queue(10).preMaterialize(Materializer$.MODULE$.matFromSystem(this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys()));
                            if (queueAndSource == null) {
                                throw new MatchError(queueAndSource);
                            }
                            BoundedSourceQueue queue = (BoundedSourceQueue) queueAndSource._1();
                            Source outSource = (Source) queueAndSource._2();
                            EventingTestKitImpl.RunningSourceProbe runningSourceProbe = new EventingTestKitImpl.RunningSourceProbe(serviceName, eventSource, queue, outSource);
                            this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicImpl(eventSource.getTopic()).addSourceProbe(runningSourceProbe);
                            probePromise.success(runningSourceProbe);
                            return new Some<>(runningSourceProbe);
                        }
                    }
                }

                // Case 2: a probe already exists and the command is an Ack; the state is unchanged.
                if (current instanceof Some && payload instanceof RunSourceCommand.Command.Ack) {
                    this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().debug("runSource request got ack [{}]", ((RunSourceCommand.Command.Ack) payload).m1127value());
                    return current;
                }

                // Anything else (e.g. an Ack before a Create, or a Create without a source).
                throw new MatchError("Unexpected fold input: " + foldInput);
            }), Materializer$.MODULE$.matFromSystem(this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys()));
            return Source$.MODULE$.futureSource(probePromise.future().map(runningSourceProbe -> {
                return runningSourceProbe.outSource();
            }, this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec())).watchTermination((futureMat, outDone) -> {
                $anonfun$runSource$5(this, futureMat, outDone);
                return BoxedUnit.UNIT;
            }).mapMaterializedValue(boxedUnit -> {
                return NotUsed$.MODULE$;
            });
        }

        /**
         * Not implemented by this service.
         *
         * @throws UnsupportedOperationException always
         */
        @Override // kalix.testkit.protocol.eventing_test_backend.EventingTestKitService
        public Source<EventStreamOutResult, NotUsed> eventStreamOut(Source<EventStreamOutCommand, NotUsed> source) {
            throw new UnsupportedOperationException("Feature not supported in the testkit yet");
        }

        /** Sends the command to the destination probe actor of the topic named by the destination. */
        public static final /* synthetic */ void $anonfun$emitSingle$1(ServiceImpl serviceImpl, EmitSingleCommand emitSingleCommand, EventDestination eventDestination) {
            ActorRef ref = serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicImpl(eventDestination.getTopic()).destinationProbe().ref();
            ref.$bang(emitSingleCommand, ref.$bang$default$2(emitSingleCommand));
        }

        /** Logs how the incoming ("in") side of a runSource stream terminated. */
        public static final /* synthetic */ void $anonfun$runSource$2(ServiceImpl serviceImpl, Try result) {
            if (result instanceof Success) {
                serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().debug("runSource in completed");
            } else if (result instanceof Failure) {
                serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().error("runSource in failed", ((Failure) result).exception());
            } else {
                throw new MatchError(result);
            }
        }

        /** Registers the termination logging for the incoming side of a runSource stream. */
        public static final /* synthetic */ void $anonfun$runSource$1(ServiceImpl serviceImpl, NotUsed notUsed, Future future) {
            future.onComplete(result -> {
                $anonfun$runSource$2(serviceImpl, result);
                return BoxedUnit.UNIT;
            }, serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec());
        }

        /** Logs how the outgoing ("out") side of a runSource stream terminated. */
        public static final /* synthetic */ void $anonfun$runSource$6(ServiceImpl serviceImpl, Try result) {
            if (result instanceof Success) {
                serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().debug("runSource out completed");
            } else if (result instanceof Failure) {
                serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log().error("runSource out failed", ((Failure) result).exception());
            } else {
                throw new MatchError(result);
            }
        }

        /** Registers the termination logging for the outgoing side of a runSource stream. */
        public static final /* synthetic */ void $anonfun$runSource$5(ServiceImpl serviceImpl, Future future, Future future2) {
            future2.onComplete(result -> {
                $anonfun$runSource$6(serviceImpl, result);
                return BoxedUnit.UNIT;
            }, serviceImpl.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec());
        }

        /**
         * @param eventingTestServiceImpl the enclosing service whose topics, logger and execution
         *     context this instance uses
         * @throws NullPointerException if {@code eventingTestServiceImpl} is {@code null}
         */
        public ServiceImpl(EventingTestServiceImpl eventingTestServiceImpl) {
            if (eventingTestServiceImpl == null) {
                // Same exception type the original "throw null" produced.
                throw new NullPointerException();
            }
            this.$outer = eventingTestServiceImpl;
        }
    }

    /** @return the host given at construction */
    public String host() {
        return this.host;
    }

    /** @return the current port value */
    public int port() {
        return this.port;
    }

    /** Replaces the port value. */
    public void port_$eq(int i) {
        this.port = i;
    }

    /** @return the logger shared with {@link ServiceImpl} */
    public Logger kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log() {
        return this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log;
    }

    /** @return the actor system given at construction */
    public ActorSystem kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys() {
        return this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys;
    }

    /** @return the dispatcher of the actor system, used as execution context for callbacks */
    public ExecutionContextExecutor kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec() {
        return this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec;
    }

    private ConcurrentHashMap<String, TopicImpl> topics() {
        return this.topics;
    }

    /**
     * Returns the topic registered under the given name, creating it on first use.
     *
     * @param str the topic name
     */
    @Override // kalix.javasdk.testkit.EventingTestKit
    public EventingTestKit.Topic getTopic(String str) {
        return kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicImpl(str);
    }

    /**
     * Looks up the {@link TopicImpl} for the given name. On first use a new one is created with
     * a fresh test probe, a {@code SourcesHolder} actor named {@code "topic-source-holder-" + name}
     * and the codec given at construction.
     *
     * @param str the topic name
     */
    public TopicImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicImpl(String str) {
        return topics().computeIfAbsent(str, topicName -> {
            return new TopicImpl(TestProbe$.MODULE$.apply(this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys()), this.system.actorOf(Props$.MODULE$.apply(ClassTag$.MODULE$.apply(SourcesHolder.class)), "topic-source-holder-" + topicName), this.codec);
        });
    }

    /**
     * @param actorSystem actor system used to create probes and actors, to materialize streams
     *     and to provide the execution context
     * @param str the host value returned by {@link #host()}
     * @param i the initial port value
     * @param messageCodec codec handed to every created {@link TopicImpl}
     */
    public EventingTestServiceImpl(ActorSystem actorSystem, String str, int i, MessageCodec messageCodec) {
        this.system = actorSystem;
        this.host = str;
        this.port = i;
        this.codec = messageCodec;
        this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys = actorSystem;
        // Must come after the actor system is stored; see the field comment.
        this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec = actorSystem.dispatcher();
    }
}
