package pl.touk.nussknacker.engine.baseengine.kafka;

import cats.Invariant$;
import cats.UnorderedFoldable$;
import cats.data.WriterT;
import cats.implicits$;
import cats.package$;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Closeable;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import pl.touk.nussknacker.engine.api.Context;
import pl.touk.nussknacker.engine.api.MetaData;
import pl.touk.nussknacker.engine.api.exception.EspExceptionInfo;
import pl.touk.nussknacker.engine.api.process.Source;
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext;
import pl.touk.nussknacker.engine.baseengine.api.interpreterTypes;
import pl.touk.nussknacker.engine.baseengine.kafka.KafkaTransactionalScenarioInterpreter;
import pl.touk.nussknacker.engine.baseengine.kafka.api.CommonKafkaSource;
import pl.touk.nussknacker.engine.baseengine.metrics.SourceMetrics;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import pl.touk.nussknacker.engine.kafka.exception.KafkaJsonExceptionSerializationSchema;
import pl.touk.nussknacker.engine.util.exception.WithExceptionExtractor$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.compat.java8.DurationConverters$;
import scala.compat.java8.DurationConverters$FiniteDurationops$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering$Long$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSingleScenarioTaskRun.scala */
@ScalaSignature(bytes = "\u0006\u0001\t}c\u0001\u0002\u000f\u001e\u0001)B\u0001\"\u0011\u0001\u0003\u0002\u0003\u0006IA\u0011\u0005\t\u001f\u0002\u0011\t\u0011)A\u0005!\"Aa\u000b\u0001B\u0001B\u0003%q\u000b\u0003\u0005^\u0001\t\u0005\t\u0015!\u0003_\u0011!\t\bA!A!\u0002\u0013\u0011\bBCA\u0003\u0001\t\u0005\t\u0015!\u0003\u0002\b!Q\u00111\u0003\u0001\u0003\u0002\u0003\u0006Y!!\u0006\t\u000f\u0005m\u0001\u0001\"\u0001\u0002\u001e!I\u0011\u0011\u0007\u0001C\u0002\u0013%\u00111\u0007\u0005\b\u0003k\u0001\u0001\u0015!\u0003C\u0011-\t9\u0004\u0001a\u0001\u0002\u0004%I!!\u000f\t\u0017\u0005\u0005\u0004\u00011AA\u0002\u0013%\u00111\r\u0005\f\u0003_\u0002\u0001\u0019!A!B\u0013\tY\u0004C\u0006\u0002r\u0001\u0001\r\u00111A\u0005\n\u0005M\u0004bCA@\u0001\u0001\u0007\t\u0019!C\u0005\u0003\u0003C1\"!\"\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002v!I\u0011q\u0011\u0001C\u0002\u0013%\u0011\u0011\u0012\u0005\t\u0003c\u0003\u0001\u0015!\u0003\u0002\f\"9\u00111\u0017\u0001\u0005\u0002\u0005U\u0006bBA\\\u0001\u0011\u0005\u0011Q\u0017\u0005\b\u0003s\u0003A\u0011BA^\u0011\u001d\t)\u000f\u0001C\u0005\u0003ODqA!\n\u0001\t\u0013\u00119\u0003C\u0004\u00034\u0001!IA!\u000e\t\u000f\t5\u0003\u0001\"\u0001\u00026\"9!q\n\u0001\u0005\n\u0005U\u0006b\u0002B)\u0001\u0011%!1\u000b\u0002\u001b\u0017\u000647.Y*j]\u001edWmU2f]\u0006\u0014\u0018n\u001c+bg.\u0014VO\u001c\u0006\u0003=}\tQa[1gW\u0006T!\u0001I\u0011\u0002\u0015\t\f7/Z3oO&tWM\u0003\u0002#G\u00051QM\\4j]\u0016T!\u0001J\u0013\u0002\u00179,8o]6oC\u000e\\WM\u001d\u0006\u0003M\u001d\nA\u0001^8vW*\t\u0001&\u0001\u0002qY\u000e\u00011\u0003\u0002\u0001,g]\u0002\"\u0001L\u0019\u000e\u00035R!AL\u0018\u0002\t1\fgn\u001a\u0006\u0002a\u0005!!.\u0019<b\u0013\t\u0011TF\u0001\u0004PE*,7\r\u001e\t\u0003iUj\u0011!H\u0005\u0003mu\u0011A\u0001V1tWB\u0011\u0001hP\u0007\u0002s)\u0011!hO\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003yu\n\u0001\u0002^=qKN\fg-\u001a\u0006\u0002}\u0005\u00191m\\7\n\u0005\u0001K$a\u0003'bufdunZ4j]\u001e\fa\u0001^1tW&#\u0007CA\"M\u001d\t!%\n\u0005\u0002F\u00116\taI\u0003\u0002HS\u00051AH]8pizR\u0011!S\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0017\"\u000ba\u0001\u0015:fI\u00164\u0017BA'O\u0005\u0019\u0019FO]5oO*\u00111\nS\u0001\t[\u0016$\u0018\rR1uCB\u0011\u0011\u000bV\u0007\u0002%*\u00111+I\u0001\u0004CBL\u0017BA+S\u0005!iU\r^1ECR\f\u0017A\u0004:v]RLW.Z\"p]R,\u0007\u0010\u001e\t\u00031nk\u0011!\u0017\u0006\u00035J\u000baB];oi&lWmY8oi\u0016DH/\u0003\u0002]3\n!RI\\4j]\u0016\u0014VO\u001c;j[\u0016\u001cuN\u001c;fqR\fA\"\u001a8hS:,7i\u001c8gS\u001e\u0004\"a\u00188\u000f\u0005\u0001dgBA1l\u001d\t\u0011'N\u0004\u0002dS:\u0011A\r\u001b\b\u0003K\u001et!!\u00124\n\u0003!J!AJ\u0014\n\u0005\u0011*\u0013B\u0001\u0012$\u0013\t\u0001\u0013%\u0003\u0002\u001f?%\u0011Q.H\u0001&\u0017\u000647.\u0019+sC:\u001c\u0018m\u0019;j_:\fGnU2f]\u0006\u0014\u0018n\\%oi\u0016\u0014\bO]3uKJL!a\u001c9\u0003\u0019\u0015sw-\u001b8f\u0007>tg-[4\u000b\u00055l\u0012aC5oi\u0016\u0014\bO]3uKJ\u0004Ba\u001d<z\u007f:\u0011\u0011\r^\u0005\u0003k~\t!dU2f]\u0006\u0014\u0018n\\%oi\u0016\u0014\bO]3uKJ4\u0015m\u0019;pefL!a\u001e=\u0003AM\u001bWM\\1sS>Le\u000e^3saJ,G/\u001a:XSRDG*\u001b4fGf\u001cG.\u001a\u0006\u0003k~\u0001\"A_?\u000e\u0003mT!\u0001 %\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002\u007fw\n1a)\u001e;ve\u0016\u00042aXA\u0001\u0013\r\t\u0019\u0001\u001d\u0002\u0007\u001fV$\b/\u001e;\u0002\u001bM|WO]2f\u001b\u0016$(/[2t!\u0011\tI!a\u0004\u000e\u0005\u0005-!bAA\u0007?\u00059Q.\u001a;sS\u000e\u001c\u0018\u0002BA\t\u0003\u0017\u0011QbU8ve\u000e,W*\u001a;sS\u000e\u001c\u0018AA3d!\rQ\u0018qC\u0005\u0004\u00033Y(\u0001E#yK\u000e,H/[8o\u0007>tG/\u001a=u\u0003\u0019a\u0014N\\5u}Qq\u0011qDA\u0013\u0003O\tI#a\u000b\u0002.\u0005=B\u0003BA\u0011\u0003G\u0001\"\u0001\u000e\u0001\t\u000f\u0005M\u0001\u0002q\u0001\u0002\u0016!)\u0011\t\u0003a\u0001\u0005\")q\n\u0003a\u0001!\")a\u000b\u0003a\u0001/\")Q\f\u0003a\u0001=\")\u0011\u000f\u0003a\u0001e\"9\u0011Q\u0001\u0005A\u0002\u0005\u001d\u0011aB4s_V\u0004\u0018\nZ\u000b\u0002\u0005\u0006AqM]8va&#\u0007%\u0001\u0005d_:\u001cX/\\3s+\t\tY\u0004\u0005\u0005\u0002>\u0005=\u00131KA*\u001b\t\tyD\u0003\u0003\u00028\u0005\u0005#\u0002BA\"\u0003\u000b\nqa\u00197jK:$8OC\u0002\u001f\u0003\u000fRA!!\u0013\u0002L\u00051\u0011\r]1dQ\u0016T!!!\u0014\u0002\u0007=\u0014x-\u0003\u0003\u0002R\u0005}\"!D&bM.\f7i\u001c8tk6,'\u000f\u0005\u0004\u0002V\u0005]\u00131L\u0007\u0002\u0011&\u0019\u0011\u0011\f%\u0003\u000b\u0005\u0013(/Y=\u0011\t\u0005U\u0013QL\u0005\u0004\u0003?B%\u0001\u0002\"zi\u0016\fAbY8ogVlWM]0%KF$B!!\u001a\u0002lA!\u0011QKA4\u0013\r\tI\u0007\u0013\u0002\u0005+:LG\u000fC\u0005\u0002n1\t\t\u00111\u0001\u0002<\u0005\u0019\u0001\u0010J\u0019\u0002\u0013\r|gn];nKJ\u0004\u0013\u0001\u00039s_\u0012,8-\u001a:\u0016\u0005\u0005U\u0004\u0003CA<\u0003w\n\u0019&a\u0015\u000e\u0005\u0005e$\u0002BA9\u0003\u0003JA!! \u0002z\ti1*\u00194lCB\u0013x\u000eZ;dKJ\fA\u0002\u001d:pIV\u001cWM]0%KF$B!!\u001a\u0002\u0004\"I\u0011QN\b\u0002\u0002\u0003\u0007\u0011QO\u0001\naJ|G-^2fe\u0002\nQb]8ve\u000e,Gk\u001c+pa&\u001cWCAAF!\u0019\u0019\u0015Q\u0012\"\u0002\u0012&\u0019\u0011q\u0012(\u0003\u00075\u000b\u0007\u000fE\u0004D\u0003\u001b\u000b\u0019*a*\u0011\t\u0005U\u0015\u0011\u0015\b\u0005\u0003/\u000bYJD\u0002b\u00033K!aU\u0010\n\t\u0005u\u0015qT\u0001\u0011S:$XM\u001d9sKR,'\u000fV=qKNT!aU\u0010\n\t\u0005\r\u0016Q\u0015\u0002\t'>,(oY3JI*!\u0011QTAP!\u0011\tI+!,\u000e\u0005\u0005-&BA*\u001e\u0013\u0011\ty+a+\u0003#\r{W.\\8o\u0017\u000647.Y*pkJ\u001cW-\u0001\bt_V\u00148-\u001a+p)>\u0004\u0018n\u0019\u0011\u0002\t%t\u0017\u000e\u001e\u000b\u0003\u0003K\n1A];o\u00039\u0001(/\u001a9be\u0016\u0014VmY8sIN$B!!0\u0002\\B1\u0011qXAe\u0003\u001ftA!!1\u0002F:\u0019Q)a1\n\u0003%K1!a2I\u0003\u001d\u0001\u0018mY6bO\u0016LA!a3\u0002N\n!A*[:u\u0015\r\t9\r\u0013\t\t\u0003+\n\t.a%\u0002V&\u0019\u00111\u001b%\u0003\rQ+\b\u000f\\33!\r\t\u0016q[\u0005\u0004\u00033\u0014&aB\"p]R,\u0007\u0010\u001e\u0005\b\u0003;,\u0002\u0019AAp\u0003\u001d\u0011XmY8sIN\u0004\u0002\"!\u0010\u0002b\u0006M\u00131K\u0005\u0005\u0003G\fyDA\bD_:\u001cX/\\3s%\u0016\u001cwN\u001d3t\u0003E\u0019XM\u001c3PkR\u0004X\u000f\u001e+p\u0017\u000647.\u0019\u000b\u0005\u0003S\u0014\u0019\u0001\r\u0003\u0002l\u0006E\b\u0003\u0002>~\u0003[\u0004B!a<\u0002r2\u0001AaCAz-\u0005\u0005\t\u0011!B\u0001\u0003k\u00141a\u0018\u00132#\u0011\t90!@\u0011\t\u0005U\u0013\u0011`\u0005\u0004\u0003wD%a\u0002(pi\"Lgn\u001a\t\u0005\u0003+\ny0C\u0002\u0003\u0002!\u00131!\u00118z\u0011\u001d\u0011)A\u0006a\u0001\u0005\u000f\taa\\;uaV$\bC\u0002B\u0005\u0005\u001f\u0011)B\u0004\u0003\u0002\u0018\n-\u0011\u0002\u0002B\u0007\u0003?\u000b1bY8n[>tG+\u001f9fg&!!\u0011\u0003B\n\u0005)\u0011Vm];miRK\b/\u001a\u0006\u0005\u0005\u001b\ty\n\u0005\u0004\u0003\u0018\tm!q\u0004\b\u0005\u00053\tY*\u0004\u0002\u0002 &!!QDAS\u0005%)e\u000e\u001a*fgVdG\u000f\u0005\u0005\u0002x\t\u0005\u00121KA*\u0013\u0011\u0011\u0019#!\u001f\u0003\u001dA\u0013x\u000eZ;dKJ\u0014VmY8sI\u0006q1/\u001a:jC2L'0Z#se>\u0014H\u0003\u0002B\u0010\u0005SAqAa\u000b\u0018\u0001\u0004\u0011i#A\u0003feJ|'\u000f\u0005\u0003\u0003\n\t=\u0012\u0002\u0002B\u0019\u0005'\u0011\u0011\"\u0012:s_J$\u0016\u0010]3\u00023I,GO]5fm\u0016l\u0015\r_(gMN,Go](gMN,Go\u001d\u000b\u0005\u0005o\u0011Y\u0005E\u0004D\u0003\u001b\u0013ID!\u0012\u0011\t\tm\"\u0011I\u0007\u0003\u0005{QAAa\u0010\u0002F\u000511m\\7n_:LAAa\u0011\u0003>\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007\u0003BA\u001f\u0005\u000fJAA!\u0013\u0002@\t\trJ\u001a4tKR\fe\u000eZ'fi\u0006$\u0017\r^1\t\u000f\u0005u\u0007\u00041\u0001\u0002`\u0006)1\r\\8tK\u0006\t2m\u001c8gS\u001e\u001c\u0016M\\5us\u000eCWmY6\u0002+I,GO]=DY>\u001cXm\u00148J]R,'O];qiR!\u0011Q\rB+\u0011\u001d\u00119f\u0007a\u0001\u00053\na!Y2uS>t\u0007CBA+\u00057\n)'C\u0002\u0003^!\u0013\u0011BR;oGRLwN\u001c\u0019")
/* loaded from: input_file:pl/touk/nussknacker/engine/baseengine/kafka/KafkaSingleScenarioTaskRun.class */
public class KafkaSingleScenarioTaskRun implements Task, LazyLogging {
    private final String taskId;
    private final MetaData metaData;
    private final EngineRuntimeContext runtimeContext;
    private final KafkaTransactionalScenarioInterpreter.EngineConfig engineConfig;
    private final interpreterTypes.ScenarioInterpreter<Future, ProducerRecord<byte[], byte[]>> interpreter;
    private final SourceMetrics sourceMetrics;
    private final ExecutionContext ec;
    private final String groupId;
    private KafkaConsumer<byte[], byte[]> consumer;
    private KafkaProducer<byte[], byte[]> producer;
    private final Map<String, Map<interpreterTypes.SourceId, CommonKafkaSource>> sourceToTopic;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [pl.touk.nussknacker.engine.baseengine.kafka.KafkaSingleScenarioTaskRun] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    private String groupId() {
        return this.groupId;
    }

    private KafkaConsumer<byte[], byte[]> consumer() {
        return this.consumer;
    }

    private void consumer_$eq(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    private KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    private void producer_$eq(KafkaProducer<byte[], byte[]> kafkaProducer) {
        this.producer = kafkaProducer;
    }

    private Map<String, Map<interpreterTypes.SourceId, CommonKafkaSource>> sourceToTopic() {
        return this.sourceToTopic;
    }

    @Override // pl.touk.nussknacker.engine.baseengine.kafka.Task
    public void init() {
        configSanityCheck();
        Properties producerProperties = KafkaUtils$.MODULE$.toProducerProperties(this.engineConfig.kafka(), groupId());
        producerProperties.put("transactional.id", new StringBuilder(0).append(groupId()).append(UUID.randomUUID().toString()).toString());
        consumer_$eq(new KafkaConsumer<>(KafkaUtils$.MODULE$.toPropertiesForConsumer(this.engineConfig.kafka(), new Some(groupId()))));
        producer_$eq(new KafkaProducer<>(producerProperties));
        producer().initTransactions();
        consumer().subscribe(CollectionConverters$.MODULE$.asJavaCollectionConverter(sourceToTopic().keys().toSet()).asJavaCollection());
        KafkaMetrics kafkaMetrics = new KafkaMetrics(this.taskId, this.runtimeContext.metricsProvider());
        kafkaMetrics.registerMetrics(producer().metrics());
        kafkaMetrics.registerMetrics(consumer().metrics());
    }

    @Override // java.lang.Runnable
    public void run() {
        ConsumerRecords<byte[], byte[]> poll = consumer().poll(DurationConverters$FiniteDurationops$.MODULE$.toJava$extension(DurationConverters$.MODULE$.FiniteDurationops(this.engineConfig.pollDuration())));
        if (poll.isEmpty()) {
            if (!logger().underlying().isDebugEnabled()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            } else {
                logger().underlying().debug("No records, skipping");
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        WriterT<Object, List<EspExceptionInfo<? extends Throwable>>, List<interpreterTypes.EndResult<ProducerRecord<byte[], byte[]>>>> writerT = (WriterT) Await$.MODULE$.result((Awaitable) this.interpreter.invoke(new interpreterTypes.ScenarioInputBatch(prepareRecords(poll))), this.engineConfig.interpreterTimeout());
        producer().beginTransaction();
        Await$.MODULE$.result(sendOutputToKafka(writerT), this.engineConfig.publishTimeout());
        producer().sendOffsetsToTransaction((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(retrieveMaxOffsetsOffsets(poll)).asJava(), groupId());
        producer().commitTransaction();
    }

    private List<Tuple2<interpreterTypes.SourceId, Context>> prepareRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        return (List) sourceToTopic().toList().flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str = (String) tuple2._1();
            Map map = (Map) tuple2._2();
            List list = ((TraversableOnce) CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords.records(str)).asScala()).toList();
            map.keys().foreach(sourceId -> {
                $anonfun$prepareRecords$2(this, list, sourceId);
                return BoxedUnit.UNIT;
            });
            return (List) map.mapValues(commonKafkaSource -> {
                return (List) list.map(consumerRecord -> {
                    return commonKafkaSource.deserialize(this.runtimeContext, consumerRecord);
                }, List$.MODULE$.canBuildFrom());
            }).toList().flatMap(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                interpreterTypes.SourceId sourceId2 = (interpreterTypes.SourceId) tuple2._1();
                return (List) ((List) tuple2._2()).map(context -> {
                    return new Tuple2(sourceId2, context);
                }, List$.MODULE$.canBuildFrom());
            }, List$.MODULE$.canBuildFrom());
        }, List$.MODULE$.canBuildFrom());
    }

    private Future<?> sendOutputToKafka(WriterT<Object, List<EspExceptionInfo<? extends Throwable>>, List<interpreterTypes.EndResult<ProducerRecord<byte[], byte[]>>>> writerT) {
        return (Future) implicits$.MODULE$.toTraverseOps(((List) ((List) ((List) writerT.value(package$.MODULE$.catsInstancesForId())).map(endResult -> {
            return (ProducerRecord) endResult.result();
        }, List$.MODULE$.canBuildFrom())).$plus$plus((List) ((List) writerT.written(package$.MODULE$.catsInstancesForId())).map(espExceptionInfo -> {
            return this.serializeError(espExceptionInfo);
        }, List$.MODULE$.canBuildFrom()), List$.MODULE$.canBuildFrom())).map(producerRecord -> {
            return KafkaUtils$.MODULE$.sendToKafka(producerRecord, this.producer());
        }, List$.MODULE$.canBuildFrom()), UnorderedFoldable$.MODULE$.catsTraverseForList()).sequence(Predef$.MODULE$.$conforms(), Invariant$.MODULE$.catsInstancesForFuture(this.ec));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ProducerRecord<byte[], byte[]> serializeError(EspExceptionInfo<? extends Throwable> espExceptionInfo) {
        return new KafkaJsonExceptionSerializationSchema(this.metaData, this.engineConfig.exceptionHandlingConfig()).serialize(WithExceptionExtractor$.MODULE$.extractOrThrow(espExceptionInfo));
    }

    private Map<TopicPartition, OffsetAndMetadata> retrieveMaxOffsetsOffsets(ConsumerRecords<byte[], byte[]> consumerRecords) {
        return ((Iterator) CollectionConverters$.MODULE$.asScalaIteratorConverter(consumerRecords.iterator()).asScala()).map(consumerRecord -> {
            return new Tuple2(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), BoxesRunTime.boxToLong(consumerRecord.offset()));
        }).toList().groupBy(tuple2 -> {
            return (TopicPartition) tuple2._1();
        }).mapValues(list -> {
            return BoxesRunTime.boxToLong($anonfun$retrieveMaxOffsetsOffsets$3(list));
        }).mapValues(obj -> {
            return $anonfun$retrieveMaxOffsetsOffsets$5(BoxesRunTime.unboxToLong(obj));
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        ((List) new $colon.colon(producer(), new $colon.colon(consumer(), Nil$.MODULE$)).filter(closeable -> {
            return BoxesRunTime.boxToBoolean($anonfun$close$1(closeable));
        })).foreach(closeable2 -> {
            $anonfun$close$2(this, closeable2);
            return BoxedUnit.UNIT;
        });
        if (!logger().underlying().isInfoEnabled()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            logger().underlying().info("Closed runner for {}", new Object[]{this.metaData.id()});
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    private void configSanityCheck() {
        if (Predef$.MODULE$.Integer2int(new ConsumerConfig(KafkaUtils$.MODULE$.toPropertiesForConsumer(this.engineConfig.kafka(), None$.MODULE$)).getInt("max.poll.interval.ms")) <= this.engineConfig.interpreterTimeout().$plus(this.engineConfig.publishTimeout()).toMillis()) {
            throw new IllegalArgumentException(new StringBuilder(70).append("publishTimeout + interpreterTimeout cannot exceed ").append("max.poll.interval.ms").toString());
        }
    }

    private void retryCloseOnInterrupt(Function0<BoxedUnit> function0) {
        try {
            function0.apply$mcV$sp();
        } catch (Throwable th) {
            if (!(th instanceof InterruptedException ? true : th instanceof InterruptException)) {
                throw th;
            }
            boolean interrupted = Thread.interrupted();
            if (logger().underlying().isDebugEnabled()) {
                logger().underlying().debug("Interrupted during close: {}, trying once more", new Object[]{BoxesRunTime.boxToBoolean(interrupted)});
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            function0.apply$mcV$sp();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$prepareRecords$3(KafkaSingleScenarioTaskRun kafkaSingleScenarioTaskRun, interpreterTypes.SourceId sourceId, ConsumerRecord consumerRecord) {
        kafkaSingleScenarioTaskRun.sourceMetrics.markElement(sourceId, consumerRecord.timestamp());
    }

    public static final /* synthetic */ void $anonfun$prepareRecords$2(KafkaSingleScenarioTaskRun kafkaSingleScenarioTaskRun, List list, interpreterTypes.SourceId sourceId) {
        list.foreach(consumerRecord -> {
            $anonfun$prepareRecords$3(kafkaSingleScenarioTaskRun, sourceId, consumerRecord);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ long $anonfun$retrieveMaxOffsetsOffsets$3(List list) {
        return BoxesRunTime.unboxToLong(((TraversableOnce) list.map(tuple2 -> {
            return BoxesRunTime.boxToLong(tuple2._2$mcJ$sp());
        }, List$.MODULE$.canBuildFrom())).max(Ordering$Long$.MODULE$));
    }

    public static final /* synthetic */ OffsetAndMetadata $anonfun$retrieveMaxOffsetsOffsets$5(long j) {
        return new OffsetAndMetadata(j);
    }

    public static final /* synthetic */ boolean $anonfun$close$1(Closeable closeable) {
        return closeable != null;
    }

    public static final /* synthetic */ void $anonfun$close$2(KafkaSingleScenarioTaskRun kafkaSingleScenarioTaskRun, Closeable closeable) {
        kafkaSingleScenarioTaskRun.retryCloseOnInterrupt(() -> {
            closeable.close();
        });
    }

    public KafkaSingleScenarioTaskRun(String str, MetaData metaData, EngineRuntimeContext engineRuntimeContext, KafkaTransactionalScenarioInterpreter.EngineConfig engineConfig, interpreterTypes.ScenarioInterpreter<Future, ProducerRecord<byte[], byte[]>> scenarioInterpreter, SourceMetrics sourceMetrics, ExecutionContext executionContext) {
        this.taskId = str;
        this.metaData = metaData;
        this.runtimeContext = engineRuntimeContext;
        this.engineConfig = engineConfig;
        this.interpreter = scenarioInterpreter;
        this.sourceMetrics = sourceMetrics;
        this.ec = executionContext;
        LazyLogging.$init$(this);
        this.groupId = metaData.id();
        this.sourceToTopic = ((TraversableLike) scenarioInterpreter.sources().flatMap(tuple2 -> {
            if (tuple2 != null) {
                interpreterTypes.SourceId sourceId = (interpreterTypes.SourceId) tuple2._1();
                CommonKafkaSource commonKafkaSource = (Source) tuple2._2();
                if (commonKafkaSource instanceof CommonKafkaSource) {
                    CommonKafkaSource commonKafkaSource2 = commonKafkaSource;
                    return (List) commonKafkaSource2.topics().map(str2 -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str2), new Tuple2(sourceId, commonKafkaSource2));
                    }, List$.MODULE$.canBuildFrom());
                }
            }
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            interpreterTypes.SourceId sourceId2 = (interpreterTypes.SourceId) tuple2._1();
            throw new IllegalArgumentException(new StringBuilder(24).append("Unexpected source: ").append((Source) tuple2._2()).append(" for ").append(sourceId2.value()).toString());
        }, Map$.MODULE$.canBuildFrom())).groupBy(tuple22 -> {
            return (String) tuple22._1();
        }).mapValues(map -> {
            return map.values().toMap(Predef$.MODULE$.$conforms());
        });
    }
}
