package io.streamthoughts.kafka.clients.consumer;

import ch.qos.logback.classic.Level;
import io.streamthoughts.kafka.clients.LoggerUtilsKt;
import io.streamthoughts.kafka.clients.consumer.ConsumerTask;
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandler;
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandler;
import io.streamthoughts.kafka.clients.consumer.listener.ConsumerBatchRecordsListener;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import kotlinx.coroutines.YieldKt;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* compiled from: KafkaConsumerTask.kt */
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��Þ\u0001\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0010%\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\b\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0007\u0018�� ]*\u0004\b��\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003:\u0001]B\u0083\u0001\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\f\u0010\n\u001a\b\u0012\u0004\u0012\u00028��0\u000b\u0012\f\u0010\f\u001a\b\u0012\u0004\u0012\u00028\u00010\u000b\u0012\u0012\u0010\r\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u000e\u0012\b\b\u0002\u0010\u000f\u001a\u00020\u0010\u0012\u0012\u0010\u0011\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0012\u0012\n\b\u0002\u0010\u0013\u001a\u0004\u0018\u00010\u0014\u0012\n\b\u0002\u0010\u0015\u001a\u0004\u0018\u00010\u0016¢\u0006\u0002\u0010\u0017J\b\u0010.\u001a\u00020/H\u0016J\u0010\u0010.\u001a\u00020/2\u0006\u00100\u001a\u00020)H\u0016J\u001e\u00101\u001a\u00020/2\u0014\u00102\u001a\u0010\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u000204\u0018\u000103H\u0016J\u001e\u00105\u001a\u00020/2\u0014\u00102\u001a\u0010\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u000204\u0018\u000103H\u0016J8\u00106\u001a\u0018\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u000108072\u0018\u00109\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020 \u0012\u0004\u0012\u00020 0807H\u0002JB\u0010:\u001a\u0002H;\"\u0004\b\u0002\u0010;2-\u0010<\u001a)\u0012\u001f\u0012\u001d\u0012\u0004\u0012\u00020 \u0012\u0004\u0012\u00020 0\u001f¢\u0006\f\b>\u0012\b\b?\u0012\u0004\b\b(@\u0012\u0004\u0012\u0002H;0=H\u0016¢\u0006\u0002\u0010AJ\u0011\u0010B\u001a\u00020/H\u0082@ø\u0001��¢\u0006\u0002\u0010CJ\b\u0010D\u001a\u00020%H\u0002J$\u0010E\u001a\u00020/2\u0006\u0010F\u001a\u00020G2\u0006\u0010H\u001a\u00020\u00102\n\b\u0002\u0010I\u001a\u0004\u0018\u00010JH\u0002J\b\u0010K\u001a\u00020/H\u0002J\b\u0010L\u001a\u00020/H\u0002J>\u0010M\u001a\u00020/2(\u0010N\u001a$\u0012\u0004\u0012\u00020\u001a\u0012\u001a\u0012\u0018\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u00010807032\n\u0010O\u001a\u00060Jj\u0002`PH\u0002J\u0014\u0010Q\u001a\u000e\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u00020403H\u0002J\b\u0010R\u001a\u00020/H\u0016J\b\u0010S\u001a\u00020/H\u0002J \u0010T\u001a\u00020/2\u0016\u00109\u001a\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u00010UH\u0002J\b\u0010V\u001a\u00020WH\u0002J\b\u0010X\u001a\u00020/H\u0016J\u0011\u0010Y\u001a\u00020/H\u0096@ø\u0001��¢\u0006\u0002\u0010CJ\b\u0010,\u001a\u00020-H\u0016J\b\u0010Z\u001a\u00020/H\u0002J\b\u0010[\u001a\u00020\u0010H\u0016J\u0018\u0010\\\u001a\u00020/2\u000e\u00109\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030UH\u0002R\u0014\u0010\u0018\u001a\b\u0012\u0004\u0012\u00020\u001a0\u0019X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u000e¢\u0006\u0002\n��R\u0010\u0010\u0013\u001a\u0004\u0018\u00010\u0014X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u001b\u001a\u000e\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u00020\u001d0\u001cX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u001e\u001a\u000e\u0012\u0004\u0012\u00020 \u0012\u0004\u0012\u00020 0\u001fX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0015\u001a\u0004\u0018\u00010\u0016X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010!\u001a\u00020\"X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0011\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0012X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010#\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010$\u001a\u00020%X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010&\u001a\u00020'X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\n\u001a\b\u0012\u0004\u0012\u00028��0\u000bX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\r\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010(\u001a\u00020)X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010*\u001a\u00020+X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010,\u001a\u00020-X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\f\u001a\b\u0012\u0004\u0012\u00028\u00010\u000bX\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006^"}, d2 = {"Lio/streamthoughts/kafka/clients/consumer/KafkaConsumerTask;", "K", "V", "Lio/streamthoughts/kafka/clients/consumer/ConsumerTask;", "consumerFactory", "Lio/streamthoughts/kafka/clients/consumer/ConsumerFactory;", "consumerConfigs", "Lio/streamthoughts/kafka/clients/consumer/KafkaConsumerConfigs;", "subscription", "Lio/streamthoughts/kafka/clients/consumer/TopicSubscription;", "keyDeserializer", "Lorg/apache/kafka/common/serialization/Deserializer;", "valueDeserializer", "listener", "Lio/streamthoughts/kafka/clients/consumer/listener/ConsumerBatchRecordsListener;", "clientId", "", "deserializationErrorHandler", "Lio/streamthoughts/kafka/clients/consumer/error/serialization/DeserializationErrorHandler;", "consumedErrorHandler", "Lio/streamthoughts/kafka/clients/consumer/error/ConsumedErrorHandler;", "consumerAwareRebalanceListener", "Lio/streamthoughts/kafka/clients/consumer/ConsumerAwareRebalanceListener;", "(Lio/streamthoughts/kafka/clients/consumer/ConsumerFactory;Lio/streamthoughts/kafka/clients/consumer/KafkaConsumerConfigs;Lio/streamthoughts/kafka/clients/consumer/TopicSubscription;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;Lio/streamthoughts/kafka/clients/consumer/listener/ConsumerBatchRecordsListener;Ljava/lang/String;Lio/streamthoughts/kafka/clients/consumer/error/serialization/DeserializationErrorHandler;Lio/streamthoughts/kafka/clients/consumer/error/ConsumedErrorHandler;Lio/streamthoughts/kafka/clients/consumer/ConsumerAwareRebalanceListener;)V", "assignedPartitions", "", "Lorg/apache/kafka/common/TopicPartition;", "consumedOffsets", "", "", "consumer", "Lorg/apache/kafka/clients/consumer/Consumer;", "", "consumerConfig", "Lorg/apache/kafka/clients/consumer/ConsumerConfig;", "groupId", "isAutoCommitEnabled", "", "isShutdown", "Ljava/util/concurrent/atomic/AtomicBoolean;", "pollTime", "Ljava/time/Duration;", "shutdownLatch", "Ljava/util/concurrent/CountDownLatch;", "state", "Lio/streamthoughts/kafka/clients/consumer/ConsumerTask$State;", "close", "", "timeout", "commitAsync", "offsets", "", "Lorg/apache/kafka/clients/consumer/OffsetAndMetadata;", "commitSync", "deserialize", "", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "records", "execute", "T", "action", "Lkotlin/Function1;", "Lkotlin/ParameterName;", "name", "client", "(Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;", "isCancelled", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "isStillRunning", "logWithConsumerInfo", "level", "Lch/qos/logback/classic/Level;", "msg", "exception", "Ljava/lang/Exception;", "mayCommitAfterBatch", "mayCommitOnAssignment", "mayHandleConsumedError", "recordsPerPartitions", "thrownException", "Lkotlin/Exception;", "offsetAndMetadataToCommit", "pause", "pollOnce", "processBatchRecords", "Lorg/apache/kafka/clients/consumer/ConsumerRecords;", "rebalanceListener", "Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;", "resume", "run", "subscribeConsumer", "toString", "updateConsumedOffsets", "Companion", "kafka-clients-kotlin"})
/* loaded from: input_file:io/streamthoughts/kafka/clients/consumer/KafkaConsumerTask.class */
public final class KafkaConsumerTask<K, V> implements ConsumerTask {
    private volatile ConsumerTask.State state;
    private final Consumer<byte[], byte[]> consumer;
    private final ConsumerConfig consumerConfig;
    private final String groupId;
    private final Duration pollTime;
    private final boolean isAutoCommitEnabled;
    private final AtomicBoolean isShutdown;
    private final CountDownLatch shutdownLatch;
    private List<TopicPartition> assignedPartitions;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final TopicSubscription subscription;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final ConsumerBatchRecordsListener<K, V> listener;
    private String clientId;
    private final DeserializationErrorHandler<K, V> deserializationErrorHandler;
    private final ConsumedErrorHandler consumedErrorHandler;
    private final ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
    public static final Companion Companion = new Companion(null);
    private static final Logger Log = LoggerUtilsKt.loggerFor(KafkaConsumerTask.class);

    /* compiled from: KafkaConsumerTask.kt */
    @Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��(\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010$\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002JT\u0010\u0005\u001a\u0018\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u0001H\b\u0012\u0006\u0012\u0004\u0018\u0001H\t0\u00070\u0006\"\u0004\b\u0002\u0010\b\"\u0004\b\u0003\u0010\t2(\u0010\n\u001a$\u0012\u0004\u0012\u00020\f\u0012\u001a\u0012\u0018\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u0001H\b\u0012\u0006\u0012\u0004\u0018\u0001H\t0\u00070\u00060\u000bH\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\r"}, d2 = {"Lio/streamthoughts/kafka/clients/consumer/KafkaConsumerTask$Companion;", "", "()V", "Log", "Lorg/slf4j/Logger;", "flatten", "", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "K", "V", "records", "", "Lorg/apache/kafka/common/TopicPartition;", "kafka-clients-kotlin"})
    /* loaded from: input_file:io/streamthoughts/kafka/clients/consumer/KafkaConsumerTask$Companion.class */
    public static final class Companion {
        /* JADX INFO: Access modifiers changed from: private */
        public final <K, V> List<ConsumerRecord<K, V>> flatten(Map<TopicPartition, ? extends List<? extends ConsumerRecord<K, V>>> map) {
            ArrayList arrayList = new ArrayList();
            Iterator<Map.Entry<TopicPartition, ? extends List<? extends ConsumerRecord<K, V>>>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                CollectionsKt.addAll(arrayList, it.next().getValue());
            }
            return CollectionsKt.toList(arrayList);
        }

        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    public void pause() {
        Set assignment = this.consumer.assignment();
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Pausing consumption for: " + assignment, null, 4, null);
        this.state = ConsumerTask.State.PAUSED;
        this.consumer.pause(assignment);
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    public void resume() {
        Set assignment = this.consumer.assignment();
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Resuming consumption for: " + assignment, null, 4, null);
        this.state = ConsumerTask.State.RUNNING;
        this.consumer.resume(assignment);
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    @NotNull
    public ConsumerTask.State state() {
        return this.state;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0042. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:14:0x0081 A[Catch: WakeupException -> 0x00e6, CancellationException -> 0x013b, all -> 0x0153, TRY_LEAVE, TryCatch #1 {CancellationException -> 0x013b, blocks: (B:12:0x007a, B:14:0x0081, B:17:0x00ab, B:40:0x00a5), top: B:39:0x00a5, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:38:0x009c  */
    /* JADX WARN: Removed duplicated region for block: B:42:0x018a  */
    /* JADX WARN: Removed duplicated region for block: B:9:0x0058  */
    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    @org.jetbrains.annotations.Nullable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.lang.Object run(@org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation<? super kotlin.Unit> r8) {
        /*
            Method dump skipped, instructions count: 404
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.streamthoughts.kafka.clients.consumer.KafkaConsumerTask.run(kotlin.coroutines.Continuation):java.lang.Object");
    }

    private final boolean isStillRunning() {
        return !this.isShutdown.get();
    }

    @Nullable
    final /* synthetic */ Object isCancelled(@NotNull Continuation<? super Unit> continuation) throws CancellationException {
        Object yield = YieldKt.yield(continuation);
        return yield == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? yield : Unit.INSTANCE;
    }

    private final void subscribeConsumer() {
        this.subscription.subscribe(this.consumer, rebalanceListener());
    }

    private final void pollOnce() {
        ConsumerRecords<?, ?> poll = this.consumer.poll(this.pollTime);
        Intrinsics.checkExpressionValueIsNotNull(poll, "consumer.poll(pollTime)");
        if (this.state == ConsumerTask.State.PARTITIONS_ASSIGNED) {
            this.state = ConsumerTask.State.RUNNING;
        }
        if (poll.isEmpty()) {
            return;
        }
        Set partitions = poll.partitions();
        Intrinsics.checkExpressionValueIsNotNull(partitions, "rawRecords.partitions()");
        Set<TopicPartition> set = partitions;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(set, 10));
        for (TopicPartition topicPartition : set) {
            List<? extends ConsumerRecord<byte[], byte[]>> records = poll.records(topicPartition);
            Intrinsics.checkExpressionValueIsNotNull(records, "rawRecords.records(it)");
            arrayList.add(new Pair(topicPartition, deserialize(records)));
        }
        Map<TopicPartition, ? extends List<? extends ConsumerRecord<K, V>>> map = MapsKt.toMap(arrayList);
        try {
            processBatchRecords(new ConsumerRecords<>(map));
            updateConsumedOffsets(poll);
            mayCommitAfterBatch();
        } catch (Exception e) {
            mayHandleConsumedError(map, e);
        }
    }

    private final void mayHandleConsumedError(Map<TopicPartition, ? extends List<? extends ConsumerRecord<K, V>>> map, Exception exc) {
        ConsumedErrorHandler consumedErrorHandler = this.consumedErrorHandler;
        if (consumedErrorHandler != null) {
            consumedErrorHandler.handle(this, Companion.flatten(map), exc);
        }
    }

    private final void processBatchRecords(ConsumerRecords<K, V> consumerRecords) {
        this.listener.handle(this, consumerRecords);
    }

    private final void updateConsumedOffsets(ConsumerRecords<?, ?> consumerRecords) {
        Set<TopicPartition> partitions = consumerRecords.partitions();
        Intrinsics.checkExpressionValueIsNotNull(partitions, "records.partitions()");
        for (TopicPartition topicPartition : partitions) {
            KafkaConsumerTask<K, V> kafkaConsumerTask = this;
            Long l = kafkaConsumerTask.consumedOffsets.get(topicPartition);
            long longValue = l != null ? l.longValue() : 0L;
            List records = consumerRecords.records(topicPartition);
            Intrinsics.checkExpressionValueIsNotNull(records, "records.records(topicPartition)");
            Iterator it = records.iterator();
            while (it.hasNext()) {
                longValue = Math.max(longValue, ((ConsumerRecord) it.next()).offset());
            }
            Map<TopicPartition, Long> map = kafkaConsumerTask.consumedOffsets;
            Intrinsics.checkExpressionValueIsNotNull(topicPartition, "topicPartition");
            map.put(topicPartition, Long.valueOf(longValue));
        }
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Closing", null, 4, null);
        this.isShutdown.set(true);
        this.consumer.wakeup();
        this.shutdownLatch.await();
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockSplitter
        jadx.core.utils.exceptions.JadxRuntimeException: Unexpected missing predecessor for block: B:4:0x0037
        	at jadx.core.dex.visitors.blocks.BlockSplitter.addTempConnectionsForExcHandlers(BlockSplitter.java:275)
        	at jadx.core.dex.visitors.blocks.BlockSplitter.visit(BlockSplitter.java:68)
        */
    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    public void close(@org.jetbrains.annotations.NotNull java.time.Duration r8) {
        /*
            r7 = this;
            r0 = r8
            java.lang.String r1 = "timeout"
            kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull(r0, r1)
            r0 = r7
            ch.qos.logback.classic.Level r1 = ch.qos.logback.classic.Level.INFO
            r2 = r1
            java.lang.String r3 = "Level.INFO"
            kotlin.jvm.internal.Intrinsics.checkExpressionValueIsNotNull(r2, r3)
            java.lang.String r2 = "Closing"
            r3 = 0
            r4 = 4
            r5 = 0
            logWithConsumerInfo$default(r0, r1, r2, r3, r4, r5)
            r0 = r7
            java.util.concurrent.atomic.AtomicBoolean r0 = r0.isShutdown
            r1 = 1
            r0.set(r1)
            r0 = r7
            org.apache.kafka.clients.consumer.Consumer<byte[], byte[]> r0 = r0.consumer
            r0.wakeup()
            r0 = r8
            java.time.Duration r1 = java.time.Duration.ZERO
            boolean r0 = kotlin.jvm.internal.Intrinsics.areEqual(r0, r1)
            r1 = 1
            r0 = r0 ^ r1
            if (r0 == 0) goto L5f
        L38:
            r0 = r7
            java.util.concurrent.CountDownLatch r0 = r0.shutdownLatch     // Catch: java.lang.InterruptedException -> L4a
            r1 = r8
            long r1 = r1.toMillis()     // Catch: java.lang.InterruptedException -> L4a
            java.util.concurrent.TimeUnit r2 = java.util.concurrent.TimeUnit.MILLISECONDS     // Catch: java.lang.InterruptedException -> L4a
            boolean r0 = r0.await(r1, r2)     // Catch: java.lang.InterruptedException -> L4a
            goto L5f
        L4a:
            r9 = move-exception
            r0 = r7
            ch.qos.logback.classic.Level r1 = ch.qos.logback.classic.Level.WARN
            r2 = r1
            java.lang.String r3 = "Level.WARN"
            kotlin.jvm.internal.Intrinsics.checkExpressionValueIsNotNull(r2, r3)
            java.lang.String r2 = "Failed to close consumer before timeout"
            r3 = 0
            r4 = 4
            r5 = 0
            logWithConsumerInfo$default(r0, r1, r2, r3, r4, r5)
        L5f:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: io.streamthoughts.kafka.clients.consumer.KafkaConsumerTask.close(java.time.Duration):void");
    }

    private final List<ConsumerRecord<K, V>> deserialize(List<? extends ConsumerRecord<byte[], byte[]>> list) {
        Pair pair;
        Pair pair2;
        LinkedList linkedList = new LinkedList();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            String str = consumerRecord.topic();
            try {
                pair2 = new Pair(this.keyDeserializer.deserialize(str, consumerRecord.headers(), (byte[]) consumerRecord.key()), this.valueDeserializer.deserialize(str, consumerRecord.headers(), (byte[]) consumerRecord.value()));
            } catch (Exception e) {
                DeserializationErrorHandler.Response<K, V> handle = this.deserializationErrorHandler.handle(consumerRecord, e);
                if (handle instanceof DeserializationErrorHandler.Response.Replace) {
                    pair = new Pair(((DeserializationErrorHandler.Response.Replace) handle).getKey(), ((DeserializationErrorHandler.Response.Replace) handle).getValue());
                } else {
                    if (handle instanceof DeserializationErrorHandler.Response.Fail) {
                        throw e;
                    }
                    if (!(handle instanceof DeserializationErrorHandler.Response.Skip)) {
                        throw e;
                    }
                    pair = null;
                }
                pair2 = pair;
            }
            Pair pair3 = pair2;
            if (pair3 != null) {
                linkedList.add(new ConsumerRecord(str, consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), Long.valueOf(consumerRecord.checksum()), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), pair3.getFirst(), pair3.getSecond(), consumerRecord.headers()));
            }
        }
        return linkedList;
    }

    private final ConsumerRebalanceListener rebalanceListener() {
        return new ConsumerRebalanceListener() { // from class: io.streamthoughts.kafka.clients.consumer.KafkaConsumerTask$rebalanceListener$1
            public void onPartitionsAssigned(@NotNull Collection<TopicPartition> collection) {
                List list;
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
                Consumer<?, ?> consumer;
                Intrinsics.checkParameterIsNotNull(collection, "partitions");
                KafkaConsumerTask kafkaConsumerTask = KafkaConsumerTask.this;
                Level level = Level.INFO;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
                KafkaConsumerTask.logWithConsumerInfo$default(kafkaConsumerTask, level, "Partitions Assigned: " + collection, null, 4, null);
                KafkaConsumerTask.this.state = ConsumerTask.State.PARTITIONS_ASSIGNED;
                list = KafkaConsumerTask.this.assignedPartitions;
                list.addAll(collection);
                consumerAwareRebalanceListener = KafkaConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener != null) {
                    consumer = KafkaConsumerTask.this.consumer;
                    consumerAwareRebalanceListener.onPartitionsAssigned(consumer, collection);
                }
                if (collection.isEmpty()) {
                    return;
                }
                KafkaConsumerTask.this.mayCommitOnAssignment();
            }

            public void onPartitionsRevoked(@NotNull Collection<TopicPartition> collection) {
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
                Map<TopicPartition, ? extends OffsetAndMetadata> offsetAndMetadataToCommit;
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener2;
                List list;
                Consumer<?, ?> consumer;
                Consumer<?, ?> consumer2;
                Intrinsics.checkParameterIsNotNull(collection, "partitions");
                KafkaConsumerTask kafkaConsumerTask = KafkaConsumerTask.this;
                Level level = Level.INFO;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
                KafkaConsumerTask.logWithConsumerInfo$default(kafkaConsumerTask, level, "Partitions Revoked: " + collection, null, 4, null);
                KafkaConsumerTask.this.state = ConsumerTask.State.PARTITIONS_REVOKED;
                consumerAwareRebalanceListener = KafkaConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener != null) {
                    consumer2 = KafkaConsumerTask.this.consumer;
                    consumerAwareRebalanceListener.onPartitionsRevokedBeforeCommit(consumer2, collection);
                }
                KafkaConsumerTask kafkaConsumerTask2 = KafkaConsumerTask.this;
                offsetAndMetadataToCommit = KafkaConsumerTask.this.offsetAndMetadataToCommit();
                kafkaConsumerTask2.commitSync(offsetAndMetadataToCommit);
                consumerAwareRebalanceListener2 = KafkaConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener2 != null) {
                    consumer = KafkaConsumerTask.this.consumer;
                    consumerAwareRebalanceListener2.onPartitionsRevokedAfterCommit(consumer, collection);
                }
                list = KafkaConsumerTask.this.assignedPartitions;
                list.clear();
            }

            public void onPartitionsLost(@NotNull Collection<TopicPartition> collection) {
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
                List list;
                Consumer<?, ?> consumer;
                Intrinsics.checkParameterIsNotNull(collection, "partitions");
                KafkaConsumerTask kafkaConsumerTask = KafkaConsumerTask.this;
                Level level = Level.INFO;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
                KafkaConsumerTask.logWithConsumerInfo$default(kafkaConsumerTask, level, "Partitions Lost: " + collection, null, 4, null);
                consumerAwareRebalanceListener = KafkaConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener != null) {
                    consumer = KafkaConsumerTask.this.consumer;
                    consumerAwareRebalanceListener.onPartitionsLost(consumer, collection);
                }
                list = KafkaConsumerTask.this.assignedPartitions;
                list.clear();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataToCommit() {
        Map<TopicPartition, Long> map = this.consumedOffsets;
        ArrayList arrayList = new ArrayList(map.size());
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            arrayList.add(new Pair(entry.getKey(), new OffsetAndMetadata(entry.getValue().longValue() + 1)));
        }
        return MapsKt.toMap(arrayList);
    }

    private final void mayCommitAfterBatch() {
        if (this.isAutoCommitEnabled) {
            return;
        }
        if (!this.consumedOffsets.isEmpty()) {
            commitAsync(offsetAndMetadataToCommit());
            this.consumedOffsets.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void mayCommitOnAssignment() {
        if (Intrinsics.areEqual(this.consumerConfig.getString("auto.offset.reset"), AutoOffsetReset.Latest)) {
            List<TopicPartition> list = this.assignedPartitions;
            ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list, 10));
            for (TopicPartition topicPartition : list) {
                arrayList.add(new Pair(topicPartition, new OffsetAndMetadata(this.consumer.position(topicPartition))));
            }
            commitSync(MapsKt.toMap(arrayList));
        }
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    public void commitAsync(@Nullable Map<TopicPartition, ? extends OffsetAndMetadata> map) {
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Committing offsets async-synchronously for positions: " + map, null, 4, null);
        this.consumer.commitAsync(map, new OffsetCommitCallback() { // from class: io.streamthoughts.kafka.clients.consumer.KafkaConsumerTask$commitAsync$1
            public final void onComplete(Map<TopicPartition, OffsetAndMetadata> map2, Exception exc) {
                if (exc != null) {
                    KafkaConsumerTask kafkaConsumerTask = KafkaConsumerTask.this;
                    Level level2 = Level.WARN;
                    Intrinsics.checkExpressionValueIsNotNull(level2, "Level.WARN");
                    kafkaConsumerTask.logWithConsumerInfo(level2, "Fail to commit position async-synchronously", exc);
                }
            }
        });
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    public void commitSync(@Nullable Map<TopicPartition, ? extends OffsetAndMetadata> map) {
        if (this.consumer.assignment().isEmpty()) {
            return;
        }
        try {
            if (map == null) {
                Level level = Level.WARN;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.WARN");
                logWithConsumerInfo$default(this, level, "Committing offsets synchronously for consumed records", null, 4, null);
                this.consumer.commitSync();
            } else {
                Level level2 = Level.WARN;
                Intrinsics.checkExpressionValueIsNotNull(level2, "Level.WARN");
                logWithConsumerInfo$default(this, level2, "Committing offsets synchronously for positions: " + map, null, 4, null);
                this.consumer.commitSync(map);
            }
            Level level3 = Level.WARN;
            Intrinsics.checkExpressionValueIsNotNull(level3, "Level.WARN");
            logWithConsumerInfo$default(this, level3, "Offsets committed for partitions: " + this.assignedPartitions, null, 4, null);
        } catch (RebalanceInProgressException e) {
            Level level4 = Level.WARN;
            Intrinsics.checkExpressionValueIsNotNull(level4, "Level.WARN");
            logWithConsumerInfo$default(this, level4, "Error while committing offsets due to a rebalance in progress. Ignored", null, 4, null);
        } catch (RetriableCommitFailedException e2) {
            commitSync(map);
        }
    }

    @NotNull
    public String toString() {
        return "ConsumerTask(groupId='" + this.groupId + "', subscription=" + this.subscription + ", assignedPartitions=" + this.assignedPartitions + ", state=" + this.state + ')';
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void logWithConsumerInfo(Level level, String str, Exception exc) {
        String str2 = "Consumer(groupId=" + this.groupId + ", clientId=" + this.clientId + "): " + str;
        if (Intrinsics.areEqual(level, Level.ERROR)) {
            Log.error(str2, exc);
            return;
        }
        if (Intrinsics.areEqual(level, Level.WARN)) {
            Log.warn(str2);
            return;
        }
        if (Intrinsics.areEqual(level, Level.INFO)) {
            Log.info(str2);
        } else if (Intrinsics.areEqual(level, Level.DEBUG)) {
            Log.debug(str2);
        } else {
            Log.debug(str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static /* synthetic */ void logWithConsumerInfo$default(KafkaConsumerTask kafkaConsumerTask, Level level, String str, Exception exc, int i, Object obj) {
        if ((i & 4) != 0) {
            exc = (Exception) null;
        }
        kafkaConsumerTask.logWithConsumerInfo(level, str, exc);
    }

    @Override // io.streamthoughts.kafka.clients.consumer.ConsumerTask
    public <T> T execute(@NotNull Function1<? super Consumer<byte[], byte[]>, ? extends T> function1) {
        Intrinsics.checkParameterIsNotNull(function1, "action");
        return (T) function1.invoke(this.consumer);
    }

    public KafkaConsumerTask(@NotNull ConsumerFactory consumerFactory, @NotNull KafkaConsumerConfigs kafkaConsumerConfigs, @NotNull TopicSubscription topicSubscription, @NotNull Deserializer<K> deserializer, @NotNull Deserializer<V> deserializer2, @NotNull ConsumerBatchRecordsListener<K, V> consumerBatchRecordsListener, @NotNull String str, @NotNull DeserializationErrorHandler<K, V> deserializationErrorHandler, @Nullable ConsumedErrorHandler consumedErrorHandler, @Nullable ConsumerAwareRebalanceListener consumerAwareRebalanceListener) {
        long j;
        Intrinsics.checkParameterIsNotNull(consumerFactory, "consumerFactory");
        Intrinsics.checkParameterIsNotNull(kafkaConsumerConfigs, "consumerConfigs");
        Intrinsics.checkParameterIsNotNull(topicSubscription, "subscription");
        Intrinsics.checkParameterIsNotNull(deserializer, "keyDeserializer");
        Intrinsics.checkParameterIsNotNull(deserializer2, "valueDeserializer");
        Intrinsics.checkParameterIsNotNull(consumerBatchRecordsListener, "listener");
        Intrinsics.checkParameterIsNotNull(str, "clientId");
        Intrinsics.checkParameterIsNotNull(deserializationErrorHandler, "deserializationErrorHandler");
        this.subscription = topicSubscription;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
        this.listener = consumerBatchRecordsListener;
        this.clientId = str;
        this.deserializationErrorHandler = deserializationErrorHandler;
        this.consumedErrorHandler = consumedErrorHandler;
        this.consumerAwareRebalanceListener = consumerAwareRebalanceListener;
        this.state = ConsumerTask.State.CREATED;
        this.isShutdown = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
        this.assignedPartitions = new ArrayList();
        this.consumedOffsets = new HashMap();
        final HashMap hashMap = new HashMap(kafkaConsumerConfigs);
        if (this.clientId.length() == 0) {
            this.clientId = String.valueOf(hashMap.get("client.id"));
        }
        hashMap.put("client.id", this.clientId);
        hashMap.put("key.deserializer", ByteArrayDeserializer.class);
        hashMap.put("key.deserializer", ByteArrayDeserializer.class);
        hashMap.put("value.deserializer", ByteArrayDeserializer.class);
        if (!hashMap.containsKey("enable.auto.commit")) {
            hashMap.put("enable.auto.commit", false);
        }
        this.consumer = consumerFactory.make(hashMap);
        this.consumerConfig = new ConsumerConfig(hashMap, false) { // from class: io.streamthoughts.kafka.clients.consumer.KafkaConsumerTask.1
        };
        String string = this.consumerConfig.getString("group.id");
        Intrinsics.checkExpressionValueIsNotNull(string, "consumerConfig.getString…erConfig.GROUP_ID_CONFIG)");
        this.groupId = string;
        String string2 = this.consumerConfig.getString("client.id");
        Intrinsics.checkExpressionValueIsNotNull(string2, "consumerConfig.getString…rConfig.CLIENT_ID_CONFIG)");
        this.clientId = string2;
        Boolean bool = this.consumerConfig.getBoolean("enable.auto.commit");
        Intrinsics.checkExpressionValueIsNotNull(bool, "consumerConfig.getBoolea…NABLE_AUTO_COMMIT_CONFIG)");
        this.isAutoCommitEnabled = bool.booleanValue();
        Object obj = hashMap.get(KafkaConsumerConfigs.POLL_INTERVAL_MS_CONFIG);
        if (obj != null) {
            String obj2 = obj.toString();
            if (obj2 != null) {
                Long longOrNull = StringsKt.toLongOrNull(obj2);
                if (longOrNull != null) {
                    j = longOrNull.longValue();
                    Duration ofMillis = Duration.ofMillis(j);
                    Intrinsics.checkExpressionValueIsNotNull(ofMillis, "Duration.ofMillis(\n     …RVAL_MS_DEFAULT\n        )");
                    this.pollTime = ofMillis;
                }
            }
        }
        j = KafkaConsumerConfigs.POLL_INTERVAL_MS_DEFAULT;
        Duration ofMillis2 = Duration.ofMillis(j);
        Intrinsics.checkExpressionValueIsNotNull(ofMillis2, "Duration.ofMillis(\n     …RVAL_MS_DEFAULT\n        )");
        this.pollTime = ofMillis2;
    }

    public /* synthetic */ KafkaConsumerTask(ConsumerFactory consumerFactory, KafkaConsumerConfigs kafkaConsumerConfigs, TopicSubscription topicSubscription, Deserializer deserializer, Deserializer deserializer2, ConsumerBatchRecordsListener consumerBatchRecordsListener, String str, DeserializationErrorHandler deserializationErrorHandler, ConsumedErrorHandler consumedErrorHandler, ConsumerAwareRebalanceListener consumerAwareRebalanceListener, int i, DefaultConstructorMarker defaultConstructorMarker) {
        this(consumerFactory, kafkaConsumerConfigs, topicSubscription, deserializer, deserializer2, consumerBatchRecordsListener, (i & 64) != 0 ? "" : str, deserializationErrorHandler, (i & 256) != 0 ? (ConsumedErrorHandler) null : consumedErrorHandler, (i & 512) != 0 ? (ConsumerAwareRebalanceListener) null : consumerAwareRebalanceListener);
    }
}
