package pl.touk.nussknacker.engine.lite.kafka;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import pl.touk.nussknacker.engine.util.Implicits$;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.concurrent.Future;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering$Long$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaProducerRecordsHandler.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Ub\u0001\u0002\b\u0010\tqA\u0001\"\u000b\u0001\u0003\u0006\u0004%\tB\u000b\u0005\t\u0001\u0002\u0011\t\u0011)A\u0005W!A\u0011\t\u0001B\u0001B\u0003%!\tC\u0003N\u0001\u0011%a\nC\u0003S\u0001\u0011\u00053\u000bC\u0003`\u0001\u0011\u0005\u0003\rC\u0003b\u0001\u0011\u0005#\rC\u0003\u007f\u0001\u0011\u0005\u0003\r\u0003\u0004��\u0001\u0011%\u0011\u0011A\u0004\b\u0003;y\u0001\u0012BA\u0010\r\u0019qq\u0002#\u0003\u0002\"!1Qj\u0003C\u0001\u0003SAq!a\u000b\f\t\u0003\tiCA\u0012Ue\u0006t7/Y2uS>t\u0017\r\u001c)s_\u0012,8-\u001a:SK\u000e|'\u000fZ:IC:$G.\u001a:\u000b\u0005A\t\u0012!B6bM.\f'B\u0001\n\u0014\u0003\u0011a\u0017\u000e^3\u000b\u0005Q)\u0012AB3oO&tWM\u0003\u0002\u0017/\u0005Ya.^:tW:\f7m[3s\u0015\tA\u0012$\u0001\u0003u_V\\'\"\u0001\u000e\u0002\u0005Ad7\u0001A\n\u0004\u0001u)\u0003C\u0001\u0010$\u001b\u0005y\"B\u0001\u0011\"\u0003\u0011a\u0017M\\4\u000b\u0003\t\nAA[1wC&\u0011Ae\b\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005\u0019:S\"A\b\n\u0005!z!aG&bM.\f\u0007K]8ek\u000e,'OU3d_J$7\u000fS1oI2,'/\u0001\u0005qe>$WoY3s+\u0005Y\u0003\u0003\u0002\u00176o]j\u0011!\f\u0006\u0003S9R!a\f\u0019\u0002\u000f\rd\u0017.\u001a8ug*\u0011\u0001#\r\u0006\u0003eM\na!\u00199bG\",'\"\u0001\u001b\u0002\u0007=\u0014x-\u0003\u00027[\ti1*\u00194lCB\u0013x\u000eZ;dKJ\u00042\u0001O\u001e>\u001b\u0005I$\"\u0001\u001e\u0002\u000bM\u001c\u0017\r\\1\n\u0005qJ$!B!se\u0006L\bC\u0001\u001d?\u0013\ty\u0014H\u0001\u0003CsR,\u0017!\u00039s_\u0012,8-\u001a:!\u0003=\u0019wN\\:v[\u0016\u0014xI]8va&#\u0007CA\"K\u001d\t!\u0005\n\u0005\u0002Fs5\taI\u0003\u0002H7\u00051AH]8pizJ!!S\u001d\u0002\rA\u0013X\rZ3g\u0013\tYEJ\u0001\u0004TiJLgn\u001a\u0006\u0003\u0013f\na\u0001P5oSRtDcA(Q#B\u0011a\u0005\u0001\u0005\u0006S\u0011\u0001\ra\u000b\u0005\u0006\u0003\u0012\u0001\rAQ\u0001\u0019K:\u0014\u0018n\u00195D_:\u001cX/\\3s!J|\u0007/\u001a:uS\u0016\u001cHC\u0001+X!\tAT+\u0003\u0002Ws\t!QK\\5u\u0011\u0015AV\u00011\u0001Z\u0003I\u0019wN\\:v[\u0016\u0014\bK]8qKJ$\u0018.Z:\u0011\u0005ikV\
"A.\u000b\u0005q\u000b\u0013\u0001B;uS2L!AX.\u0003\u0015A\u0013x\u000e]3si&,7/A\fcK\u001a|'/\u001a*fG>\u0014Hm\u001d)s_\u000e,7o]5oOR\tA+\u0001\u0010p]J+7m\u001c:egN+8mY3tg\u001a,H\u000e\\=Qe>\u001cWm]:fIR\u0019AkY6\t\u000b\u0011<\u0001\u0019A3\u0002\u000fI,7m\u001c:egB!a-[\u001c8\u001b\u00059'B\u00015/\u0003!\u0019wN\\:v[\u0016\u0014\u0018B\u00016h\u0005=\u0019uN\\:v[\u0016\u0014(+Z2pe\u0012\u001c\b\"\u00025\b\u0001\u0004a\u0007gA7syB!aM\u001c9|\u0013\tywMA\u0007LC\u001a\\\u0017mQ8ogVlWM\u001d\t\u0003cJd\u0001\u0001B\u0005tW\u0006\u0005\t\u0011!B\u0001i\n\u0019q\f\n\u001b\u0012\u0005UD\bC\u0001\u001dw\u0013\t9\u0018HA\u0004O_RD\u0017N\\4\u0011\u0005aJ\u0018B\u0001>:\u0005\r\te.\u001f\t\u0003cr$\u0011\"`6\u0002\u0002\u0003\u0005)\u0011\u0001;\u0003\u0007}#S'\u0001\u000ep]J+7m\u001c:egB\u0013xnY3tg&twMR1jYV\u0014X-A\rsKR\u0014\u0018.\u001a<f\u001b\u0006DxJ\u001a4tKR\u001cxJ\u001a4tKR\u001cH\u0003BA\u0002\u00037\u0001raQA\u0003\u0003\u0013\t)\"C\u0002\u0002\b1\u00131!T1q!\u0011\tY!!\u0005\u000e\u0005\u00055!bAA\ba\u000511m\\7n_:LA!a\u0005\u0002\u000e\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007c\u00014\u0002\u0018%\u0019\u0011\u0011D4\u0003#=3gm]3u\u0003:$W*\u001a;bI\u0006$\u0018\rC\u0003e\u0013\u0001\u0007Q-A\u0012Ue\u0006t7/Y2uS>t\u0017\r\u001c)s_\u0012,8-\u001a:SK\u000e|'\u000fZ:IC:$G.\u001a:\u0011\u0005\u0019Z1cA\u0006\u0002$A\u0019\u0001(!\n\n\u0007\u0005\u001d\u0012H\u0001\u0004B]f\u0014VM\u001a\u000b\u0003\u0003?\tQ!\u00199qYf$RaTA\u0018\u0003gAa!!\r\u000e\u0001\u0004I\u0016A\u00059s_\u0012,8-\u001a:Qe>\u0004XM\u001d;jKNDQ!Q\u0007A\u0002\t\u0003")
/* loaded from: input_file:pl/touk/nussknacker/engine/lite/kafka/TransactionalProducerRecordsHandler.class */
/**
 * {@link KafkaProducerRecordsHandler} that wraps every processed batch of consumer records
 * in a Kafka producer transaction.
 *
 * <p>Lifecycle as implemented below:
 * <ol>
 *   <li>{@link #beforeRecordsProcessing()} begins a transaction on the producer;</li>
 *   <li>{@link #onRecordsSuccessfullyProcessed(ConsumerRecords, KafkaConsumer)} sends the
 *       per-partition offsets of the batch to the transaction and commits it;</li>
 *   <li>{@link #onRecordsProcessingFailure()} aborts the transaction.</li>
 * </ol>
 *
 * <p>This class is the Java view of a Scala class; {@code send}, {@code metrics} and
 * {@code close} are forwarders to the implementations inherited from the
 * {@code KafkaProducerRecordsHandler} trait.
 */
public class TransactionalProducerRecordsHandler implements KafkaProducerRecordsHandler {
    /** Producer on which the transactions are begun, committed and aborted. */
    private final KafkaProducer<byte[], byte[]> producer;
    /** Consumer group id passed along with the offsets sent to the transaction. */
    private final String consumerGroupId;

    /**
     * Static forwarder to the Scala companion object's {@code apply}.
     *
     * @param properties passed through unchanged to the companion's {@code apply}
     * @param str        passed through unchanged to the companion's {@code apply}
     * @return whatever the companion's {@code apply} returns
     */
    public static TransactionalProducerRecordsHandler apply(Properties properties, String str) {
        return TransactionalProducerRecordsHandler$.MODULE$.apply(properties, str);
    }

    /**
     * Forwards to the {@code send} implementation inherited from
     * {@link KafkaProducerRecordsHandler}.
     *
     * @param producerRecord record handed to the inherited implementation
     * @return the future returned by the inherited implementation
     */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public final Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord) {
        // Must be an explicit Interface.super call: an unqualified send(...) would resolve
        // to this very override and recurse until StackOverflowError.
        // NOTE(review): assumes the trait exposes send as an interface default method
        // (Scala 2.12+ trait encoding) - confirm against the compiled interface.
        return KafkaProducerRecordsHandler.super.send(producerRecord);
    }

    /**
     * Forwards to the {@code metrics} implementation inherited from
     * {@link KafkaProducerRecordsHandler}.
     *
     * @return the metrics map returned by the inherited implementation
     */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public final Map<MetricName, ? extends Metric> metrics() {
        // Explicit Interface.super call; an unqualified metrics() would call itself forever.
        return KafkaProducerRecordsHandler.super.metrics();
    }

    /**
     * Forwards to the {@code close} implementation inherited from
     * {@link KafkaProducerRecordsHandler}.
     */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler, java.lang.AutoCloseable
    public void close() {
        // Explicit Interface.super call; an unqualified close() would call itself forever.
        KafkaProducerRecordsHandler.super.close();
    }

    /**
     * @return the producer this handler was constructed with
     */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    /**
     * Adjusts the given consumer properties in place: asks {@code KafkaUtils} to set the
     * isolation level to {@code READ_COMMITTED} if absent, and unconditionally sets
     * {@code enable.auto.commit} to {@code false}.
     *
     * @param properties consumer properties to mutate
     */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public void enrichConsumerProperties(Properties properties) {
        KafkaUtils$.MODULE$.setIsolationLevelIfAbsent(properties, IsolationLevel.READ_COMMITTED);
        // Offsets are sent through the producer transaction in
        // onRecordsSuccessfullyProcessed, so consumer auto-commit is switched off.
        properties.put("enable.auto.commit", Boolean.FALSE);
    }

    /** Begins a new transaction on the producer. */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public void beforeRecordsProcessing() {
        producer().beginTransaction();
    }

    /**
     * Sends the batch's per-partition offsets to the current transaction under
     * {@code consumerGroupId}, then commits the transaction.
     *
     * @param consumerRecords batch whose offsets are attached to the transaction
     * @param kafkaConsumer   not used by this implementation
     */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public void onRecordsSuccessfullyProcessed(ConsumerRecords<byte[], byte[]> consumerRecords, KafkaConsumer<?, ?> kafkaConsumer) {
        // NOTE(review): the (Map, String) overload of sendOffsetsToTransaction is reportedly
        // deprecated in newer kafka-clients in favour of the ConsumerGroupMetadata overload -
        // verify against the kafka-clients version on the classpath before upgrading.
        producer().sendOffsetsToTransaction(CollectionConverters$.MODULE$.MapHasAsJava(retrieveMaxOffsetsOffsets(consumerRecords)).asJava(), this.consumerGroupId);
        producer().commitTransaction();
    }

    /** Aborts the current transaction on the producer. */
    @Override // pl.touk.nussknacker.engine.lite.kafka.KafkaProducerRecordsHandler
    public void onRecordsProcessingFailure() {
        producer().abortTransaction();
    }

    /**
     * Computes, for every topic-partition present in the batch, the offset to attach to the
     * transaction: the maximum of {@code record.offset() + 1} over that partition's records,
     * wrapped in an {@link OffsetAndMetadata}.
     *
     * @param consumerRecords batch of records to inspect
     * @return Scala immutable map from topic-partition to the computed offset
     */
    private scala.collection.immutable.Map<TopicPartition, OffsetAndMetadata> retrieveMaxOffsetsOffsets(ConsumerRecords<byte[], byte[]> consumerRecords) {
        // Pipeline: (partition, offset + 1) pairs -> group by partition -> max per group
        // -> OffsetAndMetadata.
        return Implicits$.MODULE$.RichScalaMap(Implicits$.MODULE$.RichScalaMap(CollectionConverters$.MODULE$.IteratorHasAsScala(consumerRecords.iterator()).asScala().map(consumerRecord -> {
            return new Tuple2(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), BoxesRunTime.boxToLong(consumerRecord.offset() + 1));
        }).toList().groupBy(tuple2 -> {
            return (TopicPartition) tuple2._1();
        })).mapValuesNow(list -> {
            return BoxesRunTime.boxToLong($anonfun$retrieveMaxOffsetsOffsets$3(list));
        })).mapValuesNow(obj -> {
            return $anonfun$retrieveMaxOffsetsOffsets$5(BoxesRunTime.unboxToLong(obj));
        });
    }

    /**
     * Compiler-generated lambda body: returns the largest {@code long} found in the second
     * element of the tuples in {@code list}.
     */
    public static final /* synthetic */ long $anonfun$retrieveMaxOffsetsOffsets$3(List list) {
        return BoxesRunTime.unboxToLong(list.map(tuple2 -> {
            return BoxesRunTime.boxToLong(tuple2._2$mcJ$sp());
        }).max(Ordering$Long$.MODULE$));
    }

    /** Compiler-generated lambda body: wraps the offset {@code j} in an {@link OffsetAndMetadata}. */
    public static final /* synthetic */ OffsetAndMetadata $anonfun$retrieveMaxOffsetsOffsets$5(long j) {
        return new OffsetAndMetadata(j);
    }

    /**
     * Stores the producer and consumer group id, then runs the trait initializer.
     *
     * @param kafkaProducer producer returned by {@link #producer()}
     * @param str           consumer group id used when sending offsets to the transaction
     */
    public TransactionalProducerRecordsHandler(KafkaProducer<byte[], byte[]> kafkaProducer, String str) {
        this.producer = kafkaProducer;
        this.consumerGroupId = str;
        KafkaProducerRecordsHandler.$init$(this);
    }
}
