package pl.touk.nussknacker.engine.lite.kafka;

import cats.Invariant$;
import cats.UnorderedFoldable$;
import cats.data.WriterT;
import cats.implicits$;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import pl.touk.nussknacker.engine.api.MetaData;
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo;
import pl.touk.nussknacker.engine.api.exception.WithExceptionExtractor;
import pl.touk.nussknacker.engine.api.process.Source;
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import pl.touk.nussknacker.engine.kafka.exception.KafkaErrorTopicInitializer;
import pl.touk.nussknacker.engine.kafka.exception.KafkaJsonExceptionSerializationSchema;
import pl.touk.nussknacker.engine.lite.api.interpreterTypes;
import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter;
import pl.touk.nussknacker.engine.lite.kafka.api.LiteKafkaSource;
import pl.touk.nussknacker.engine.lite.metrics.SourceMetrics;
import pl.touk.nussknacker.engine.util.exception.DefaultWithExceptionExtractor;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.compat.java8.DurationConverters$;
import scala.compat.java8.DurationConverters$FiniteDurationops$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering$Long$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: KafkaSingleScenarioTaskRun.scala */
@ScalaSignature(bytes = "\u0006\u0001\t]f\u0001\u0002\u0015*\u0001YB\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IA\u0014\u0005\t7\u0002\u0011\t\u0011)A\u00059\"A!\r\u0001B\u0001B\u0003%1\r\u0003\u0005j\u0001\t\u0005\t\u0015!\u0003k\u0011!i\bA!A!\u0002\u0013q\bBCA\u0012\u0001\t\u0005\t\u0015!\u0003\u0002&!Q\u0011\u0011\u0007\u0001\u0003\u0002\u0003\u0006Y!a\r\t\u000f\u0005e\u0002\u0001\"\u0001\u0002<!I\u0011q\n\u0001C\u0002\u0013%\u0011\u0011\u000b\u0005\b\u0003'\u0002\u0001\u0015!\u0003O\u0011-\t)\u0006\u0001a\u0001\u0002\u0004%I!a\u0016\t\u0017\u0005}\u0004\u00011AA\u0002\u0013%\u0011\u0011\u0011\u0005\f\u0003\u001b\u0003\u0001\u0019!A!B\u0013\tI\u0006C\u0006\u0002\u0010\u0002\u0001\r\u00111A\u0005\n\u0005E\u0005bCAO\u0001\u0001\u0007\t\u0019!C\u0005\u0003?C1\"a)\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002\u0014\"Y\u0011Q\u0015\u0001A\u0002\u0003\u0007I\u0011BAT\u0011-\ty\u000b\u0001a\u0001\u0002\u0004%I!!-\t\u0017\u0005U\u0006\u00011A\u0001B\u0003&\u0011\u0011\u0016\u0005\f\u0003o\u0003\u0001\u0019!a\u0001\n\u0013\t9\u000bC\u0006\u0002:\u0002\u0001\r\u00111A\u0005\n\u0005m\u0006bCA`\u0001\u0001\u0007\t\u0011)Q\u0005\u0003SC\u0011\"!1\u0001\u0005\u0004%\t\"a1\t\u0011\u0005E\u0007\u0001)A\u0005\u0003\u000bD\u0011\"a5\u0001\u0005\u0004%I!!6\t\u0011\u0005u\b\u0001)A\u0005\u0003/Dq!a@\u0001\t\u0003\u0011\t\u0001C\u0004\u0003\u0004\u0001!I!a\u0016\t\u000f\t\u0015\u0001\u0001\"\u0003\u0002\u0012\"9!q\u0001\u0001\u0005\n\t\u0005\u0001b\u0002B\u0005\u0001\u0011\u0005!\u0011\u0001\u0005\b\u0005\u0017\u0001A\u0011\u0002B\u0007\u0011\u001d\u0011y\u0002\u0001C\u0005\u0005CAqAa\u0011\u0001\t\u0013\u0011)\u0005C\u0004\u0003~\u0001!IAa \t\u000f\t-\u0005\u0001\"\u0003\u0003\u000e\"9!Q\u0015\u0001\u0005\u0002\t\u0005\u0001b\u0002BT\u0001\u0011%!\u0011\u0001\u0005\b\u0005S\u0003A\u0011\u0002BV\u0005iY\u0015MZ6b'&tw\r\\3TG\u0016t\u0017M]5p)\u0006\u001c8NU;o\u0015\tQ3&A\u0003lC\u001a\\\u0017M\u0003\u0002-[\u0005!A.\u001b;f\u0015\tqs&\u0001\u0004f]\u001eLg.\u001a\u0006\u0003aE\n1B\\;tg.t\u0017mY6fe*\u0011!gM\u0001\u0005i>,8NC\u00015\u0003\t\u0001Hn\u0001\u0001\u0014\t\u00019th\u0011\t\u0003quj\u0011!\u000f\u0006\u0003um\nA\u0001\\1oO*\tA(\u0001\u0003kCZ\f\u0017B\u0001 :\u0005\u0019y%M[3diB\u0011\u0001)Q\u0007\u0002S%\u0011!)\u000b\u0002\u0005)\u0006\u001c8\u000e\u0005\u0002E\u00176\tQI\u0003\u0002G\u000f\u0006a1oY1mC2|wmZ5oO*\u0011\u0001*S\u0001\tif\u0004Xm]1gK*\t!*A\u0002d_6L!\u0001T#\u0003\u00171\u000b'0\u001f'pO\u001eLgnZ\u0001\u0007i\u0006\u001c8.\u00133\u0011\u0005=CfB\u0001)W!\t\tF+D\u0001S\u0015\t\u0019V'\u0001\u0004=e>|GO\u0010\u0006\u0002+\u0006)1oY1mC&\u0011q\u000bV\u0001\u0007!J,G-\u001a4\n\u0005eS&AB*ue&twM\u0003\u0002X)\u0006AQ.\u001a;b\t\u0006$\u0018\r\u0005\u0002^A6\taL\u0003\u0002`[\u0005\u0019\u0011\r]5\n\u0005\u0005t&\u0001C'fi\u0006$\u0015\r^1\u0002\u001dI,h\u000e^5nK\u000e{g\u000e^3yiB\u0011AmZ\u0007\u0002K*\u0011aMX\u0001\u000feVtG/[7fG>tG/\u001a=u\u0013\tAWM\u0001\u000bF]\u001eLg.\u001a*v]RLW.Z\"p]R,\u0007\u0010^\u0001\rK:<\u0017N\\3D_:4\u0017n\u001a\t\u0003Wjt!\u0001\u001c=\u000f\u00055<hB\u00018w\u001d\tyWO\u0004\u0002qi:\u0011\u0011o\u001d\b\u0003#JL\u0011\u0001N\u0005\u0003eMJ!\u0001M\u0019\n\u00059z\u0013B\u0001\u0017.\u0013\tQ3&\u0003\u0002zS\u0005)3*\u00194lCR\u0013\u0018M\\:bGRLwN\\1m'\u000e,g.\u0019:j_&sG/\u001a:qe\u0016$XM]\u0005\u0003wr\u0014acS1gW\u0006Le\u000e^3saJ,G/\u001a:D_:4\u0017n\u001a\u0006\u0003s&\n1\"\u001b8uKJ\u0004(/\u001a;feBIq0!\u0002\u0002\f\u0005]\u0011Q\u0004\b\u0004[\u0006\u0005\u0011bAA\u0002W\u0005Q2kY3oCJLw.\u00138uKJ\u0004(/\u001a;fe\u001a\u000b7\r^8ss&!\u0011qAA\u0005\u0005\u0001\u001a6-\u001a8be&|\u0017J\u001c;feB\u0014X\r^3s/&$\b\u000eT5gK\u000eL8\r\\3\u000b\u0007\u0005\r1\u0006\u0005\u0003\u0002\u000e\u0005MQBAA\b\u0015\r\t\t\u0002V\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA\u000b\u0003\u001f\u0011aAR;ukJ,\u0007cA6\u0002\u001a%\u0019\u00111\u0004?\u0003\u000b%s\u0007/\u001e;\u0011\u0007-\fy\"C\u0002\u0002\"q\u0014aaT;uaV$\u0018!D:pkJ\u001cW-T3ue&\u001c7\u000f\u0005\u0003\u0002(\u00055RBAA\u0015\u0015\r\tYcK\u0001\b[\u0016$(/[2t\u0013\u0011\ty#!\u000b\u0003\u001bM{WO]2f\u001b\u0016$(/[2t\u0003\t)7\r\u0005\u0003\u0002\u000e\u0005U\u0012\u0002BA\u001c\u0003\u001f\u0011\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0002\rqJg.\u001b;?)9\ti$a\u0011\u0002F\u0005\u001d\u0013\u0011JA&\u0003\u001b\"B!a\u0010\u0002BA\u0011\u0001\t\u0001\u0005\b\u0003cA\u00019AA\u001a\u0011\u0015i\u0005\u00021\u0001O\u0011\u0015Y\u0006\u00021\u0001]\u0011\u0015\u0011\u0007\u00021\u0001d\u0011\u0015I\u0007\u00021\u0001k\u0011\u0015i\b\u00021\u0001\u007f\u0011\u001d\t\u0019\u0003\u0003a\u0001\u0003K\tqa\u001a:pkBLE-F\u0001O\u0003!9'o\\;q\u0013\u0012\u0004\u0013\u0001C2p]N,X.\u001a:\u0016\u0005\u0005e\u0003\u0003CA.\u0003[\n\t(!\u001d\u000e\u0005\u0005u#\u0002BA+\u0003?RA!!\u0019\u0002d\u000591\r\\5f]R\u001c(b\u0001\u0016\u0002f)!\u0011qMA5\u0003\u0019\t\u0007/Y2iK*\u0011\u00111N\u0001\u0004_J<\u0017\u0002BA8\u0003;\u0012QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\bCBA:\u0003k\nI(D\u0001U\u0013\r\t9\b\u0016\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0005\u0003g\nY(C\u0002\u0002~Q\u0013AAQ=uK\u0006a1m\u001c8tk6,'o\u0018\u0013fcR!\u00111QAE!\u0011\t\u0019(!\"\n\u0007\u0005\u001dEK\u0001\u0003V]&$\b\"CAF\u0019\u0005\u0005\t\u0019AA-\u0003\rAH%M\u0001\nG>t7/^7fe\u0002\n\u0001\u0002\u001d:pIV\u001cWM]\u000b\u0003\u0003'\u0003\u0002\"!&\u0002\u001a\u0006E\u0014\u0011O\u0007\u0003\u0003/SA!a$\u0002`%!\u00111TAL\u00055Y\u0015MZ6b!J|G-^2fe\u0006a\u0001O]8ek\u000e,'o\u0018\u0013fcR!\u00111QAQ\u0011%\tYiDA\u0001\u0002\u0004\t\u0019*A\u0005qe>$WoY3sA\u0005A2m\u001c8tk6,'/T3ue&\u001c7OU3hSN$(/\u0019:\u0016\u0005\u0005%\u0006c\u0001!\u0002,&\u0019\u0011QV\u0015\u0003+-\u000bgm[1NKR\u0014\u0018nY:SK\u001eL7\u000f\u001e:be\u0006a2m\u001c8tk6,'/T3ue&\u001c7OU3hSN$(/\u0019:`I\u0015\fH\u0003BAB\u0003gC\u0011\"a#\u0013\u0003\u0003\u0005\r!!+\u00023\r|gn];nKJlU\r\u001e:jGN\u0014VmZ5tiJ\f'\u000fI\u0001\u0019aJ|G-^2fe6+GO]5dgJ+w-[:ue\u0006\u0014\u0018\u0001\b9s_\u0012,8-\u001a:NKR\u0014\u0018nY:SK\u001eL7\u000f\u001e:be~#S-\u001d\u000b\u0005\u0003\u0007\u000bi\fC\u0005\u0002\fV\t\t\u00111\u0001\u0002*\u0006I\u0002O]8ek\u000e,'/T3ue&\u001c7OU3hSN$(/\u0019:!\u0003%)\u0007\u0010\u001e:bGR|'/\u0006\u0002\u0002FB!\u0011qYAg\u001b\t\tIMC\u0002\u0002Lz\u000b\u0011\"\u001a=dKB$\u0018n\u001c8\n\t\u0005=\u0017\u0011\u001a\u0002\u0017/&$\b.\u0012=dKB$\u0018n\u001c8FqR\u0014\u0018m\u0019;pe\u0006QQ\r\u001f;sC\u000e$xN\u001d\u0011\u0002\u001bM|WO]2f)>$v\u000e]5d+\t\t9\u000e\u0005\u0004P\u00033t\u0015Q\\\u0005\u0004\u00037T&aA'baB9q*!7\u0002`\u0006M\b\u0003BAq\u0003[tA!a9\u0002h:\u0019Q.!:\n\u0005}[\u0013\u0002BAu\u0003W\f\u0001#\u001b8uKJ\u0004(/\u001a;feRK\b/Z:\u000b\u0005}[\u0013\u0002BAx\u0003c\u0014\u0001bU8ve\u000e,\u0017\n\u001a\u0006\u0005\u0003S\fY\u000f\u0005\u0003\u0002v\u0006eXBAA|\u0015\ty\u0016&\u0003\u0003\u0002|\u0006](a\u0004'ji\u0016\\\u0015MZ6b'>,(oY3\u0002\u001dM|WO]2f)>$v\u000e]5dA\u0005!\u0011N\\5u)\t\t\u0019)A\bqe\u0016\u0004\u0018M]3D_:\u001cX/\\3s\u0003=\u0001(/\u001a9be\u0016\u0004&o\u001c3vG\u0016\u0014\u0018a\u0004:fO&\u001cH/\u001a:NKR\u0014\u0018nY:\u0002\u0007I,h.\u0001\bqe>\u001cWm]:SK\u000e|'\u000fZ:\u0015\t\t=!Q\u0003\t\u0005\u0003g\u0012\t\"C\u0002\u0003\u0014Q\u00131!\u00118z\u0011\u001d\u00119\u0002\ta\u0001\u00053\tqA]3d_J$7\u000f\u0005\u0005\u0002\\\tm\u0011\u0011OA9\u0013\u0011\u0011i\"!\u0018\u0003\u001f\r{gn];nKJ\u0014VmY8sIN\fa\u0002\u001d:fa\u0006\u0014XMU3d_J$7\u000f\u0006\u0003\u0003$\t\u0005\u0003C\u0002B\u0013\u0005_\u0011)D\u0004\u0003\u0003(\t-bbA)\u0003*%\tQ+C\u0002\u0003.Q\u000bq\u0001]1dW\u0006<W-\u0003\u0003\u00032\tM\"\u0001\u0002'jgRT1A!\fU!!\t\u0019Ha\u000e\u0002`\nm\u0012b\u0001B\u001d)\n1A+\u001e9mKJ\u0002\u0002\"a\u0017\u0003>\u0005E\u0014\u0011O\u0005\u0005\u0005\u007f\tiF\u0001\bD_:\u001cX/\\3s%\u0016\u001cwN\u001d3\t\u000f\t]\u0011\u00051\u0001\u0003\u001a\u0005\t2/\u001a8e\u001fV$\b/\u001e;U_.\u000bgm[1\u0015\t\t\u001d#1\f\u0019\u0005\u0005\u0013\u0012y\u0005\u0005\u0004\u0002\u000e\u0005M!1\n\t\u0005\u0005\u001b\u0012y\u0005\u0004\u0001\u0005\u0017\tE#%!A\u0001\u0002\u000b\u0005!1\u000b\u0002\u0004?\u0012\n\u0014\u0003\u0002B+\u0005\u001f\u0001B!a\u001d\u0003X%\u0019!\u0011\f+\u0003\u000f9{G\u000f[5oO\"9!Q\f\u0012A\u0002\t}\u0013AB8viB,H\u000f\u0005\u0004\u0003b\t\u001d$Q\u000e\b\u0005\u0003G\u0014\u0019'\u0003\u0003\u0003f\u0005-\u0018aC2p[6|g\u000eV=qKNLAA!\u001b\u0003l\tQ!+Z:vYR$\u0016\u0010]3\u000b\t\t\u0015\u00141\u001e\t\u0007\u0005_\u0012\u0019Ha\u001e\u000f\t\tE\u0014q]\u0007\u0003\u0003WLAA!\u001e\u0002r\nIQI\u001c3SKN,H\u000e\u001e\t\t\u0003+\u0013I(!\u001d\u0002r%!!1PAL\u00059\u0001&o\u001c3vG\u0016\u0014(+Z2pe\u0012\fab]3sS\u0006d\u0017N_3FeJ|'\u000f\u0006\u0003\u0003x\t\u0005\u0005b\u0002BBG\u0001\u0007!QQ\u0001\u0006KJ\u0014xN\u001d\t\u0005\u0005C\u00129)\u0003\u0003\u0003\n\n-$!C#se>\u0014H+\u001f9f\u0003e\u0011X\r\u001e:jKZ,W*\u0019=PM\u001a\u001cX\r^:PM\u001a\u001cX\r^:\u0015\t\t=%1\u0015\t\b\u001f\u0006e'\u0011\u0013BO!\u0011\u0011\u0019J!'\u000e\u0005\tU%\u0002\u0002BL\u0003G\naaY8n[>t\u0017\u0002\u0002BN\u0005+\u0013a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g\u000e\u0005\u0003\u0002\\\t}\u0015\u0002\u0002BQ\u0003;\u0012\u0011c\u00144gg\u0016$\u0018I\u001c3NKR\fG-\u0019;b\u0011\u001d\u00119\u0002\na\u0001\u00053\tQa\u00197pg\u0016\f\u0011cY8oM&<7+\u00198jif\u001c\u0005.Z2l\u0003U\u0011X\r\u001e:z\u00072|7/Z(o\u0013:$XM\u001d:vaR$B!a!\u0003.\"9!qV\u0014A\u0002\tE\u0016AB1di&|g\u000e\u0005\u0004\u0002t\tM\u00161Q\u0005\u0004\u0005k#&!\u0003$v]\u000e$\u0018n\u001c81\u0001")
/* loaded from: input_file:pl/touk/nussknacker/engine/lite/kafka/KafkaSingleScenarioTaskRun.class */
public class KafkaSingleScenarioTaskRun implements Task, LazyLogging {
    private final String taskId;
    private final MetaData metaData;
    private final EngineRuntimeContext runtimeContext;
    private final KafkaTransactionalScenarioInterpreter.KafkaInterpreterConfig engineConfig;
    private final interpreterTypes.ScenarioInterpreter<Future, ConsumerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> interpreter;
    private final SourceMetrics sourceMetrics;
    private final ExecutionContext ec;
    private final String groupId;
    private KafkaConsumer<byte[], byte[]> consumer;
    private KafkaProducer<byte[], byte[]> producer;
    private KafkaMetricsRegistrar consumerMetricsRegistrar;
    private KafkaMetricsRegistrar producerMetricsRegistrar;
    private final WithExceptionExtractor extractor;
    private final Map<String, Map<interpreterTypes.SourceId, LiteKafkaSource>> sourceToTopic;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [pl.touk.nussknacker.engine.lite.kafka.KafkaSingleScenarioTaskRun] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    private String groupId() {
        return this.groupId;
    }

    private KafkaConsumer<byte[], byte[]> consumer() {
        return this.consumer;
    }

    private void consumer_$eq(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    private KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    private void producer_$eq(KafkaProducer<byte[], byte[]> kafkaProducer) {
        this.producer = kafkaProducer;
    }

    private KafkaMetricsRegistrar consumerMetricsRegistrar() {
        return this.consumerMetricsRegistrar;
    }

    private void consumerMetricsRegistrar_$eq(KafkaMetricsRegistrar kafkaMetricsRegistrar) {
        this.consumerMetricsRegistrar = kafkaMetricsRegistrar;
    }

    private KafkaMetricsRegistrar producerMetricsRegistrar() {
        return this.producerMetricsRegistrar;
    }

    private void producerMetricsRegistrar_$eq(KafkaMetricsRegistrar kafkaMetricsRegistrar) {
        this.producerMetricsRegistrar = kafkaMetricsRegistrar;
    }

    public WithExceptionExtractor extractor() {
        return this.extractor;
    }

    private Map<String, Map<interpreterTypes.SourceId, LiteKafkaSource>> sourceToTopic() {
        return this.sourceToTopic;
    }

    @Override // pl.touk.nussknacker.engine.lite.kafka.Task
    public void init() {
        configSanityCheck();
        new KafkaErrorTopicInitializer(this.engineConfig.kafka(), this.engineConfig.exceptionHandlingConfig()).init();
        consumer_$eq(prepareConsumer());
        producer_$eq(prepareProducer());
        producer().initTransactions();
        consumer().subscribe(CollectionConverters$.MODULE$.asJavaCollectionConverter(sourceToTopic().keys().toSet()).asJavaCollection());
        registerMetrics();
    }

    private KafkaConsumer<byte[], byte[]> prepareConsumer() {
        Properties transactionalAwareConsumerProperties = KafkaUtils$.MODULE$.toTransactionalAwareConsumerProperties(this.engineConfig.kafka(), new Some(groupId()));
        transactionalAwareConsumerProperties.put("enable.auto.commit", BoxesRunTime.boxToBoolean(false));
        return new KafkaConsumer<>(transactionalAwareConsumerProperties);
    }

    private KafkaProducer<byte[], byte[]> prepareProducer() {
        Properties producerProperties = KafkaUtils$.MODULE$.toProducerProperties(this.engineConfig.kafka(), groupId());
        producerProperties.put("transactional.id", new StringBuilder(0).append(groupId()).append(UUID.randomUUID().toString()).toString());
        return new KafkaProducer<>(producerProperties);
    }

    private void registerMetrics() {
        consumerMetricsRegistrar_$eq(new KafkaMetricsRegistrar(this.taskId, consumer().metrics(), this.runtimeContext.metricsProvider()));
        consumerMetricsRegistrar().registerMetrics();
        producerMetricsRegistrar_$eq(new KafkaMetricsRegistrar(this.taskId, producer().metrics(), this.runtimeContext.metricsProvider()));
        producerMetricsRegistrar().registerMetrics();
    }

    @Override // java.lang.Runnable
    public void run() {
        ConsumerRecords<byte[], byte[]> poll = consumer().poll(DurationConverters$FiniteDurationops$.MODULE$.toJava$extension(DurationConverters$.MODULE$.FiniteDurationops(this.engineConfig.pollDuration())));
        if (poll.isEmpty()) {
            if (!logger().underlying().isTraceEnabled()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            } else {
                logger().underlying().trace("No records, skipping");
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        producer().beginTransaction();
        try {
            processRecords(poll);
            producer().sendOffsetsToTransaction((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(retrieveMaxOffsetsOffsets(poll)).asJava(), consumer().groupMetadata().groupId());
            producer().commitTransaction();
        } catch (Throwable th) {
            if (th instanceof ProducerFencedException ? true : th instanceof OutOfOrderSequenceException ? true : th instanceof AuthorizationException) {
                if (logger().underlying().isWarnEnabled()) {
                    logger().underlying().warn("Fatal producer error: {}. Closing producer without abort transaction", new Object[]{th.getMessage()});
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                throw th;
            }
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            Throwable th2 = (Throwable) unapply.get();
            if (logger().underlying().isWarnEnabled()) {
                logger().underlying().warn("Unhandled error: {}. Aborting kafka transaction", new Object[]{th2.getMessage()});
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            }
            producer().abortTransaction();
            throw th2;
        }
    }

    private Object processRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        return Await$.MODULE$.result(sendOutputToKafka((WriterT) Await$.MODULE$.result((Awaitable) this.interpreter.invoke(new interpreterTypes.ScenarioInputBatch(prepareRecords(consumerRecords))), this.engineConfig.interpreterTimeout())), this.engineConfig.publishTimeout());
    }

    private List<Tuple2<interpreterTypes.SourceId, ConsumerRecord<byte[], byte[]>>> prepareRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        return (List) sourceToTopic().toList().flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str = (String) tuple2._1();
            Map map = (Map) tuple2._2();
            List list = ((TraversableOnce) CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords.records(str)).asScala()).toList();
            map.keys().foreach(sourceId -> {
                $anonfun$prepareRecords$2(this, list, sourceId);
                return BoxedUnit.UNIT;
            });
            return (List) map.keys().toList().flatMap(sourceId2 -> {
                return (List) list.map(consumerRecord -> {
                    return new Tuple2(sourceId2, consumerRecord);
                }, List$.MODULE$.canBuildFrom());
            }, List$.MODULE$.canBuildFrom());
        }, List$.MODULE$.canBuildFrom());
    }

    private Future<?> sendOutputToKafka(WriterT<Object, List<NuExceptionInfo<? extends Throwable>>, List<interpreterTypes.EndResult<ProducerRecord<byte[], byte[]>>>> writerT) {
        return (Future) implicits$.MODULE$.toTraverseOps(((List) ((List) ((List) writerT.value(Invariant$.MODULE$.catsInstancesForId())).map(endResult -> {
            Option option = endResult.context().get("$eventTimestamp");
            ProducerRecord producerRecord = (ProducerRecord) endResult.result();
            return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), (Long) Option$.MODULE$.apply(producerRecord.timestamp()).orElse(() -> {
                return option;
            }).orNull(Predef$.MODULE$.$conforms()), producerRecord.key(), producerRecord.value(), producerRecord.headers());
        }, List$.MODULE$.canBuildFrom())).$plus$plus((List) ((List) writerT.written(Invariant$.MODULE$.catsInstancesForId())).map(nuExceptionInfo -> {
            return this.serializeError(nuExceptionInfo);
        }, List$.MODULE$.canBuildFrom()), List$.MODULE$.canBuildFrom())).map(producerRecord -> {
            return KafkaUtils$.MODULE$.sendToKafka(producerRecord, this.producer());
        }, List$.MODULE$.canBuildFrom()), UnorderedFoldable$.MODULE$.catsTraverseForList()).sequence(Predef$.MODULE$.$conforms(), Invariant$.MODULE$.catsInstancesForFuture(this.ec));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ProducerRecord<byte[], byte[]> serializeError(NuExceptionInfo<? extends Throwable> nuExceptionInfo) {
        return new KafkaJsonExceptionSerializationSchema(this.metaData, this.engineConfig.exceptionHandlingConfig()).serialize(extractor().extractOrThrow(nuExceptionInfo), Predef$.MODULE$.long2Long(System.currentTimeMillis()));
    }

    private Map<TopicPartition, OffsetAndMetadata> retrieveMaxOffsetsOffsets(ConsumerRecords<byte[], byte[]> consumerRecords) {
        return ((Iterator) CollectionConverters$.MODULE$.asScalaIteratorConverter(consumerRecords.iterator()).asScala()).map(consumerRecord -> {
            return new Tuple2(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), BoxesRunTime.boxToLong(consumerRecord.offset() + 1));
        }).toList().groupBy(tuple2 -> {
            return (TopicPartition) tuple2._1();
        }).mapValues(list -> {
            return BoxesRunTime.boxToLong($anonfun$retrieveMaxOffsetsOffsets$3(list));
        }).mapValues(obj -> {
            return $anonfun$retrieveMaxOffsetsOffsets$5(BoxesRunTime.unboxToLong(obj));
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        ((List) new $colon.colon(producer(), new $colon.colon(consumer(), new $colon.colon(producerMetricsRegistrar(), new $colon.colon(consumerMetricsRegistrar(), Nil$.MODULE$)))).filter(autoCloseable -> {
            return BoxesRunTime.boxToBoolean($anonfun$close$1(autoCloseable));
        })).foreach(autoCloseable2 -> {
            $anonfun$close$2(this, autoCloseable2);
            return BoxedUnit.UNIT;
        });
        if (!logger().underlying().isInfoEnabled()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            logger().underlying().info("Closed runner for {}", new Object[]{this.metaData.id()});
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    private void configSanityCheck() {
        if (Predef$.MODULE$.Integer2int(new ConsumerConfig(KafkaUtils$.MODULE$.toTransactionalAwareConsumerProperties(this.engineConfig.kafka(), None$.MODULE$)).getInt("max.poll.interval.ms")) <= this.engineConfig.interpreterTimeout().$plus(this.engineConfig.publishTimeout()).toMillis()) {
            throw new IllegalArgumentException(new StringBuilder(70).append("publishTimeout + interpreterTimeout cannot exceed ").append("max.poll.interval.ms").toString());
        }
    }

    private void retryCloseOnInterrupt(Function0<BoxedUnit> function0) {
        try {
            function0.apply$mcV$sp();
        } catch (Throwable th) {
            if (!(th instanceof InterruptedException ? true : th instanceof InterruptException)) {
                throw th;
            }
            boolean interrupted = Thread.interrupted();
            if (logger().underlying().isDebugEnabled()) {
                logger().underlying().debug("Interrupted during close: {}, trying once more", new Object[]{BoxesRunTime.boxToBoolean(interrupted)});
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            function0.apply$mcV$sp();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$prepareRecords$3(KafkaSingleScenarioTaskRun kafkaSingleScenarioTaskRun, interpreterTypes.SourceId sourceId, ConsumerRecord consumerRecord) {
        kafkaSingleScenarioTaskRun.sourceMetrics.markElement(sourceId, consumerRecord.timestamp());
    }

    public static final /* synthetic */ void $anonfun$prepareRecords$2(KafkaSingleScenarioTaskRun kafkaSingleScenarioTaskRun, List list, interpreterTypes.SourceId sourceId) {
        list.foreach(consumerRecord -> {
            $anonfun$prepareRecords$3(kafkaSingleScenarioTaskRun, sourceId, consumerRecord);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ long $anonfun$retrieveMaxOffsetsOffsets$3(List list) {
        return BoxesRunTime.unboxToLong(((TraversableOnce) list.map(tuple2 -> {
            return BoxesRunTime.boxToLong(tuple2._2$mcJ$sp());
        }, List$.MODULE$.canBuildFrom())).max(Ordering$Long$.MODULE$));
    }

    public static final /* synthetic */ OffsetAndMetadata $anonfun$retrieveMaxOffsetsOffsets$5(long j) {
        return new OffsetAndMetadata(j);
    }

    public static final /* synthetic */ boolean $anonfun$close$1(AutoCloseable autoCloseable) {
        return autoCloseable != null;
    }

    public static final /* synthetic */ void $anonfun$close$2(KafkaSingleScenarioTaskRun kafkaSingleScenarioTaskRun, AutoCloseable autoCloseable) {
        kafkaSingleScenarioTaskRun.retryCloseOnInterrupt(() -> {
            autoCloseable.close();
        });
    }

    public KafkaSingleScenarioTaskRun(String str, MetaData metaData, EngineRuntimeContext engineRuntimeContext, KafkaTransactionalScenarioInterpreter.KafkaInterpreterConfig kafkaInterpreterConfig, interpreterTypes.ScenarioInterpreter<Future, ConsumerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> scenarioInterpreter, SourceMetrics sourceMetrics, ExecutionContext executionContext) {
        this.taskId = str;
        this.metaData = metaData;
        this.runtimeContext = engineRuntimeContext;
        this.engineConfig = kafkaInterpreterConfig;
        this.interpreter = scenarioInterpreter;
        this.sourceMetrics = sourceMetrics;
        this.ec = executionContext;
        LazyLogging.$init$(this);
        this.groupId = metaData.id();
        this.extractor = new DefaultWithExceptionExtractor();
        this.sourceToTopic = ((TraversableLike) scenarioInterpreter.sources().flatMap(tuple2 -> {
            if (tuple2 != null) {
                interpreterTypes.SourceId sourceId = (interpreterTypes.SourceId) tuple2._1();
                LiteKafkaSource liteKafkaSource = (Source) tuple2._2();
                if (liteKafkaSource instanceof LiteKafkaSource) {
                    LiteKafkaSource liteKafkaSource2 = liteKafkaSource;
                    return (List) liteKafkaSource2.topics().map(str2 -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str2), new Tuple2(sourceId, liteKafkaSource2));
                    }, List$.MODULE$.canBuildFrom());
                }
            }
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            interpreterTypes.SourceId sourceId2 = (interpreterTypes.SourceId) tuple2._1();
            throw new IllegalArgumentException(new StringBuilder(24).append("Unexpected source: ").append((Source) tuple2._2()).append(" for ").append(sourceId2.value()).toString());
        }, Map$.MODULE$.canBuildFrom())).groupBy(tuple22 -> {
            return (String) tuple22._1();
        }).mapValues(map -> {
            return map.values().toMap(Predef$.MODULE$.$conforms());
        });
    }
}
