package io.streamthoughts.kafka.clients.consumer;

import ch.qos.logback.classic.Level;
import io.streamthoughts.kafka.clients.LoggerUtilsKt;
import io.streamthoughts.kafka.clients.consumer.ConsumerTask;
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.YieldKt;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* compiled from: ConsumerTask.kt */
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��Ä\u0001\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0010%\n\u0002\u0010\t\n��\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\b\u0006\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0002\b\t\u0018�� P*\u0004\b��\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003:\u0002PQB¥\u0001\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\f\u0010\n\u001a\b\u0012\u0004\u0012\u00028��0\u000b\u0012\f\u0010\f\u001a\b\u0012\u0004\u0012\u00028\u00010\u000b\u0012@\u0010\r\u001a<\u0012\f\u0012\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u000f\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u00010\u0010\u0012\u0004\u0012\u00020\u00110\u000ej\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u0001`\u0012\u0012\b\b\u0002\u0010\u0013\u001a\u00020\u0014\u0012\u0012\u0010\u0015\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0016\u0012\n\b\u0002\u0010\u0017\u001a\u0004\u0018\u00010\u0018¢\u0006\u0002\u0010\u0019J8\u00100\u001a\u0018\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u000102012\u0018\u00103\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020!\u0012\u0004\u0012\u00020!0201H\u0002J \u00104\u001a\u00020\u00112\u0016\b\u0002\u00105\u001a\u0010\u0012\u0004\u0012\u00020\u001c\u0012\u0004\u0012\u000207\u0018\u000106H\u0002J \u00108\u001a\u00020\u00112\u0016\b\u0002\u00105\u001a\u0010\u0012\u0004\u0012\u00020\u001c\u0012\u0004\u0012\u000207\u0018\u000106H\u0002J\u0011\u00109\u001a\u00020\u0011H\u0082@ø\u0001��¢\u0006\u0002\u0010:J\b\u0010;\u001a\u00020&H\u0002J$\u0010<\u001a\u00020\u00112\u0006\u0010=\u001a\u00020>2\u0006\u0010?\u001a\u00020\u00142\n\b\u0002\u0010@\u001a\u0004\u0018\u00010AH\u0002J\b\u0010B\u001a\u00020\u0011H\u0002J\b\u0010C\u001a\u00020\u0011H\u0002J\u0014\u0010D\u001a\u000e\u0012\u0004\u0012\u00020\u001c\u0012\u0004\u0012\u00020706H\u0002J\u0006\u0010E\u001a\u00020\u0011J\b\u0010F\u001a\u00020\u0011H\u0002J \u0010G\u001a\u00020\u00112\u0016\u00103\u001a\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u00010\u0010H\u0002J\b\u0010H\u001a\u00020IH\u0002J\u0006\u0010J\u001a\u00020\u0011J\u0011\u0010K\u001a\u00020\u0011H\u0086@ø\u0001��¢\u0006\u0002\u0010:J\u0006\u0010L\u001a\u00020\u0011J\u0006\u0010.\u001a\u00020/J\b\u0010M\u001a\u00020\u0011H\u0002J\b\u0010N\u001a\u00020\u0014H\u0016J\u0018\u0010O\u001a\u00020\u00112\u000e\u00103\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0010H\u0002R\u0014\u0010\u001a\u001a\b\u0012\u0004\u0012\u00020\u001c0\u001bX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082\u000e¢\u0006\u0002\n��R\u001a\u0010\u001d\u001a\u000e\u0012\u0004\u0012\u00020\u001c\u0012\u0004\u0012\u00020\u001f0\u001eX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010 \u001a\u000e\u0012\u0004\u0012\u00020!\u0012\u0004\u0012\u00020!0\u000fX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0017\u001a\u0004\u0018\u00010\u0018X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\"\u001a\u00020#X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0015\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0016X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010$\u001a\u00020\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010%\u001a\u00020&X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010'\u001a\u00020(X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\n\u001a\b\u0012\u0004\u0012\u00028��0\u000bX\u0082\u0004¢\u0006\u0002\n��RH\u0010\r\u001a<\u0012\f\u0012\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u000f\u0012\u0014\u0012\u0012\u0012\u0006\u0012\u0004\u0018\u00018��\u0012\u0006\u0012\u0004\u0018\u00018\u00010\u0010\u0012\u0004\u0012\u00020\u00110\u000ej\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u0001`\u0012X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010)\u001a\n +*\u0004\u0018\u00010*0*X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010,\u001a\u00020-X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010.\u001a\u00020/X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\f\u001a\b\u0012\u0004\u0012\u00028\u00010\u000bX\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006R"}, d2 = {"Lio/streamthoughts/kafka/clients/consumer/ConsumerTask;", "K", "V", "", "consumerFactory", "Lio/streamthoughts/kafka/clients/consumer/ConsumerFactory;", "consumerConfigs", "Lio/streamthoughts/kafka/clients/consumer/KafkaConsumerConfigs;", "subscription", "Lio/streamthoughts/kafka/clients/consumer/TopicSubscription;", "keyDeserializer", "Lorg/apache/kafka/common/serialization/Deserializer;", "valueDeserializer", "listener", "Lkotlin/Function2;", "Lorg/apache/kafka/clients/consumer/Consumer;", "Lorg/apache/kafka/clients/consumer/ConsumerRecords;", "", "Lio/streamthoughts/kafka/clients/consumer/ConsumerBatchRecordListener;", "clientId", "", "deserializationErrorHandler", "Lio/streamthoughts/kafka/clients/consumer/error/serialization/DeserializationErrorHandler;", "consumerAwareRebalanceListener", "Lio/streamthoughts/kafka/clients/consumer/ConsumerAwareRebalanceListener;", "(Lio/streamthoughts/kafka/clients/consumer/ConsumerFactory;Lio/streamthoughts/kafka/clients/consumer/KafkaConsumerConfigs;Lio/streamthoughts/kafka/clients/consumer/TopicSubscription;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;Lkotlin/jvm/functions/Function2;Ljava/lang/String;Lio/streamthoughts/kafka/clients/consumer/error/serialization/DeserializationErrorHandler;Lio/streamthoughts/kafka/clients/consumer/ConsumerAwareRebalanceListener;)V", "assignedPartitions", "", "Lorg/apache/kafka/common/TopicPartition;", "consumedOffsets", "", "", "consumer", "", "consumerConfig", "Lorg/apache/kafka/clients/consumer/ConsumerConfig;", "groupId", "isAutoCommitEnabled", "", "isShutdown", "Ljava/util/concurrent/atomic/AtomicBoolean;", "pollTime", "Ljava/time/Duration;", "kotlin.jvm.PlatformType", "shutdownLatch", "Ljava/util/concurrent/CountDownLatch;", "state", "Lio/streamthoughts/kafka/clients/consumer/ConsumerTask$ConsumerState;", "deserialize", "", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "records", "doCommitAsync", "offsets", "", "Lorg/apache/kafka/clients/consumer/OffsetAndMetadata;", "doCommitSync", "isCancelled", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "isStillRunning", "logWithConsumerInfo", "level", "Lch/qos/logback/classic/Level;", "msg", "exception", "Ljava/lang/Exception;", "mayCommitAfterBatch", "mayCommitOnAssignment", "offsetAndMetadataToCommit", "pause", "pollOnce", "processBatchRecords", "rebalanceListener", "Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;", "resume", "run", "shutdown", "subscribeConsumer", "toString", "updateConsumedOffsets", "Companion", "ConsumerState", "kafka-clients-kotlin"})
/* loaded from: input_file:io/streamthoughts/kafka/clients/consumer/ConsumerTask.class */
public final class ConsumerTask<K, V> {
    private volatile ConsumerState state;
    private final Consumer<byte[], byte[]> consumer;
    private final ConsumerConfig consumerConfig;
    private final String groupId;
    private final Duration pollTime;
    private final boolean isAutoCommitEnabled;
    private final AtomicBoolean isShutdown;
    private final CountDownLatch shutdownLatch;
    private List<TopicPartition> assignedPartitions;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final TopicSubscription subscription;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final Function2<Consumer<?, ?>, ConsumerRecords<K, V>, Unit> listener;
    private String clientId;
    private final DeserializationErrorHandler<K, V> deserializationErrorHandler;
    private final ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
    public static final Companion Companion = new Companion(null);
    private static final Logger Log = LoggerUtilsKt.loggerFor(ConsumerTask.class);

    /* compiled from: ConsumerTask.kt */
    @Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lio/streamthoughts/kafka/clients/consumer/ConsumerTask$Companion;", "", "()V", "Log", "Lorg/slf4j/Logger;", "kafka-clients-kotlin"})
    /* loaded from: input_file:io/streamthoughts/kafka/clients/consumer/ConsumerTask$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* compiled from: ConsumerTask.kt */
    @Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��\f\n\u0002\u0018\u0002\n\u0002\u0010\u0010\n\u0002\b\n\b\u0086\u0001\u0018��2\b\u0012\u0004\u0012\u00020��0\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002j\u0002\b\u0003j\u0002\b\u0004j\u0002\b\u0005j\u0002\b\u0006j\u0002\b\u0007j\u0002\b\bj\u0002\b\tj\u0002\b\n¨\u0006\u000b"}, d2 = {"Lio/streamthoughts/kafka/clients/consumer/ConsumerTask$ConsumerState;", "", "(Ljava/lang/String;I)V", "CREATED", "STARTING", "RUNNING", "PAUSED", "PARTITIONS_ASSIGNED", "PARTITIONS_REVOKED", "PENDING_SHUTDOWN", "SHUTDOWN", "kafka-clients-kotlin"})
    /* loaded from: input_file:io/streamthoughts/kafka/clients/consumer/ConsumerTask$ConsumerState.class */
    public enum ConsumerState {
        CREATED,
        STARTING,
        RUNNING,
        PAUSED,
        PARTITIONS_ASSIGNED,
        PARTITIONS_REVOKED,
        PENDING_SHUTDOWN,
        SHUTDOWN
    }

    public final void pause() {
        Set assignment = this.consumer.assignment();
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Pausing consumption for: " + assignment, null, 4, null);
        this.state = ConsumerState.PAUSED;
        this.consumer.pause(assignment);
    }

    public final void resume() {
        Set assignment = this.consumer.assignment();
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Resuming consumption for: " + assignment, null, 4, null);
        this.state = ConsumerState.RUNNING;
        this.consumer.resume(assignment);
    }

    @NotNull
    public final ConsumerState state() {
        return this.state;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0042. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:14:0x0081 A[Catch: WakeupException -> 0x00e6, CancellationException -> 0x013b, all -> 0x0153, TRY_LEAVE, TryCatch #1 {CancellationException -> 0x013b, blocks: (B:12:0x007a, B:14:0x0081, B:17:0x00ab, B:40:0x00a5), top: B:39:0x00a5, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:38:0x009c  */
    /* JADX WARN: Removed duplicated region for block: B:42:0x018a  */
    /* JADX WARN: Removed duplicated region for block: B:9:0x0058  */
    @org.jetbrains.annotations.Nullable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final java.lang.Object run(@org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation<? super kotlin.Unit> r8) {
        /*
            Method dump skipped, instructions count: 404
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.streamthoughts.kafka.clients.consumer.ConsumerTask.run(kotlin.coroutines.Continuation):java.lang.Object");
    }

    private final boolean isStillRunning() {
        return !this.isShutdown.get();
    }

    @Nullable
    final /* synthetic */ Object isCancelled(@NotNull Continuation<? super Unit> continuation) throws CancellationException {
        Object yield = YieldKt.yield(continuation);
        return yield == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? yield : Unit.INSTANCE;
    }

    private final void subscribeConsumer() {
        this.subscription.subscribe(this.consumer, rebalanceListener());
    }

    private final void pollOnce() {
        ConsumerRecords<?, ?> poll = this.consumer.poll(this.pollTime);
        Intrinsics.checkExpressionValueIsNotNull(poll, "consumer.poll(pollTime)");
        Set partitions = poll.partitions();
        Intrinsics.checkExpressionValueIsNotNull(partitions, "records.partitions()");
        Set<TopicPartition> set = partitions;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(set, 10));
        for (TopicPartition topicPartition : set) {
            List<? extends ConsumerRecord<byte[], byte[]>> records = poll.records(topicPartition);
            Intrinsics.checkExpressionValueIsNotNull(records, "records.records(it)");
            arrayList.add(new Pair(topicPartition, deserialize(records)));
        }
        processBatchRecords(new ConsumerRecords<>(MapsKt.toMap(arrayList)));
        updateConsumedOffsets(poll);
        mayCommitAfterBatch();
    }

    private final void processBatchRecords(ConsumerRecords<K, V> consumerRecords) {
        this.listener.invoke(this.consumer, consumerRecords);
    }

    private final void updateConsumedOffsets(ConsumerRecords<?, ?> consumerRecords) {
        Set<TopicPartition> partitions = consumerRecords.partitions();
        Intrinsics.checkExpressionValueIsNotNull(partitions, "records.partitions()");
        for (TopicPartition topicPartition : partitions) {
            ConsumerTask<K, V> consumerTask = this;
            Long l = consumerTask.consumedOffsets.get(topicPartition);
            long longValue = l != null ? l.longValue() : 0L;
            List records = consumerRecords.records(topicPartition);
            Intrinsics.checkExpressionValueIsNotNull(records, "records.records(topicPartition)");
            Iterator it = records.iterator();
            while (it.hasNext()) {
                longValue = Math.max(longValue, ((ConsumerRecord) it.next()).offset());
            }
            Map<TopicPartition, Long> map = consumerTask.consumedOffsets;
            Intrinsics.checkExpressionValueIsNotNull(topicPartition, "topicPartition");
            map.put(topicPartition, Long.valueOf(longValue));
        }
    }

    public final void shutdown() {
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Closing", null, 4, null);
        this.isShutdown.set(true);
        this.consumer.wakeup();
        this.shutdownLatch.await();
    }

    private final List<ConsumerRecord<K, V>> deserialize(List<? extends ConsumerRecord<byte[], byte[]>> list) {
        Pair pair;
        Pair pair2;
        LinkedList linkedList = new LinkedList();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            String str = consumerRecord.topic();
            try {
                pair2 = new Pair(this.keyDeserializer.deserialize(str, consumerRecord.headers(), (byte[]) consumerRecord.key()), this.valueDeserializer.deserialize(str, consumerRecord.headers(), (byte[]) consumerRecord.value()));
            } catch (Exception e) {
                DeserializationErrorHandler.Response<K, V> handle = this.deserializationErrorHandler.handle(consumerRecord, e);
                if (handle instanceof DeserializationErrorHandler.Response.Replace) {
                    pair = new Pair(((DeserializationErrorHandler.Response.Replace) handle).getKey(), ((DeserializationErrorHandler.Response.Replace) handle).getValue());
                } else {
                    if (handle instanceof DeserializationErrorHandler.Response.Fail) {
                        throw e;
                    }
                    if (!(handle instanceof DeserializationErrorHandler.Response.Skip)) {
                        throw e;
                    }
                    pair = null;
                }
                pair2 = pair;
            }
            Pair pair3 = pair2;
            if (pair3 != null) {
                linkedList.add(new ConsumerRecord(str, consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), Long.valueOf(consumerRecord.checksum()), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), pair3.getFirst(), pair3.getSecond(), consumerRecord.headers()));
            }
        }
        return linkedList;
    }

    private final ConsumerRebalanceListener rebalanceListener() {
        return new ConsumerRebalanceListener() { // from class: io.streamthoughts.kafka.clients.consumer.ConsumerTask$rebalanceListener$1
            public void onPartitionsAssigned(@NotNull Collection<TopicPartition> collection) {
                List list;
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
                Consumer<?, ?> consumer;
                Intrinsics.checkParameterIsNotNull(collection, "partitions");
                ConsumerTask consumerTask = ConsumerTask.this;
                Level level = Level.INFO;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
                ConsumerTask.logWithConsumerInfo$default(consumerTask, level, "Partitions Assigned: " + collection, null, 4, null);
                ConsumerTask.this.state = ConsumerTask.ConsumerState.PARTITIONS_ASSIGNED;
                list = ConsumerTask.this.assignedPartitions;
                list.addAll(collection);
                consumerAwareRebalanceListener = ConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener != null) {
                    consumer = ConsumerTask.this.consumer;
                    consumerAwareRebalanceListener.onPartitionsAssigned(consumer, collection);
                }
                if (collection.isEmpty()) {
                    return;
                }
                ConsumerTask.this.mayCommitOnAssignment();
            }

            public void onPartitionsRevoked(@NotNull Collection<TopicPartition> collection) {
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
                Map offsetAndMetadataToCommit;
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener2;
                List list;
                Consumer<?, ?> consumer;
                Consumer<?, ?> consumer2;
                Intrinsics.checkParameterIsNotNull(collection, "partitions");
                ConsumerTask consumerTask = ConsumerTask.this;
                Level level = Level.INFO;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
                ConsumerTask.logWithConsumerInfo$default(consumerTask, level, "Partitions Revoked: " + collection, null, 4, null);
                ConsumerTask.this.state = ConsumerTask.ConsumerState.PARTITIONS_REVOKED;
                consumerAwareRebalanceListener = ConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener != null) {
                    consumer2 = ConsumerTask.this.consumer;
                    consumerAwareRebalanceListener.onPartitionsRevokedBeforeCommit(consumer2, collection);
                }
                ConsumerTask consumerTask2 = ConsumerTask.this;
                offsetAndMetadataToCommit = ConsumerTask.this.offsetAndMetadataToCommit();
                consumerTask2.doCommitSync(offsetAndMetadataToCommit);
                consumerAwareRebalanceListener2 = ConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener2 != null) {
                    consumer = ConsumerTask.this.consumer;
                    consumerAwareRebalanceListener2.onPartitionsRevokedAfterCommit(consumer, collection);
                }
                list = ConsumerTask.this.assignedPartitions;
                list.clear();
            }

            public void onPartitionsLost(@NotNull Collection<TopicPartition> collection) {
                ConsumerAwareRebalanceListener consumerAwareRebalanceListener;
                List list;
                Consumer<?, ?> consumer;
                Intrinsics.checkParameterIsNotNull(collection, "partitions");
                ConsumerTask consumerTask = ConsumerTask.this;
                Level level = Level.INFO;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
                ConsumerTask.logWithConsumerInfo$default(consumerTask, level, "Partitions Lost: " + collection, null, 4, null);
                consumerAwareRebalanceListener = ConsumerTask.this.consumerAwareRebalanceListener;
                if (consumerAwareRebalanceListener != null) {
                    consumer = ConsumerTask.this.consumer;
                    consumerAwareRebalanceListener.onPartitionsLost(consumer, collection);
                }
                list = ConsumerTask.this.assignedPartitions;
                list.clear();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataToCommit() {
        Map<TopicPartition, Long> map = this.consumedOffsets;
        ArrayList arrayList = new ArrayList(map.size());
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            arrayList.add(new Pair(entry.getKey(), new OffsetAndMetadata(entry.getValue().longValue())));
        }
        return MapsKt.toMap(arrayList);
    }

    private final void mayCommitAfterBatch() {
        if (this.isAutoCommitEnabled) {
            return;
        }
        if (!this.consumedOffsets.isEmpty()) {
            doCommitAsync(offsetAndMetadataToCommit());
            this.consumedOffsets.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void mayCommitOnAssignment() {
        if (Intrinsics.areEqual(this.consumerConfig.getString("auto.offset.reset"), AutoOffsetReset.Latest)) {
            List<TopicPartition> list = this.assignedPartitions;
            ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list, 10));
            for (TopicPartition topicPartition : list) {
                arrayList.add(new Pair(topicPartition, new OffsetAndMetadata(this.consumer.position(topicPartition))));
            }
            doCommitSync(MapsKt.toMap(arrayList));
        }
    }

    private final void doCommitAsync(Map<TopicPartition, ? extends OffsetAndMetadata> map) {
        Level level = Level.INFO;
        Intrinsics.checkExpressionValueIsNotNull(level, "Level.INFO");
        logWithConsumerInfo$default(this, level, "Committing offsets  async-synchronously for positions: " + map, null, 4, null);
        this.consumer.commitAsync(map, new OffsetCommitCallback() { // from class: io.streamthoughts.kafka.clients.consumer.ConsumerTask$doCommitAsync$1
            public final void onComplete(Map<TopicPartition, OffsetAndMetadata> map2, Exception exc) {
                if (exc != null) {
                    ConsumerTask consumerTask = ConsumerTask.this;
                    Level level2 = Level.WARN;
                    Intrinsics.checkExpressionValueIsNotNull(level2, "Level.WARN");
                    consumerTask.logWithConsumerInfo(level2, "Fail to commit position async-synchronously", exc);
                }
            }
        });
    }

    static /* synthetic */ void doCommitAsync$default(ConsumerTask consumerTask, Map map, int i, Object obj) {
        if ((i & 1) != 0) {
            map = (Map) null;
        }
        consumerTask.doCommitAsync(map);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void doCommitSync(Map<TopicPartition, ? extends OffsetAndMetadata> map) {
        if (this.consumer.assignment().isEmpty()) {
            return;
        }
        try {
            if (map == null) {
                Level level = Level.WARN;
                Intrinsics.checkExpressionValueIsNotNull(level, "Level.WARN");
                logWithConsumerInfo$default(this, level, "Committing offsets synchronously for consumed records", null, 4, null);
                this.consumer.commitSync();
            } else {
                Level level2 = Level.WARN;
                Intrinsics.checkExpressionValueIsNotNull(level2, "Level.WARN");
                logWithConsumerInfo$default(this, level2, "Committing offsets synchronously for positions: " + map, null, 4, null);
                this.consumer.commitSync(map);
            }
            Level level3 = Level.WARN;
            Intrinsics.checkExpressionValueIsNotNull(level3, "Level.WARN");
            logWithConsumerInfo$default(this, level3, "Offsets committed for partitions: " + this.assignedPartitions, null, 4, null);
        } catch (RebalanceInProgressException e) {
            Level level4 = Level.WARN;
            Intrinsics.checkExpressionValueIsNotNull(level4, "Level.WARN");
            logWithConsumerInfo$default(this, level4, "Error while committing offsets due to a rebalance in progress. Ignored", null, 4, null);
        } catch (RetriableCommitFailedException e2) {
            doCommitSync(map);
        }
    }

    static /* synthetic */ void doCommitSync$default(ConsumerTask consumerTask, Map map, int i, Object obj) {
        if ((i & 1) != 0) {
            map = (Map) null;
        }
        consumerTask.doCommitSync(map);
    }

    @NotNull
    public String toString() {
        return "ConsumerTask(groupId='" + this.groupId + "', subscription=" + this.subscription + ", assignedPartitions=" + this.assignedPartitions + ", state=" + this.state + ')';
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void logWithConsumerInfo(Level level, String str, Exception exc) {
        String str2 = "Consumer(groupId=" + this.groupId + ", clientId=" + this.clientId + "): " + str;
        if (Intrinsics.areEqual(level, Level.ERROR)) {
            Log.error(str2, exc);
            return;
        }
        if (Intrinsics.areEqual(level, Level.WARN)) {
            Log.warn(str2);
            return;
        }
        if (Intrinsics.areEqual(level, Level.INFO)) {
            Log.info(str2);
        } else if (Intrinsics.areEqual(level, Level.DEBUG)) {
            Log.debug(str2);
        } else {
            Log.debug(str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static /* synthetic */ void logWithConsumerInfo$default(ConsumerTask consumerTask, Level level, String str, Exception exc, int i, Object obj) {
        if ((i & 4) != 0) {
            exc = (Exception) null;
        }
        consumerTask.logWithConsumerInfo(level, str, exc);
    }

    public ConsumerTask(@NotNull ConsumerFactory consumerFactory, @NotNull KafkaConsumerConfigs kafkaConsumerConfigs, @NotNull TopicSubscription topicSubscription, @NotNull Deserializer<K> deserializer, @NotNull Deserializer<V> deserializer2, @NotNull Function2<? super Consumer<?, ?>, ? super ConsumerRecords<K, V>, Unit> function2, @NotNull String str, @NotNull DeserializationErrorHandler<K, V> deserializationErrorHandler, @Nullable ConsumerAwareRebalanceListener consumerAwareRebalanceListener) {
        Intrinsics.checkParameterIsNotNull(consumerFactory, "consumerFactory");
        Intrinsics.checkParameterIsNotNull(kafkaConsumerConfigs, "consumerConfigs");
        Intrinsics.checkParameterIsNotNull(topicSubscription, "subscription");
        Intrinsics.checkParameterIsNotNull(deserializer, "keyDeserializer");
        Intrinsics.checkParameterIsNotNull(deserializer2, "valueDeserializer");
        Intrinsics.checkParameterIsNotNull(function2, "listener");
        Intrinsics.checkParameterIsNotNull(str, "clientId");
        Intrinsics.checkParameterIsNotNull(deserializationErrorHandler, "deserializationErrorHandler");
        this.subscription = topicSubscription;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
        this.listener = function2;
        this.clientId = str;
        this.deserializationErrorHandler = deserializationErrorHandler;
        this.consumerAwareRebalanceListener = consumerAwareRebalanceListener;
        this.state = ConsumerState.CREATED;
        this.pollTime = Duration.ofMillis(kafkaConsumerConfigs.getPollRecordsMs());
        this.isShutdown = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
        this.assignedPartitions = new ArrayList();
        this.consumedOffsets = new HashMap();
        final HashMap hashMap = new HashMap(kafkaConsumerConfigs.asMap());
        if (this.clientId.length() == 0) {
            this.clientId = String.valueOf(hashMap.get("client.id"));
        }
        hashMap.put("client.id", this.clientId);
        hashMap.put("key.deserializer", ByteArrayDeserializer.class);
        hashMap.put("key.deserializer", ByteArrayDeserializer.class);
        hashMap.put("value.deserializer", ByteArrayDeserializer.class);
        if (!hashMap.containsKey("enable.auto.commit")) {
            hashMap.put("enable.auto.commit", false);
        }
        this.consumer = consumerFactory.make(hashMap);
        this.consumerConfig = new ConsumerConfig(hashMap, false) { // from class: io.streamthoughts.kafka.clients.consumer.ConsumerTask.1
        };
        String string = this.consumerConfig.getString("group.id");
        Intrinsics.checkExpressionValueIsNotNull(string, "consumerConfig.getString…erConfig.GROUP_ID_CONFIG)");
        this.groupId = string;
        String string2 = this.consumerConfig.getString("client.id");
        Intrinsics.checkExpressionValueIsNotNull(string2, "consumerConfig.getString…rConfig.CLIENT_ID_CONFIG)");
        this.clientId = string2;
        Boolean bool = this.consumerConfig.getBoolean("enable.auto.commit");
        Intrinsics.checkExpressionValueIsNotNull(bool, "consumerConfig.getBoolea…NABLE_AUTO_COMMIT_CONFIG)");
        this.isAutoCommitEnabled = bool.booleanValue();
    }

    public /* synthetic */ ConsumerTask(ConsumerFactory consumerFactory, KafkaConsumerConfigs kafkaConsumerConfigs, TopicSubscription topicSubscription, Deserializer deserializer, Deserializer deserializer2, Function2 function2, String str, DeserializationErrorHandler deserializationErrorHandler, ConsumerAwareRebalanceListener consumerAwareRebalanceListener, int i, DefaultConstructorMarker defaultConstructorMarker) {
        this(consumerFactory, kafkaConsumerConfigs, topicSubscription, deserializer, deserializer2, function2, (i & 64) != 0 ? "" : str, deserializationErrorHandler, (i & 256) != 0 ? (ConsumerAwareRebalanceListener) null : consumerAwareRebalanceListener);
    }
}
