package io.memoria.jutils.messaging.adapter.kafka;

import io.memoria.jutils.core.messaging.Message;
import io.memoria.jutils.core.messaging.MessageFilter;
import io.memoria.jutils.core.messaging.MsgSender;
import io.memoria.jutils.core.messaging.Response;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/kafka/KafkaSender.class */
public final class KafkaSender extends Record implements MsgSender {
    private final KafkaProducer<String, String> kafkaProducer;
    private final MessageFilter mf;
    private final Scheduler scheduler;
    private final Duration timeout;

    public KafkaSender(KafkaProducer<String, String> kafkaProducer, MessageFilter messageFilter, Scheduler scheduler, Duration duration) {
        this.kafkaProducer = kafkaProducer;
        this.mf = messageFilter;
        this.scheduler = scheduler;
        this.timeout = duration;
    }

    public Flux<Response> apply(Flux<Message> flux) {
        return flux.map(message -> {
            return new ProducerRecord(this.mf.topic(), Integer.valueOf(this.mf.partition()), message.id().getOrElse(0L), message.value());
        }).concatMap(this::sendRecord).map(recordMetadata -> {
            return recordMetadata.hasOffset() ? new Response(recordMetadata.offset()) : Response.empty();
        });
    }

    private Mono<RecordMetadata> sendRecord(ProducerRecord<String, String> producerRecord) {
        return Mono.create(monoSink -> {
            this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
                if (recordMetadata != null) {
                    monoSink.success(recordMetadata);
                } else {
                    monoSink.error(exc);
                }
            });
        }).subscribeOn(this.scheduler);
    }

    @Override // java.lang.Record
    public String toString() {
        return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KafkaSender.class), KafkaSender.class, "kafkaProducer;mf;scheduler;timeout", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->kafkaProducer:Lorg/apache/kafka/clients/producer/KafkaProducer;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->mf:Lio/memoria/jutils/core/messaging/MessageFilter;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->scheduler:Lreactor/core/scheduler/Scheduler;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->timeout:Ljava/time/Duration;").dynamicInvoker().invoke(this) /* invoke-custom */;
    }

    @Override // java.lang.Record
    public final int hashCode() {
        return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KafkaSender.class), KafkaSender.class, "kafkaProducer;mf;scheduler;timeout", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->kafkaProducer:Lorg/apache/kafka/clients/producer/KafkaProducer;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->mf:Lio/memoria/jutils/core/messaging/MessageFilter;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->scheduler:Lreactor/core/scheduler/Scheduler;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->timeout:Ljava/time/Duration;").dynamicInvoker().invoke(this) /* invoke-custom */;
    }

    @Override // java.lang.Record
    public final boolean equals(Object obj) {
        return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KafkaSender.class, Object.class), KafkaSender.class, "kafkaProducer;mf;scheduler;timeout", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->kafkaProducer:Lorg/apache/kafka/clients/producer/KafkaProducer;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->mf:Lio/memoria/jutils/core/messaging/MessageFilter;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->scheduler:Lreactor/core/scheduler/Scheduler;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaSender;->timeout:Ljava/time/Duration;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
    }

    public KafkaProducer<String, String> kafkaProducer() {
        return this.kafkaProducer;
    }

    public MessageFilter mf() {
        return this.mf;
    }

    public Scheduler scheduler() {
        return this.scheduler;
    }

    public Duration timeout() {
        return this.timeout;
    }
}
