/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.lite.kafka;

import cats.Applicative;
import cats.Functor;
import cats.Invariant$;
import cats.UnorderedFoldable$;
import cats.data.WriterT;
import cats.implicits$;
import cats.package$;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import pl.touk.nussknacker.engine.api.MetaData;
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo;
import pl.touk.nussknacker.engine.api.process.Source;
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import pl.touk.nussknacker.engine.kafka.exception.KafkaJsonExceptionSerializationSchema;
import pl.touk.nussknacker.engine.lite.api.interpreterTypes;
import pl.touk.nussknacker.engine.lite.kafka.KafkaMetricsRegistrar;
import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter;
import pl.touk.nussknacker.engine.lite.kafka.Task;
import pl.touk.nussknacker.engine.lite.kafka.api.LiteKafkaSource;
import pl.touk.nussknacker.engine.lite.metrics.SourceMetrics;
import pl.touk.nussknacker.engine.util.exception.WithExceptionExtractor$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.compat.java8.DurationConverters;
import scala.compat.java8.DurationConverters$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\t\u0005f\u0001\u0002\u0014(\u0001QB\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t3\u0002\u0011\t\u0011)A\u00055\"A\u0001\r\u0001B\u0001B\u0003%\u0011\r\u0003\u0005h\u0001\t\u0005\t\u0015!\u0003i\u0011!Y\bA!A!\u0002\u0013a\bBCA\u0010\u0001\t\u0005\t\u0015!\u0003\u0002\"!Q\u0011Q\u0006\u0001\u0003\u0002\u0003\u0006Y!a\f\t\u000f\u0005U\u0002\u0001\"\u0001\u00028!I\u00111\n\u0001C\u0002\u0013%\u0011Q\n\u0005\b\u0003\u001f\u0002\u0001\u0015!\u0003M\u0011-\t\t\u0006\u0001a\u0001\u0002\u0004%I!a\u0015\t\u0017\u0005m\u0004\u00011AA\u0002\u0013%\u0011Q\u0010\u0005\f\u0003\u0013\u0003\u0001\u0019!A!B\u0013\t)\u0006C\u0006\u0002\f\u0002\u0001\r\u00111A\u0005\n\u00055\u0005bCAM\u0001\u0001\u0007\t\u0019!C\u0005\u00037C1\"a(\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002\u0010\"Y\u0011\u0011\u0015\u0001A\u0002\u0003\u0007I\u0011BAR\u0011-\tY\u000b\u0001a\u0001\u0002\u0004%I!!,\t\u0017\u0005E\u0006\u00011A\u0001B\u0003&\u0011Q\u0015\u0005\f\u0003g\u0003\u0001\u0019!a\u0001\n\u0013\t\u0019\u000bC\u0006\u00026\u0002\u0001\r\u00111A\u0005\n\u0005]\u0006bCA^\u0001\u0001\u0007\t\u0011)Q\u0005\u0003KC\u0011\"!0\u0001\u0005\u0004%I!a0\t\u0011\u0005\u001d\b\u0001)A\u0005\u0003\u0003Dq!!;\u0001\t\u0003\tY\u000fC\u0004\u0002n\u0002!I!a\u0015\t\u000f\u0005=\b\u0001\"\u0003\u0002\u000e\"9\u0011\u0011\u001f\u0001\u0005\n\u0005-\bbBAz\u0001\u0011\u0005\u00111\u001e\u0005\b\u0003k\u0004A\u0011BA|\u0011\u001d\u0011I\u0001\u0001C\u0005\u0005\u0017AqA!\f\u0001\t\u0013\u0011y\u0003C\u0004\u0003h\u0001!IA!\u001b\t\u000f\tU\u0004\u0001\"\u0003\u0003x!9!q\u0012\u0001\u0005\u0002\u0005-\bb\u0002BI\u0001\u0011%\u00111\u001e\u0005\b\u0005'\u0003A\u0011\u0002BK\u0005iY\u0015MZ6b'&tw\r\\3TG\u0016t\u0017M]5p)\u0006\u001c8NU;o\u0015\tA\u0013&A\u0003lC\u001a\\\u0017M\u0003\u0002+W\u0005!A.\u001b;f\u0015\taS&\u0001\u0004f]\u001eLg.\u001a\u0006\u0003]=\n1B\\;tg.t\u0017mY6fe*\u0011\u0001'M\u0001\u0005i>,8NC\u00013\u0003\t\u0001Hn\u0001\u0001\u0014\t\u0001)T(\u0
011\t\u0003mmj\u0011a\u000e\u0006\u0003qe\nA\u0001\\1oO*\t!(\u0001\u0003kCZ\f\u0017B\u0001\u001f8\u0005\u0019y%M[3diB\u0011ahP\u0007\u0002O%\u0011\u0001i\n\u0002\u0005)\u0006\u001c8\u000e\u0005\u0002C\u00136\t1I\u0003\u0002E\u000b\u0006a1oY1mC2|wmZ5oO*\u0011aiR\u0001\tif\u0004Xm]1gK*\t\u0001*A\u0002d_6L!AS\"\u0003\u00171\u000b'0\u001f'pO\u001eLgnZ\u0001\u0007i\u0006\u001c8.\u00133\u0011\u000553fB\u0001(U!\ty%+D\u0001Q\u0015\t\t6'\u0001\u0004=e>|GO\u0010\u0006\u0002'\u0006)1oY1mC&\u0011QKU\u0001\u0007!J,G-\u001a4\n\u0005]C&AB*ue&twM\u0003\u0002V%\u0006AQ.\u001a;b\t\u0006$\u0018\r\u0005\u0002\\=6\tAL\u0003\u0002^W\u0005\u0019\u0011\r]5\n\u0005}c&\u0001C'fi\u0006$\u0015\r^1\u0002\u001dI,h\u000e^5nK\u000e{g\u000e^3yiB\u0011!-Z\u0007\u0002G*\u0011A\rX\u0001\u000feVtG/[7fG>tG/\u001a=u\u0013\t17M\u0001\u000bF]\u001eLg.\u001a*v]RLW.Z\"p]R,\u0007\u0010^\u0001\rK:<\u0017N\\3D_:4\u0017n\u001a\t\u0003Sbt!A\u001b<\u000f\u0005-,hB\u00017u\u001d\ti7O\u0004\u0002oe:\u0011q.\u001d\b\u0003\u001fBL\u0011AM\u0005\u0003aEJ!AL\u0018\n\u00051j\u0013B\u0001\u0016,\u0013\tA\u0013&\u0003\u0002xO\u0005)3*\u00194lCR\u0013\u0018M\\:bGRLwN\\1m'\u000e,g.\u0019:j_&sG/\u001a:qe\u0016$XM]\u0005\u0003sj\u0014A\"\u00128hS:,7i\u001c8gS\u001eT!a^\u0014\u0002\u0017%tG/\u001a:qe\u0016$XM\u001d\t\n{\u0006\u0005\u0011qAA\n\u00033q!a\u001b@\n\u0005}L\u0013AG*dK:\f'/[8J]R,'\u000f\u001d:fi\u0016\u0014h)Y2u_JL\u0018\u0002BA\u0002\u0003\u000b\u0011\u0001eU2f]\u0006\u0014\u0018n\\%oi\u0016\u0014\bO]3uKJ<\u0016\u000e\u001e5MS\u001a,7-_2mK*\u0011q0\u000b\t\u0005\u0003\u0013\ty!\u0004\u0002\u0002\f)\u0019\u0011Q\u0002*\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0003\u0002\u0012\u0005-!A\u0002$viV\u0014X\rE\u0002j\u0003+I1!a\u0006{\u0005\u0015Ie\u000e];u!\rI\u00171D\u0005\u0004\u0003;Q(AB(viB,H/A\u0007t_V\u00148-Z'fiJL7m\u001d\t\u0005\u0003G\tI#\u0004\u0002\u0002&)\u0019\u0011qE\u0015\u0002\u000f5,GO]5dg&!\u00111FA\u0013\u00055\u0019v.\u001e:dK6+GO]5dg\u0006\u0011Qm\u0019\t\u0005\u0003\u0013\t\t$\u0003\u0003\u00024\u0005-!
\u0001E#yK\u000e,H/[8o\u0007>tG/\u001a=u\u0003\u0019a\u0014N\\5u}Qq\u0011\u0011HA \u0003\u0003\n\u0019%!\u0012\u0002H\u0005%C\u0003BA\u001e\u0003{\u0001\"A\u0010\u0001\t\u000f\u00055\u0002\u0002q\u0001\u00020!)1\n\u0003a\u0001\u0019\")\u0011\f\u0003a\u00015\")\u0001\r\u0003a\u0001C\")q\r\u0003a\u0001Q\")1\u0010\u0003a\u0001y\"9\u0011q\u0004\u0005A\u0002\u0005\u0005\u0012aB4s_V\u0004\u0018\nZ\u000b\u0002\u0019\u0006AqM]8va&#\u0007%\u0001\u0005d_:\u001cX/\\3s+\t\t)\u0006\u0005\u0005\u0002X\u0005%\u0014QNA7\u001b\t\tIF\u0003\u0003\u0002R\u0005m#\u0002BA/\u0003?\nqa\u00197jK:$8OC\u0002)\u0003CRA!a\u0019\u0002f\u00051\u0011\r]1dQ\u0016T!!a\u001a\u0002\u0007=\u0014x-\u0003\u0003\u0002l\u0005e#!D&bM.\f7i\u001c8tk6,'\u000f\u0005\u0004\u0002p\u0005E\u0014QO\u0007\u0002%&\u0019\u00111\u000f*\u0003\u000b\u0005\u0013(/Y=\u0011\t\u0005=\u0014qO\u0005\u0004\u0003s\u0012&\u0001\u0002\"zi\u0016\fAbY8ogVlWM]0%KF$B!a \u0002\u0006B!\u0011qNAA\u0013\r\t\u0019I\u0015\u0002\u0005+:LG\u000fC\u0005\u0002\b2\t\t\u00111\u0001\u0002V\u0005\u0019\u0001\u0010J\u0019\u0002\u0013\r|gn];nKJ\u0004\u0013\u0001\u00039s_\u0012,8-\u001a:\u0016\u0005\u0005=\u0005\u0003CAI\u0003+\u000bi'!\u001c\u000e\u0005\u0005M%\u0002BAF\u00037JA!a&\u0002\u0014\ni1*\u00194lCB\u0013x\u000eZ;dKJ\fA\u0002\u001d:pIV\u001cWM]0%KF$B!a 
\u0002\u001e\"I\u0011qQ\b\u0002\u0002\u0003\u0007\u0011qR\u0001\naJ|G-^2fe\u0002\n\u0001dY8ogVlWM]'fiJL7m\u001d*fO&\u001cHO]1s+\t\t)\u000bE\u0002?\u0003OK1!!+(\u0005UY\u0015MZ6b\u001b\u0016$(/[2t%\u0016<\u0017n\u001d;sCJ\fAdY8ogVlWM]'fiJL7m\u001d*fO&\u001cHO]1s?\u0012*\u0017\u000f\u0006\u0003\u0002\u0000\u0005=\u0006\"CAD%\u0005\u0005\t\u0019AAS\u0003e\u0019wN\\:v[\u0016\u0014X*\u001a;sS\u000e\u001c(+Z4jgR\u0014\u0018M\u001d\u0011\u00021A\u0014x\u000eZ;dKJlU\r\u001e:jGN\u0014VmZ5tiJ\f'/\u0001\u000fqe>$WoY3s\u001b\u0016$(/[2t%\u0016<\u0017n\u001d;sCJ|F%Z9\u0015\t\u0005}\u0014\u0011\u0018\u0005\n\u0003\u000f+\u0012\u0011!a\u0001\u0003K\u000b\u0011\u0004\u001d:pIV\u001cWM]'fiJL7m\u001d*fO&\u001cHO]1sA\u0005i1o\\;sG\u0016$v\u000eV8qS\u000e,\"!!1\u0011\r5\u000b\u0019\rTAd\u0013\r\t)\r\u0017\u0002\u0004\u001b\u0006\u0004\bcB'\u0002D\u0006%\u0017Q\u001c\t\u0005\u0003\u0017\f9N\u0004\u0003\u0002N\u0006EgbA6\u0002P&\u0011Q,K\u0005\u0005\u0003'\f).\u0001\tj]R,'\u000f\u001d:fi\u0016\u0014H+\u001f9fg*\u0011Q,K\u0005\u0005\u00033\fYN\u0001\u0005T_V\u00148-Z%e\u0015\u0011\t\u0019.!6\u0011\t\u0005}\u00171]\u0007\u0003\u0003CT!!X\u0014\n\t\u0005\u0015\u0018\u0011\u001d\u0002\u0010\u0019&$XmS1gW\u0006\u001cv.\u001e:dK\u0006q1o\\;sG\u0016$v\u000eV8qS\u000e\u0004\u0013\u0001B5oSR$\"!a 
\u0002\u001fA\u0014X\r]1sK\u000e{gn];nKJ\fq\u0002\u001d:fa\u0006\u0014X\r\u0015:pIV\u001cWM]\u0001\u0010e\u0016<\u0017n\u001d;fe6+GO]5dg\u0006\u0019!/\u001e8\u0002\u001dA\u0014xnY3tgJ+7m\u001c:egR!\u0011\u0011`A\u0000!\u0011\ty'a?\n\u0007\u0005u(KA\u0002B]fDqA!\u0001\u001f\u0001\u0004\u0011\u0019!A\u0004sK\u000e|'\u000fZ:\u0011\u0011\u0005]#QAA7\u0003[JAAa\u0002\u0002Z\ty1i\u001c8tk6,'OU3d_J$7/\u0001\bqe\u0016\u0004\u0018M]3SK\u000e|'\u000fZ:\u0015\t\t5!1\u0006\t\u0007\u0005\u001f\u0011IBa\b\u000f\t\tE!Q\u0003\b\u0004\u001f\nM\u0011\"A*\n\u0007\t]!+A\u0004qC\u000e\\\u0017mZ3\n\t\tm!Q\u0004\u0002\u0005\u0019&\u001cHOC\u0002\u0003\u0018I\u0003\u0002\"a\u001c\u0003\"\u0005%'QE\u0005\u0004\u0005G\u0011&A\u0002+va2,'\u0007\u0005\u0005\u0002X\t\u001d\u0012QNA7\u0013\u0011\u0011I#!\u0017\u0003\u001d\r{gn];nKJ\u0014VmY8sI\"9!\u0011A\u0010A\u0002\t\r\u0011!E:f]\u0012|U\u000f\u001e9viR{7*\u00194lCR!!\u0011\u0007B#a\u0011\u0011\u0019D!\u000f\u0011\r\u0005%\u0011q\u0002B\u001b!\u0011\u00119D!\u000f\r\u0001\u0011Y!1\b\u0011\u0002\u0002\u0003\u0005)\u0011\u0001B\u001f\u0005\ryF%M\t\u0005\u0005\u007f\tI\u0010\u0005\u0003\u0002p\t\u0005\u0013b\u0001B\"%\n9aj\u001c;iS:<\u0007b\u0002B$A\u0001\u0007!\u0011J\u0001\u0007_V$\b/\u001e;\u0011\r\t-#\u0011\u000bB,\u001d\u0011\tiM!\u0014\n\t\t=\u0013Q[\u0001\fG>lWn\u001c8UsB,7/\u0003\u0003\u0003T\tU#A\u0003*fgVdG\u000fV=qK*!!qJAk!\u0019\u0011IF!\u0018\u0003b9!!1LAi\u001b\t\t).\u0003\u0003\u0003`\u0005m'!C#oIJ+7/\u001e7u!!\t\tJa\u0019\u0002n\u00055\u0014\u0002\u0002B3\u0003'\u0013a\u0002\u0015:pIV\u001cWM\u001d*fG>\u0014H-\u0001\btKJL\u0017\r\\5{K\u0016\u0013(o\u001c:\u0015\t\t\u0005$1\u000e\u0005\b\u0005[\n\u0003\u0019\u0001B8\u0003\u0015)'O]8s!\u0011\u0011YE!\u001d\n\t\tM$Q\u000b\u0002\n\u000bJ\u0014xN\u001d+za\u0016\f\u0011D]3ue&,g/Z'bq>3gm]3ug>3gm]3ugR!!\u0011\u0010BG!\u001di\u00151\u0019B>\u0005\u000f\u0003BA! 
\u0003\u00046\u0011!q\u0010\u0006\u0005\u0005\u0003\u000by&\u0001\u0004d_6lwN\\\u0005\u0005\u0005\u000b\u0013yH\u0001\bU_BL7\rU1si&$\u0018n\u001c8\u0011\t\u0005]#\u0011R\u0005\u0005\u0005\u0017\u000bIFA\tPM\u001a\u001cX\r^!oI6+G/\u00193bi\u0006DqA!\u0001#\u0001\u0004\u0011\u0019!A\u0003dY>\u001cX-A\td_:4\u0017nZ*b]&$\u0018p\u00115fG.\fQC]3uef\u001cEn\\:f\u001f:Le\u000e^3seV\u0004H\u000f\u0006\u0003\u0002\u0000\t]\u0005b\u0002BMK\u0001\u0007!1T\u0001\u0007C\u000e$\u0018n\u001c8\u0011\r\u0005=$QTA@\u0013\r\u0011yJ\u0015\u0002\n\rVt7\r^5p]B\u0002")
public class KafkaSingleScenarioTaskRun
implements Task,
LazyLogging {
    // Passed to both KafkaMetricsRegistrar instances created in registerMetrics().
    private final String taskId;
    // Source of the group id (metaData.id()); also used for error serialization and the close log line.
    private final MetaData metaData;
    // Supplies the metrics provider handed to the metrics registrars.
    private final EngineRuntimeContext runtimeContext;
    // Kafka settings plus the poll duration and interpreter/publish timeouts read by this task.
    private final KafkaTransactionalScenarioInterpreter.EngineConfig engineConfig;
    // Scenario logic; invoked with a batch of (source id, consumer record) pairs in processRecords().
    private final interpreterTypes.ScenarioInterpreter<Future, ConsumerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> interpreter;
    // Receives markElement(sourceId, timestamp) for every consumed record in prepareRecords().
    private final SourceMetrics sourceMetrics;
    // Execution context used when the individual Kafka send futures are sequenced into one.
    private final ExecutionContext ec;
    // Set to metaData.id() in the constructor; used when building consumer/producer properties,
    // as the prefix of the producer's transactional.id and in sendOffsetsToTransaction().
    private final String groupId;
    // The four members below are assigned in init()/registerMetrics() and stay null until then;
    // close() filters out the ones that are still null.
    private KafkaConsumer<byte[], byte[]> consumer;
    private KafkaProducer<byte[], byte[]> producer;
    private KafkaMetricsRegistrar consumerMetricsRegistrar;
    private KafkaMetricsRegistrar producerMetricsRegistrar;
    // Topic name -> (source id -> source) for the sources reading that topic; built once in the constructor.
    private final scala.collection.immutable.Map<String, scala.collection.immutable.Map<interpreterTypes.SourceId, LiteKafkaSource>> sourceToTopic;
    // Lazily created logger, plus the flag recording whether it has been initialised (see logger()).
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /**
     * Slow path of {@link #logger()}: creates the logger while holding this object's monitor,
     * unless the initialisation flag is already set, and then returns the stored logger.
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = bitmap$trans$0;
            if (!alreadyInitialised) {
                logger = LazyLogging.logger$((LazyLogging)this);
                bitmap$trans$0 = true;
            }
        }
        return logger;
    }

    /**
     * Returns the logger, creating it on first use.
     *
     * @return the stored logger once the initialisation flag is set, otherwise the result of
     *         {@code logger$lzycompute()}
     */
    public Logger logger() {
        if (bitmap$trans$0) {
            return logger;
        }
        return logger$lzycompute();
    }

    /** Accessor for the group id, which the constructor sets to {@code metaData.id()}. */
    private String groupId() {
        return groupId;
    }

    /** Accessor for the Kafka consumer; {@code null} until {@code init()} has assigned it. */
    private KafkaConsumer<byte[], byte[]> consumer() {
        return consumer;
    }

    /** Setter for the Kafka consumer field. */
    private void consumer_$eq(KafkaConsumer<byte[], byte[]> x$1) {
        consumer = x$1;
    }

    /** Accessor for the Kafka producer; {@code null} until {@code init()} has assigned it. */
    private KafkaProducer<byte[], byte[]> producer() {
        return producer;
    }

    /** Setter for the Kafka producer field. */
    private void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        producer = x$1;
    }

    /** Accessor for the consumer metrics registrar; {@code null} until {@code registerMetrics()} has run. */
    private KafkaMetricsRegistrar consumerMetricsRegistrar() {
        return consumerMetricsRegistrar;
    }

    /** Setter for the consumer metrics registrar field. */
    private void consumerMetricsRegistrar_$eq(KafkaMetricsRegistrar x$1) {
        consumerMetricsRegistrar = x$1;
    }

    /** Accessor for the producer metrics registrar; {@code null} until {@code registerMetrics()} has run. */
    private KafkaMetricsRegistrar producerMetricsRegistrar() {
        return producerMetricsRegistrar;
    }

    /** Setter for the producer metrics registrar field. */
    private void producerMetricsRegistrar_$eq(KafkaMetricsRegistrar x$1) {
        producerMetricsRegistrar = x$1;
    }

    /** Accessor for the topic name to (source id to source) mapping built in the constructor. */
    private scala.collection.immutable.Map<String, scala.collection.immutable.Map<interpreterTypes.SourceId, LiteKafkaSource>> sourceToTopic() {
        return sourceToTopic;
    }

    /**
     * Prepares the task for running: validates the configuration, creates the consumer and the
     * producer, initialises producer transactions, subscribes the consumer to every topic known
     * to {@code sourceToTopic} and finally registers client metrics.
     */
    @Override
    public void init() {
        configSanityCheck();
        consumer_$eq(prepareConsumer());
        producer_$eq(prepareProducer());
        producer().initTransactions();
        // The key set of sourceToTopic is the set of topic names to subscribe to.
        java.util.Collection topics = CollectionConverters$.MODULE$.asJavaCollectionConverter((Iterable)sourceToTopic().keys().toSet()).asJavaCollection();
        consumer().subscribe(topics);
        registerMetrics();
    }

    /**
     * Builds the Kafka consumer from the engine's Kafka configuration and this task's group id.
     * Auto-commit is switched off; offsets are instead sent through the producer transaction in
     * {@code run()}.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<byte[], byte[]> prepareConsumer() {
        Properties consumerProps = KafkaUtils$.MODULE$.toTransactionalAwareConsumerProperties(
                engineConfig.kafka(), (Option)new Some((Object)groupId()));
        consumerProps.put("enable.auto.commit", Boolean.FALSE);
        return new KafkaConsumer<byte[], byte[]>(consumerProps);
    }

    /**
     * Builds the transactional Kafka producer. The {@code transactional.id} is the group id
     * immediately followed by a freshly generated random UUID.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private KafkaProducer<byte[], byte[]> prepareProducer() {
        Properties settings = KafkaUtils$.MODULE$.toProducerProperties(engineConfig.kafka(), groupId());
        String transactionalId = groupId() + UUID.randomUUID().toString();
        settings.put("transactional.id", transactionalId);
        return new KafkaProducer<byte[], byte[]>(settings);
    }

    /**
     * Creates and registers one metrics registrar for the consumer and one for the producer,
     * storing both so that {@code close()} can close them later. The consumer registrar is
     * created, stored and registered before the producer one.
     */
    private void registerMetrics() {
        KafkaMetricsRegistrar forConsumer = new KafkaMetricsRegistrar(taskId, consumer().metrics(), runtimeContext.metricsProvider());
        consumerMetricsRegistrar_$eq(forConsumer);
        forConsumer.registerMetrics();

        KafkaMetricsRegistrar forProducer = new KafkaMetricsRegistrar(taskId, producer().metrics(), runtimeContext.metricsProvider());
        producerMetricsRegistrar_$eq(forProducer);
        forProducer.registerMetrics();
    }

    /**
     * Executes one poll / process / commit cycle.
     *
     * <p>Polls the consumer for {@code engineConfig.pollDuration()}. An empty poll returns
     * immediately. Otherwise a producer transaction is started, the records are processed, the
     * next offsets are attached to the transaction and the transaction is committed.
     *
     * <p>On failure the original throwable is always rethrown. For
     * {@link ProducerFencedException}, {@link OutOfOrderSequenceException} and
     * {@link AuthorizationException} the transaction is not aborted; for any other throwable
     * matched by {@code NonFatal} the transaction is aborted first.
     */
    @Override
    public void run() {
        ConsumerRecords<byte[], byte[]> polled = consumer().poll(DurationConverters.FiniteDurationops$.MODULE$.toJava$extension(DurationConverters$.MODULE$.FiniteDurationops(engineConfig.pollDuration())));
        if (polled.isEmpty()) {
            if (logger().underlying().isTraceEnabled()) {
                logger().underlying().trace("No records, skipping");
            }
            return;
        }
        producer().beginTransaction();
        try {
            processRecords(polled);
            scala.collection.immutable.Map<TopicPartition, OffsetAndMetadata> nextOffsets = retrieveMaxOffsetsOffsets(polled);
            producer().sendOffsetsToTransaction((Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(nextOffsets).asJava(), groupId());
            producer().commitTransaction();
        }
        catch (Throwable failure) {
            boolean fatalProducerError = failure instanceof ProducerFencedException
                    || failure instanceof OutOfOrderSequenceException
                    || failure instanceof AuthorizationException;
            if (fatalProducerError) {
                if (logger().underlying().isWarnEnabled()) {
                    logger().underlying().warn("Fatal producer error: {}. Closing producer without abort transaction", new Object[]{failure.getMessage()});
                }
                throw failure;
            }
            Option nonFatal = NonFatal$.MODULE$.unapply(failure);
            if (nonFatal.isEmpty()) {
                // Fatal JVM-level throwable: propagate untouched, no abort attempt.
                throw failure;
            }
            // NonFatal's extractor hands back the very throwable it was given.
            Throwable e = (Throwable)nonFatal.get();
            if (logger().underlying().isWarnEnabled()) {
                logger().underlying().warn("Unhandled error: {}. Aborting kafka transaction", new Object[]{e.getMessage()});
            }
            producer().abortTransaction();
            throw failure;
        }
    }

    /**
     * Runs the scenario interpreter on a polled batch and publishes its output.
     *
     * <p>Blocks twice: first for the interpreter result (at most
     * {@code engineConfig.interpreterTimeout()}), then for the Kafka sends (at most
     * {@code engineConfig.publishTimeout()}).
     *
     * @param records the batch returned by the consumer poll
     * @return the awaited result of the publishing future
     */
    private Object processRecords(ConsumerRecords<byte[], byte[]> records) {
        List<Tuple2<interpreterTypes.SourceId, ConsumerRecord<byte[], byte[]>>> batch = prepareRecords(records);
        Awaitable interpretation = (Awaitable)interpreter.invoke(new interpreterTypes.ScenarioInputBatch(batch));
        WriterT interpreted = (WriterT)Await$.MODULE$.result(interpretation, engineConfig.interpreterTimeout());
        Future<?> publishing = sendOutputToKafka((WriterT<Object, List<NuExceptionInfo<? extends Throwable>>, List<interpreterTypes.EndResult<ProducerRecord<byte[], byte[]>>>>)interpreted);
        return Await$.MODULE$.result(publishing, engineConfig.publishTimeout());
    }

    /**
     * Turns a polled batch into the (source id, record) pairs fed to the interpreter.
     *
     * <p>For every (topic, sources) entry of {@code sourceToTopic} the records of that topic are
     * taken from the batch; each record is reported to {@code sourceMetrics} once per source id
     * and paired with every source id registered for the topic.
     *
     * @param records the batch returned by the consumer poll
     * @return one pair per (source reading the topic, record of that topic)
     */
    private List<Tuple2<interpreterTypes.SourceId, ConsumerRecord<byte[], byte[]>>> prepareRecords(ConsumerRecords<byte[], byte[]> records) {
        return (List)this.sourceToTopic().toList().flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            // Decompiled Scala pattern match: a null entry is a match failure.
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            String topic = (String)tuple2._1();
            scala.collection.immutable.Map sourcesSubscribedOnTopic = (scala.collection.immutable.Map)tuple2._2();
            // Records of this topic only, copied into a Scala List.
            List forTopic = ((TraversableOnce)CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(records.records(topic)).asScala()).toList();
            // Metrics side effect: mark every record (with its timestamp) for every source id.
            sourcesSubscribedOnTopic.keys().foreach((Function1 & Serializable & scala.Serializable)sourceId -> {
                forTopic.foreach((Function1 & Serializable & scala.Serializable)record -> {
                    $this.sourceMetrics.markElement(sourceId, record.timestamp());
                    return BoxedUnit.UNIT;
                });
                return BoxedUnit.UNIT;
            });
            // Cross product: every source id of the topic paired with every record of the topic.
            List list = (List)sourcesSubscribedOnTopic.keys().toList().flatMap((Function1 & Serializable & scala.Serializable)sourceId -> (List)forTopic.map((Function1 & Serializable & scala.Serializable)x$3 -> new Tuple2(sourceId, x$3), List$.MODULE$.canBuildFrom()), List$.MODULE$.canBuildFrom());
            return list;
        }, List$.MODULE$.canBuildFrom());
    }

    /**
     * Sends the interpreter output, both end results and collected errors, to Kafka.
     *
     * @param output interpreter result: its value holds the end results, its written part the
     *               errors gathered during interpretation
     * @return a single future combining the futures of all individual sends
     */
    private Future<?> sendOutputToKafka(WriterT<Object, List<NuExceptionInfo<? extends Throwable>>, List<interpreterTypes.EndResult<ProducerRecord<byte[], byte[]>>>> output) {
        // Rebuild every result record so that its timestamp is the record's own timestamp when
        // present, otherwise the "$eventTimestamp" entry of the result context, otherwise null.
        List resultsWithEventTimestamp = (List)((List)output.value((Functor)package$.MODULE$.catsInstancesForId())).map((Function1 & Serializable & scala.Serializable)endResult -> {
            Option contextEventTimestamp = endResult.context().get("$eventTimestamp");
            ProducerRecord producerRecord = (ProducerRecord)endResult.result();
            return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), (Long)Option$.MODULE$.apply((Object)producerRecord.timestamp()).orElse((Function0 & Serializable & scala.Serializable)() -> contextEventTimestamp).orNull(Predef$.MODULE$.$conforms()), producerRecord.key(), producerRecord.value(), (java.lang.Iterable)producerRecord.headers());
        }, List$.MODULE$.canBuildFrom());
        // Each collected error becomes a producer record via serializeError().
        List errors = (List)((List)output.written((Functor)package$.MODULE$.catsInstancesForId())).map((Function1 & Serializable & scala.Serializable)error -> this.serializeError((NuExceptionInfo<? extends Throwable>)error), List$.MODULE$.canBuildFrom());
        // Results first, then errors: send each record and sequence the futures into one (on ec).
        return (Future)implicits$.MODULE$.toTraverseOps(((List)resultsWithEventTimestamp.$plus$plus((GenTraversableOnce)errors, List$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x$4 -> KafkaUtils$.MODULE$.sendToKafka(x$4, this.producer()), List$.MODULE$.canBuildFrom()), UnorderedFoldable$.MODULE$.catsTraverseForList()).sequence(Predef$.MODULE$.$conforms(), (Applicative)Invariant$.MODULE$.catsInstancesForFuture(this.ec));
    }

    /**
     * Converts an interpreter error into a Kafka record.
     *
     * <p>The error first goes through {@code WithExceptionExtractor.extractOrThrow}, which, as
     * its name says, may throw instead of returning. The extracted info is then serialized with
     * a {@link KafkaJsonExceptionSerializationSchema} built from the scenario meta data and the
     * exception handling configuration.
     *
     * @param error the error reported by the interpreter
     * @return the record produced by the serialization schema
     */
    private ProducerRecord<byte[], byte[]> serializeError(NuExceptionInfo<? extends Throwable> error) {
        NuExceptionInfo extracted = WithExceptionExtractor$.MODULE$.extractOrThrow(error);
        KafkaJsonExceptionSerializationSchema schema = new KafkaJsonExceptionSerializationSchema(metaData, engineConfig.exceptionHandlingConfig());
        return schema.serialize(extracted);
    }

    /**
     * Computes, per topic partition present in the batch, the offset to attach to the transaction.
     *
     * @param records the batch returned by the consumer poll
     * @return for each partition, the highest record offset in the batch plus one, wrapped in
     *         {@link OffsetAndMetadata}
     */
    private scala.collection.immutable.Map<TopicPartition, OffsetAndMetadata> retrieveMaxOffsetsOffsets(ConsumerRecords<byte[], byte[]> records) {
        return ((Iterator)CollectionConverters$.MODULE$.asScalaIteratorConverter(records.iterator()).asScala()).map((Function1 & Serializable & scala.Serializable)rec -> {
            // offset + 1, i.e. the position following this record.
            long upcomingOffset = rec.offset() + 1L;
            return new Tuple2((Object)new TopicPartition(rec.topic(), rec.partition()), (Object)BoxesRunTime.boxToLong((long)upcomingOffset));
        // Group by partition, keep the maximum per group ($3), then wrap it in OffsetAndMetadata ($5).
        }).toList().groupBy((Function1 & Serializable & scala.Serializable)x$5 -> (TopicPartition)x$5._1()).mapValues((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToLong((long)KafkaSingleScenarioTaskRun.$anonfun$retrieveMaxOffsetsOffsets$3(x$6))).mapValues((Function1 & Serializable & scala.Serializable)x$8 -> KafkaSingleScenarioTaskRun.$anonfun$retrieveMaxOffsetsOffsets$5(BoxesRunTime.unboxToLong((Object)x$8)));
    }

    /**
     * Closes the producer, the consumer, the producer metrics registrar and the consumer metrics
     * registrar, in that order, skipping any that are still {@code null}. Each close goes
     * through {@code retryCloseOnInterrupt}. An info message is logged afterwards.
     */
    @Override
    public void close() {
        BoxedUnit boxedUnit;
        // `new .colon.colon(...)` is the decompiler's rendering of Scala's `::` list cell; the
        // expression builds List(producer, consumer, producerMetricsRegistrar, consumerMetricsRegistrar),
        // then drops null entries via $anonfun$close$1.
        ((List)new .colon.colon(this.producer(), (List)new .colon.colon(this.consumer(), (List)new .colon.colon((Object)this.producerMetricsRegistrar(), (List)new .colon.colon((Object)this.consumerMetricsRegistrar(), (List)Nil$.MODULE$)))).filter((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)KafkaSingleScenarioTaskRun.$anonfun$close$1(x$9)))).foreach((Function1 & Serializable & scala.Serializable)closeable -> {
            this.retryCloseOnInterrupt((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> closeable.close());
            return BoxedUnit.UNIT;
        });
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Closed runner for {}", new Object[]{this.metaData.id()});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    /**
     * Verifies that the consumer's {@code max.poll.interval.ms} is strictly greater than the sum
     * of the interpreter and publish timeouts.
     *
     * @throws IllegalArgumentException if {@code max.poll.interval.ms} is less than or equal to
     *         {@code interpreterTimeout + publishTimeout} in milliseconds
     */
    private void configSanityCheck() {
        Properties consumerProps = KafkaUtils$.MODULE$.toTransactionalAwareConsumerProperties(engineConfig.kafka(), (Option)None$.MODULE$);
        int maxPollIntervalMs = new ConsumerConfig(consumerProps).getInt("max.poll.interval.ms").intValue();
        long processingBudgetMs = engineConfig.interpreterTimeout().$plus(engineConfig.publishTimeout()).toMillis();
        if (maxPollIntervalMs > processingBudgetMs) {
            return;
        }
        throw new IllegalArgumentException("publishTimeout + interpreterTimeout cannot exceed " + "max.poll.interval.ms");
    }

    /**
     * Runs a close action and retries it exactly once if it was interrupted.
     *
     * <p>If the first attempt fails with {@link InterruptedException} or Kafka's
     * {@link InterruptException}, the thread's interrupt flag is cleared with
     * {@link Thread#interrupted()} and the action runs a second time. When that second attempt
     * succeeds the method returns normally. Previously the original interrupt was rethrown even
     * after a successful retry, which aborted the remaining closes in {@code close()}.
     *
     * <p>Any other throwable from the first attempt, and any throwable from the second attempt,
     * propagates to the caller.
     *
     * @param action the close operation to run
     */
    private void retryCloseOnInterrupt(Function0<BoxedUnit> action) {
        try {
            action.apply$mcV$sp();
        }
        catch (Throwable throwable) {
            boolean interruptedDuringClose = throwable instanceof InterruptedException || throwable instanceof InterruptException;
            if (!interruptedDuringClose) {
                throw throwable;
            }
            // Clear the interrupt flag, otherwise the retry would be interrupted again at once.
            boolean wasInterrupted = Thread.interrupted();
            if (this.logger().underlying().isDebugEnabled()) {
                this.logger().underlying().debug("Interrupted during close: {}, trying once more", new Object[]{BoxesRunTime.boxToBoolean((boolean)wasInterrupted)});
            }
            // A successful second attempt ends here; do not rethrow the handled interrupt.
            action.apply$mcV$sp();
        }
    }

    /**
     * Compiler-generated lambda body used by {@code retrieveMaxOffsetsOffsets}: takes the
     * (partition, offset) pairs of one group and returns the largest offset among them.
     */
    public static final /* synthetic */ long $anonfun$retrieveMaxOffsetsOffsets$3(List x$6) {
        TraversableOnce offsets = (TraversableOnce)x$6.map((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToLong((long)x$7._2$mcJ$sp()), List$.MODULE$.canBuildFrom());
        Object highest = offsets.max((Ordering)Ordering.Long$.MODULE$);
        return BoxesRunTime.unboxToLong(highest);
    }

    /**
     * Compiler-generated lambda body used by {@code retrieveMaxOffsetsOffsets}: wraps an offset
     * in {@link OffsetAndMetadata}.
     */
    public static final /* synthetic */ OffsetAndMetadata $anonfun$retrieveMaxOffsetsOffsets$5(long x$8) {
        OffsetAndMetadata wrapped = new OffsetAndMetadata(x$8);
        return wrapped;
    }

    /**
     * Compiler-generated predicate used by {@code close()} to skip resources that were never
     * created.
     *
     * @return {@code true} when the argument is not {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$close$1(AutoCloseable x$9) {
        if (x$9 == null) {
            return false;
        }
        return true;
    }

    /**
     * Creates the task and precomputes which sources read which topic.
     *
     * <p>The group id is taken from {@code metaData.id()}. Every source of the interpreter must
     * be a {@link LiteKafkaSource}; each of its topics is mapped to the source.
     *
     * @param taskId         identifier passed to the metrics registrars
     * @param metaData       scenario meta data; its id becomes the group id
     * @param runtimeContext provides the metrics provider
     * @param engineConfig   Kafka settings and timeouts
     * @param interpreter    scenario interpreter whose sources define the subscribed topics
     * @param sourceMetrics  receives per-record source metrics
     * @param ec             execution context used when sequencing Kafka send futures
     * @throws IllegalArgumentException if any source is not a {@link LiteKafkaSource}
     */
    public KafkaSingleScenarioTaskRun(String taskId, MetaData metaData, EngineRuntimeContext runtimeContext, KafkaTransactionalScenarioInterpreter.EngineConfig engineConfig, interpreterTypes.ScenarioInterpreter<Future, ConsumerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> interpreter, SourceMetrics sourceMetrics, ExecutionContext ec) {
        this.taskId = taskId;
        this.metaData = metaData;
        this.runtimeContext = runtimeContext;
        this.engineConfig = engineConfig;
        this.interpreter = interpreter;
        this.sourceMetrics = sourceMetrics;
        this.ec = ec;
        // Scala trait initialiser for LazyLogging.
        LazyLogging.$init$((LazyLogging)this);
        this.groupId = metaData.id();
        // Flatten through a List rather than through the sources Map itself. Flat-mapping a Map
        // into topic -> (sourceId, source) pairs rebuilds a Map keyed by topic, so two sources
        // reading the same topic would overwrite each other before they could be grouped.
        this.sourceToTopic = ((TraversableLike)interpreter.sources().toList().flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 sourceEntry = (Tuple2)x0$1;
            // Decompiled Scala pattern match: a null entry is a match failure.
            if (sourceEntry == null) {
                throw new MatchError((Object)sourceEntry);
            }
            interpreterTypes.SourceId sourceId = (interpreterTypes.SourceId)sourceEntry._1();
            Source source = (Source)sourceEntry._2();
            if (!(source instanceof LiteKafkaSource)) {
                throw new IllegalArgumentException(new StringBuilder(24).append("Unexpected source: ").append(source).append(" for ").append(sourceId.value()).toString());
            }
            LiteKafkaSource liteKafkaSource = (LiteKafkaSource)source;
            // One (topic, (sourceId, source)) pair per topic the source reads.
            return (List)liteKafkaSource.topics().map((Function1 & Serializable & scala.Serializable)topic -> new Tuple2((Object)topic, (Object)new Tuple2((Object)sourceId, (Object)liteKafkaSource)), List$.MODULE$.canBuildFrom());
        }, List$.MODULE$.canBuildFrom())).groupBy((Function1 & Serializable & scala.Serializable)entry -> (String)((Tuple2)entry)._1()).mapValues((Function1 & Serializable & scala.Serializable)entries -> ((List)((List)entries).map((Function1 & Serializable & scala.Serializable)entry -> ((Tuple2)entry)._2(), List$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }
}

