package pl.touk.nussknacker.engine.kafka;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.KafkaProducer;
import pl.touk.nussknacker.engine.api.MetaData;
import pl.touk.nussknacker.engine.api.exception.EspExceptionInfo;
import pl.touk.nussknacker.engine.api.exception.NonTransientException;
import pl.touk.nussknacker.engine.flink.api.RuntimeContextLifecycle;
import pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionConsumer;
import scala.Array$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: KafkaExceptionConsumer.scala */
@ScalaSignature(bytes = "\u0006\u0001a4AAC\u0006\u0001-!Aq\u0005\u0001BC\u0002\u0013\u0005\u0001\u0006\u0003\u0005.\u0001\t\u0005\t\u0015!\u0003*\u0011!q\u0003A!A!\u0002\u0013y\u0003\u0002\u0003\u001e\u0001\u0005\u0003\u0005\u000b\u0011B\u001e\t\u0011I\u0003!\u0011!Q\u0001\fMCQa\u0016\u0001\u0005\u0002aC\u0001b\u0018\u0001\t\u0006\u0004%I\u0001\u0019\u0005\u0006_\u0002!\t\u0001\u001d\u0005\u0006m\u0002!\te\u001e\u0002\u0017\u0017\u000647.Y#yG\u0016\u0004H/[8o\u0007>t7/^7fe*\u0011A\"D\u0001\u0006W\u000647.\u0019\u0006\u0003\u001d=\ta!\u001a8hS:,'B\u0001\t\u0012\u0003-qWo]:l]\u0006\u001c7.\u001a:\u000b\u0005I\u0019\u0012\u0001\u0002;pk.T\u0011\u0001F\u0001\u0003a2\u001c\u0001aE\u0002\u0001/u\u0001\"\u0001G\u000e\u000e\u0003eQ\u0011AG\u0001\u0006g\u000e\fG.Y\u0005\u00039e\u0011a!\u00118z%\u00164\u0007C\u0001\u0010&\u001b\u0005y\"B\u0001\u0011\"\u0003%)\u0007pY3qi&|gN\u0003\u0002#G\u0005\u0019\u0011\r]5\u000b\u0005\u0011j\u0011!\u00024mS:\\\u0017B\u0001\u0014 \u0005e1E.\u001b8l\u000bN\u0004X\t_2faRLwN\\\"p]N,X.\u001a:\u0002\u0017-\fgm[1D_:4\u0017nZ\u000b\u0002SA\u0011!fK\u0007\u0002\u0017%\u0011Af\u0003\u0002\f\u0017\u000647.Y\"p]\u001aLw-\u0001\u0007lC\u001a\\\u0017mQ8oM&<\u0007%A\u0003u_BL7\r\u0005\u00021o9\u0011\u0011'\u000e\t\u0003eei\u0011a\r\u0006\u0003iU\ta\u0001\u0010:p_Rt\u0014B\u0001\u001c\u001a\u0003\u0019\u0001&/\u001a3fM&\u0011\u0001(\u000f\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005YJ\u0012aE:fe&\fG.\u001b>bi&|gnU2iK6\f\u0007c\u0001\u001fH\u00136\tQH\u0003\u0002?\u007f\u0005i1/\u001a:jC2L'0\u0019;j_:T!\u0001Q!\u0002\r\r|W.\\8o\u0015\t\u0011#I\u0003\u0002%\u0007*\u0011A)R\u0001\u0007CB\f7\r[3\u000b\u0003\u0019\u000b1a\u001c:h\u0013\tAUHA\nTKJL\u0017\r\\5{CRLwN\\*dQ\u0016l\u0017\rE\u0002K\u001b>k\u0011a\u0013\u0006\u0003A1S!AI\u0007\n\u00059[%\u0001E#ta\u0016C8-\u001a9uS>t\u0017J\u001c4p!\tQ\u0005+\u0003\u0002R\u0017\n)bj\u001c8Ue\u0006t7/[3oi\u0016C8-\u001a9uS>t\u0017\u0001C7fi\u0006$\u0015\r^1\u0011\u0005Q+V\"\u0001'\n\u0005Yc%\u0001C'fi\u0006$\u0015\r^1\u0002\rqJg.\u001b;?)\u0011IF,\u00180\u0015\u0005i[\u0006C\u0001\u0016\u0001\u0011\u0015\u0011f\u0001q\u0001T\u0011\u00159c\u00011\u0001*\u0011\u0015qc\u00011\u00010\u0011\u0015Qd\u00011\u0001<\u0003!\u0001(o\u001c3vG\u0016\u0014X#A1\u0011\t\t<\u0017.[\u0007\u0002G*\u0011q\f\u001a\u0006\u0003K\u001a\fqa\u00197jK:$8O\u0003\u0002\r\u0007&\u0011\u0001n\u0019\u0002\u000e\u0017\u000647.\u0019)s_\u0012,8-\u001a:\u0011\u0007aQG.\u0003\u0002l3\t)\u0011I\u001d:bsB\u0011\u0001$\\\u0005\u0003]f\u0011AAQ=uK\u000691m\u001c8tk6,GCA9u!\tA\"/\u0003\u0002t3\t!QK\\5u\u0011\u0015)\b\u00021\u0001J\u00035)\u0007pY3qi&|g.\u00138g_\u0006)1\r\\8tKR\t\u0011\u000f")
/* loaded from: input_file:pl/touk/nussknacker/engine/kafka/KafkaExceptionConsumer.class */
public class KafkaExceptionConsumer implements FlinkEspExceptionConsumer {
    private KafkaProducer<byte[], byte[]> producer;
    private final KafkaConfig kafkaConfig;
    private final String topic;
    private final SerializationSchema<EspExceptionInfo<NonTransientException>> serializationSchema;
    private final MetaData metaData;
    private volatile boolean bitmap$0;

    public void open(RuntimeContext runtimeContext) {
        RuntimeContextLifecycle.open$(this, runtimeContext);
    }

    public KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9, types: [pl.touk.nussknacker.engine.kafka.KafkaExceptionConsumer] */
    private KafkaProducer<byte[], byte[]> producer$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.producer = KafkaEspUtils$.MODULE$.createProducer(kafkaConfig(), new StringBuilder(10).append("exception-").append(this.metaData.id()).toString());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        this.metaData = null;
        return this.producer;
    }

    private KafkaProducer<byte[], byte[]> producer() {
        return !this.bitmap$0 ? producer$lzycompute() : this.producer;
    }

    public void consume(EspExceptionInfo<NonTransientException> espExceptionInfo) {
        KafkaEspUtils$.MODULE$.sendToKafka(this.topic, Array$.MODULE$.empty(ClassTag$.MODULE$.Byte()), this.serializationSchema.serialize(espExceptionInfo), producer());
    }

    public void close() {
        producer().close();
    }

    public KafkaExceptionConsumer(KafkaConfig kafkaConfig, String str, SerializationSchema<EspExceptionInfo<NonTransientException>> serializationSchema, MetaData metaData) {
        this.kafkaConfig = kafkaConfig;
        this.topic = str;
        this.serializationSchema = serializationSchema;
        this.metaData = metaData;
        RuntimeContextLifecycle.$init$(this);
        FlinkEspExceptionConsumer.$init$(this);
    }
}
