package io.memoria.jutils.messaging.adapter.kafka;

import io.memoria.jutils.core.utils.functional.ReactorVavrUtils;
import io.memoria.jutils.core.utils.functional.VavrUtils;
import io.memoria.jutils.messaging.domain.entity.Msg;
import io.memoria.jutils.messaging.domain.port.MsgConsumer;
import io.vavr.collection.List;
import io.vavr.control.Try;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.SerializedLambda;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/kafka/KafkaMsgConsumer.class */
/**
 * Kafka-backed {@link MsgConsumer} that reads one topic partition through a caller-supplied
 * {@link KafkaConsumer}.
 *
 * <p>Declared as a record: the canonical constructor, the {@code consumer()}, {@code scheduler()}
 * and {@code timeout()} accessors, and {@code equals}/{@code hashCode}/{@code toString} are
 * generated by the compiler from the three components.
 *
 * <p>NOTE(review): every method drives the same underlying {@code KafkaConsumer}; whether
 * concurrent use is safe depends on the supplied scheduler and on the Kafka client — confirm
 * before sharing an instance across subscribers.
 *
 * @param consumer  Kafka client used for assign, poll, seek and close
 * @param scheduler scheduler the consume pipeline subscribes on; also handed to
 *                  {@code ReactorVavrUtils.blockingToMono} when closing
 * @param timeout   duration passed to every {@code poll} call and to {@code close}
 */
public record KafkaMsgConsumer(KafkaConsumer<String, String> consumer, Scheduler scheduler, Duration timeout)
        implements MsgConsumer {

    /**
     * Streams the messages of a single topic partition, starting at the given offset.
     *
     * <p>On subscription the consumer is assigned to the partition, polled once, and then
     * positioned at {@code offset}; afterwards the partition is polled repeatedly, on demand, and
     * each polled batch is flattened into individual elements. The generator never signals
     * completion, so the returned flux does not complete on its own.
     *
     * @param topic     topic name
     * @param partition partition number in decimal text form
     * @param offset    offset to seek to before streaming
     * @return a cold flux of poll results, subscribed on this record's scheduler
     * @throws NumberFormatException if {@code partition} is not a parsable integer; thrown
     *                               eagerly by this call, not through the returned flux
     */
    public Flux<Try<Msg>> consume(String topic, String partition, long offset) {
        TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partition));
        Mono<Void> seekToOffset = Mono.create(sink -> {
            consumer.assign(List.of(topicPartition).toJavaList());
            // NOTE(review): the poll deliberately precedes the seek, as in the original code —
            // presumably so the assignment is in effect before seeking; confirm with Kafka docs.
            consumer.poll(timeout);
            consumer.seek(topicPartition, offset);
            sink.success();
        });
        Flux<Try<Msg>> messages = Flux.<List<Try<Msg>>>generate(sink -> sink.next(pollOnce(topicPartition)))
                                      .flatMap(Flux::fromIterable);
        return Flux.defer(() -> seekToOffset.thenMany(messages).subscribeOn(scheduler));
    }

    /**
     * Closes the underlying Kafka consumer, waiting at most {@code timeout}.
     *
     * @return a mono produced by {@code ReactorVavrUtils.blockingToMono}, carrying a {@link Try}
     *         that captures any exception thrown by the close call
     */
    public Mono<Try<Void>> close() {
        return ReactorVavrUtils.blockingToMono(() -> Try.run(() -> consumer.close(timeout)), scheduler);
    }

    /**
     * Polls the consumer once and converts the records of {@code topicPartition} into messages
     * built from each record's key and value.
     *
     * <p>An exception thrown by the poll is captured in a {@link Try}; how that failure is turned
     * into list elements is decided by {@code VavrUtils.traverseOfTry} (presumably a single failed
     * element — verify against that helper).
     *
     * @param topicPartition partition whose records are extracted from the poll result
     * @return the poll outcome as a Vavr list of per-message {@link Try} values
     */
    private List<Try<Msg>> pollOnce(TopicPartition topicPartition) {
        Try<List<Msg>> polled = Try.of(() -> consumer.poll(timeout))
                                   .map(records -> records.records(topicPartition))
                                   .map(List::ofAll)
                                   .map(batch -> batch.map(record -> new Msg(record.key(), record.value())));
        return List.ofAll(VavrUtils.traverseOfTry(polled));
    }
}
