/*
 * Decompiled with CFR 0.152.
 */
package nl.tradecloud.kafka;

import akka.actor.Actor;
import akka.actor.ActorLogging;
import akka.actor.ActorRefFactory;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.ConsumerSettings$;
import akka.kafka.Subscription;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Consumer$;
import akka.protobuf.InvalidProtocolBufferException;
import akka.remote.WireFormats;
import akka.stream.ActorMaterializer$;
import akka.stream.ActorMaterializerSettings$;
import akka.stream.Materializer;
import akka.stream.Supervision;
import akka.stream.scaladsl.Source;
import java.io.NotSerializableException;
import nl.tradecloud.kafka.KafkaConsumer;
import nl.tradecloud.kafka.KafkaMessageSerializer$;
import nl.tradecloud.kafka.command.Subscribe;
import nl.tradecloud.kafka.config.KafkaConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.mutable.StringBuilder;

public abstract class KafkaConsumer$class {
    public static Source initConsumer(KafkaConsumer $this, KafkaConfig config, Subscribe subscribe) {
        Set prefixedTopics = (Set)subscribe.topics().map((Function1)new Serializable($this, config){
            public static final long serialVersionUID = 0L;
            private final KafkaConfig config$1;

            public final String apply(String x$1) {
                return new StringBuilder().append((Object)this.config$1.topicPrefix()).append((Object)x$1).toString();
            }
            {
                this.config$1 = config$1;
            }
        }, Set$.MODULE$.canBuildFrom());
        ((ActorLogging)$this).log().info("Start KafkaConsumer, with group={}, topics={}, prefixedTopics={}", (Object)subscribe.group(), (Object)subscribe.topics().mkString(", "), (Object)prefixedTopics.mkString(", "));
        ConsumerSettings consumerSettings = ConsumerSettings$.MODULE$.apply(((Actor)$this).context().system(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()).withBootstrapServers(config.bootstrapServers()).withGroupId(subscribe.group()).withProperty("auto.offset.reset", "earliest");
        return (Source)Consumer$.MODULE$.committableSource(consumerSettings, (Subscription)Subscriptions$.MODULE$.topics(prefixedTopics)).map((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaConsumer $outer;

            public final KafkaConsumer.KafkaMessage apply(ConsumerMessage.CommittableMessage<byte[], byte[]> message) {
                ((ActorLogging)this.$outer).log().debug("Received message value={}, key={}", message.record().value(), message.record().key());
                WireFormats.SerializedMessage serializedMessage = WireFormats.SerializedMessage.parseFrom((byte[])((byte[])message.record().value()));
                if (serializedMessage != null) {
                    WireFormats.SerializedMessage serializedMessage2 = serializedMessage;
                    KafkaConsumer.KafkaMessage kafkaMessage = new KafkaConsumer.KafkaMessage(message, KafkaMessageSerializer$.MODULE$.deserialize(((Actor)this.$outer).context().system(), serializedMessage2));
                    return kafkaMessage;
                }
                throw new NotSerializableException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to deserialize msg ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{message.record().value()})));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    public static void $init$(KafkaConsumer $this) {
        $this.nl$tradecloud$kafka$KafkaConsumer$_setter_$nl$tradecloud$kafka$KafkaConsumer$$decider_$eq((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaConsumer $outer;

            public final Supervision.Directive apply(Throwable x0$1) {
                Supervision.Resume$ resume$;
                Throwable throwable = x0$1;
                if (throwable instanceof NotSerializableException) {
                    NotSerializableException notSerializableException = (NotSerializableException)throwable;
                    ((ActorLogging)this.$outer).log().error((Throwable)notSerializableException, "Message is not deserializable, resuming...");
                    resume$ = Supervision.Resume$.MODULE$;
                } else if (throwable instanceof InvalidProtocolBufferException) {
                    InvalidProtocolBufferException invalidProtocolBufferException = (InvalidProtocolBufferException)throwable;
                    ((ActorLogging)this.$outer).log().error((Throwable)invalidProtocolBufferException, "Message is not deserializable, resuming...");
                    resume$ = Supervision.Resume$.MODULE$;
                } else if (throwable != null) {
                    Throwable throwable2 = throwable;
                    ((ActorLogging)this.$outer).log().error(throwable2, "Exception occurred, stopping...");
                    resume$ = Supervision.Stop$.MODULE$;
                } else {
                    ((ActorLogging)this.$outer).log().error("Unknown problem, stopping...");
                    resume$ = Supervision.Stop$.MODULE$;
                }
                return resume$;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        $this.nl$tradecloud$kafka$KafkaConsumer$_setter_$materializer_$eq((Materializer)ActorMaterializer$.MODULE$.apply(ActorMaterializerSettings$.MODULE$.apply(((Actor)$this).context().system()).withSupervisionStrategy($this.nl$tradecloud$kafka$KafkaConsumer$$decider()), (ActorRefFactory)((Actor)$this).context()));
    }
}

