/*
 * Decompiled with CFR 0.152.
 */
package nl.tradecloud.kafka;

import akka.actor.Actor;
import akka.actor.ActorLogging;
import akka.actor.ActorRefFactory;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.ProducerSettings$;
import akka.kafka.scaladsl.Producer$;
import akka.stream.ActorMaterializer$;
import akka.stream.ActorMaterializerSettings$;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.Supervision;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import java.io.NotSerializableException;
import nl.tradecloud.kafka.KafkaMessageSerializer$;
import nl.tradecloud.kafka.KafkaPublisher;
import nl.tradecloud.kafka.KafkaPublisherSource$;
import nl.tradecloud.kafka.command.Publish;
import nl.tradecloud.kafka.config.KafkaConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import scala.Function1;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.mutable.StringBuilder;

/**
 * Static implementation holder for the {@code KafkaPublisher} trait, as emitted by the
 * Scala compiler and reconstructed by the CFR decompiler.
 *
 * <p>Every method takes the trait instance as an explicit {@code $this} argument. The
 * instance is cast to {@code Actor} and {@code ActorLogging} throughout, so it is
 * expected to be an actor that mixes in logging.
 *
 * <p>NOTE(review): this is decompiler output, not hand-written Java. The anonymous
 * {@code new Serializable(...)} classes stand in for Scala closures and are not valid
 * Java syntax as written.
 */
public abstract class KafkaPublisher$class {
    /**
     * Builds and runs the stream that publishes {@code Publish} commands to Kafka.
     *
     * <p>The pipeline is: actor-publisher source, then serialization of the command's
     * message to bytes, then wrapping in a producer message for the prefixed topic,
     * then the Kafka producer flow, then an ignoring sink.
     *
     * @param $this  the publisher actor; supplies the logger, actor system and materializer
     * @param config supplies {@code topicPrefix()} and {@code bootstrapServers()}
     * @param topic  unprefixed topic name
     * @return the {@code Tuple2} returned by {@code runWith} (presumably the materialized
     *         values of the source and the sink; confirm against the Akka Streams API)
     */
    public static Tuple2 produce(KafkaPublisher $this, KafkaConfig config, String topic) {
        // The topic actually written to is the configured prefix followed by the topic name.
        String prefixedTopic = new StringBuilder().append((Object)config.topicPrefix()).append((Object)topic).toString();
        ((ActorLogging)$this).log().info("Started publisher for topic={}, prefixedTopic={}", (Object)topic, (Object)prefixedTopic);
        // Both serializers passed to the producer settings are byte-array serializers;
        // the payload is already turned into bytes by the first map stage below.
        ProducerSettings producerSettings = ProducerSettings$.MODULE$.apply(((Actor)$this).context().system(), (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer()).withBootstrapServers(config.bootstrapServers());
        // Source backed by an actor created from KafkaPublisherSource's props.
        Source publisherSource = Source$.MODULE$.actorPublisher(KafkaPublisherSource$.MODULE$.props());
        // Stage 1: Publish command -> serialized bytes of cmd.msg().
        return ((Flow)Flow$.MODULE$.apply().map((Function1)new Serializable($this, prefixedTopic, topic){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaPublisher $outer;
            private final String prefixedTopic$1;
            private final String topic$1;

            /** Logs the command at debug level and serializes its message to a byte array. */
            public final byte[] apply(Publish cmd) {
                ((ActorLogging)this.$outer).log().debug("Publishing cmd={}, topic={}, prefixedTopic={}", (Object)cmd, (Object)this.topic$1, (Object)this.prefixedTopic$1);
                return KafkaMessageSerializer$.MODULE$.serialize(((Actor)this.$outer).context().system(), cmd.msg()).toByteArray();
            }
            // Synthetic closure initializer: captures the enclosing publisher and both topic names.
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.prefixedTopic$1 = prefixedTopic$1;
                this.topic$1 = topic$1;
            }
        // Stage 2: serialized bytes -> producer message addressed to the prefixed topic.
        }).map((Function1)new Serializable($this, prefixedTopic, topic){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaPublisher $outer;
            private final String prefixedTopic$1;
            private final String topic$1;

            /**
             * Wraps the bytes in a producer record for the prefixed topic; the same byte
             * array is also passed as the message's second argument.
             */
            public final ProducerMessage.Message<byte[], byte[], byte[]> apply(byte[] msg) {
                // NOTE(review): msg is a byte[]; toString() on an array yields its type and
                // identity hash, not the payload, so this log line does not show the content.
                ((ActorLogging)this.$outer).log().debug("Publishing serialized={}, topic={}, prefixedTopic={}", (Object)msg.toString(), (Object)this.topic$1, (Object)this.prefixedTopic$1);
                return new ProducerMessage.Message(new ProducerRecord(this.prefixedTopic$1, (Object)msg), (Object)msg);
            }
            // Synthetic closure initializer: captures the enclosing publisher and both topic names.
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.prefixedTopic$1 = prefixedTopic$1;
                this.topic$1 = topic$1;
            }
        // Stage 3: send through the Kafka producer flow, discard the results, and run the
        // stream with the publisher's own materializer.
        })).via((Graph)Producer$.MODULE$.flow(producerSettings)).runWith((Graph)publisherSource, (Graph)Sink$.MODULE$.ignore(), $this.materializer());
    }

    /**
     * Trait initializer: installs the supervision decider and then the materializer on
     * the given publisher.
     *
     * <p>The decider must be set first because the materializer settings read it back
     * through {@code $this.nl$tradecloud$kafka$KafkaPublisher$$decider()}.
     *
     * @param $this the publisher actor being initialized
     */
    public static void $init$(KafkaPublisher $this) {
        $this.nl$tradecloud$kafka$KafkaPublisher$_setter_$nl$tradecloud$kafka$KafkaPublisher$$decider_$eq((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaPublisher $outer;

            /**
             * Maps a stream failure to a supervision directive: resume on
             * {@code NotSerializableException}, stop on anything else.
             */
            public final Supervision.Directive apply(Throwable x0$1) {
                // Decompiler artifact: declared as Stop$ although it also holds Resume$ below.
                Supervision.Stop$ stop$;
                Throwable throwable = x0$1;
                if (throwable instanceof NotSerializableException) {
                    // A message that cannot be serialized is logged and the stream resumes.
                    NotSerializableException notSerializableException = (NotSerializableException)throwable;
                    ((ActorLogging)this.$outer).log().error((Throwable)notSerializableException, "Message is not serializable, resuming...");
                    stop$ = Supervision.Resume$.MODULE$;
                } else if (throwable != null) {
                    // Any other failure is logged with its cause and stops the stream.
                    Throwable throwable2 = throwable;
                    ((ActorLogging)this.$outer).log().error(throwable2, "Exception occurred");
                    stop$ = Supervision.Stop$.MODULE$;
                } else {
                    // Only reached when the decider is invoked with a null throwable.
                    ((ActorLogging)this.$outer).log().error("Unknown problem");
                    stop$ = Supervision.Stop$.MODULE$;
                }
                return stop$;
            }
            // Synthetic closure initializer: captures the enclosing publisher.
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        // Materializer built from the actor system's default settings plus the decider
        // above, created within the actor's context.
        $this.nl$tradecloud$kafka$KafkaPublisher$_setter_$materializer_$eq((Materializer)ActorMaterializer$.MODULE$.apply(ActorMaterializerSettings$.MODULE$.apply(((Actor)$this).context().system()).withSupervisionStrategy($this.nl$tradecloud$kafka$KafkaPublisher$$decider()), (ActorRefFactory)((Actor)$this).context()));
    }
}

