package io.memoria.jutils.messaging.adapter.kafka;

import io.memoria.jutils.core.messaging.Message;
import io.memoria.jutils.core.messaging.MessageFilter;
import io.memoria.jutils.core.messaging.MsgReceiver;
import io.vavr.collection.List;
import io.vavr.control.Option;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/kafka/KafkaReceiver.class */
/**
 * {@link MsgReceiver} backed by a Kafka consumer that reads a single topic partition.
 *
 * <p>The topic, partition and starting offset are taken from the {@link MessageFilter};
 * each consumed record becomes a {@link Message} whose body is the record value and whose
 * optional id is the record key parsed as a {@code long}.
 *
 * <p>Declared as a record, so the canonical constructor, the component accessors
 * ({@code consumer()}, {@code mf()}, {@code scheduler()}, {@code timeout()}) and the
 * component-wise {@code equals}/{@code hashCode}/{@code toString} are generated by the compiler.
 *
 * @param consumer  Kafka consumer used for assigning, seeking and polling
 * @param mf        filter supplying the topic, partition and starting offset
 * @param scheduler scheduler the resulting flux is subscribed on
 * @param timeout   maximum time a single poll may block
 */
public record KafkaReceiver(KafkaConsumer<String, String> consumer,
                            MessageFilter mf,
                            Scheduler scheduler,
                            Duration timeout) implements MsgReceiver {

    /**
     * Assigns the consumer to the filtered partition, seeks to the filter's offset and returns
     * a flux that keeps polling that partition for as long as there is downstream demand.
     *
     * <p>Assignment, the initial poll and the seek are performed when this method is called,
     * not when the returned flux is subscribed to.
     *
     * @return an unbounded flux of messages read from the partition, subscribed on
     *         {@code scheduler}
     */
    public Flux<Message> get() {
        TopicPartition topicPartition = new TopicPartition(mf.topic(), mf.partition());
        consumer.assign(List.of(topicPartition).toJavaList());
        // The result of this poll is discarded; the seek below repositions the consumer
        // at the offset requested by the filter before any record is emitted.
        consumer.poll(timeout);
        consumer.seek(topicPartition, mf.offset());
        // concatMap subscribes to one inner poll at a time and preserves emission order,
        // so polls never overlap and records are delivered in the order they were read.
        return Flux.<Flux<Message>>generate(sink -> sink.next(pollOnce(topicPartition)))
                   .concatMap(batch -> batch)
                   .subscribeOn(scheduler);
    }

    /**
     * Alias of {@link #get()} kept for backward compatibility with code that refers to the
     * decompiler-generated name (the decompiler note stated this method was renamed from
     * {@code get}).
     *
     * @return the same flux as {@link #get()}
     */
    public Flux<Message> m0get() {
        return get();
    }

    /**
     * Builds a lazy publisher that performs exactly one poll when subscribed and emits the
     * records returned for the given partition as messages.
     *
     * @param topicPartition partition whose records are extracted from the poll result
     * @return flux of the messages obtained by a single poll; empty when the poll returned
     *         no records for the partition
     * @throws NumberFormatException signalled through the flux when a non-null record key
     *                               is not a parsable {@code long}
     */
    private Flux<Message> pollOnce(TopicPartition topicPartition) {
        return Mono.fromCallable(() -> consumer.poll(timeout))
                   .map(records -> records.records(topicPartition))
                   .flatMapMany(Flux::fromIterable)
                   // A null key yields an empty Option, i.e. a message without an id.
                   .map(rec -> new Message(rec.value(), Option.of(rec.key()).map(Long::parseLong)));
    }
}
