package io.memoria.jutils.messaging.adapter.kafka;

import io.memoria.jutils.core.utils.functional.ReactorVavrUtils;
import io.memoria.jutils.messaging.domain.entity.Msg;
import io.memoria.jutils.messaging.domain.port.MsgProducer;
import io.vavr.control.Try;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.SerializedLambda;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer.class */
public final class KafkaMsgProducer extends Record implements MsgProducer {
    private final KafkaProducer<String, String> kafkaProducer;
    private final Scheduler scheduler;
    private final Duration timeout;

    public KafkaMsgProducer(KafkaProducer<String, String> kafkaProducer, Scheduler scheduler, Duration duration) {
        this.kafkaProducer = kafkaProducer;
        this.scheduler = scheduler;
        this.timeout = duration;
    }

    public Flux<Try<Void>> produce(String str, String str2, Flux<Msg> flux) {
        return (Flux) Try.of(() -> {
            return Integer.valueOf(Integer.parseInt(str2));
        }).map(num -> {
            return flux.publishOn(this.scheduler).map(msg -> {
                return new ProducerRecord(str, num, msg.id, (String) msg.value);
            }).map(producerRecord -> {
                return Try.run(() -> {
                    send(producerRecord);
                });
            });
        }).getOrElseGet(th -> {
            return Flux.just(Try.failure(th));
        });
    }

    public Mono<Try<Void>> close() {
        return ReactorVavrUtils.blockingToMono(() -> {
            return Try.run(() -> {
                this.kafkaProducer.close(this.timeout);
            });
        }, this.scheduler);
    }

    private RecordMetadata send(ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
        return (RecordMetadata) this.kafkaProducer.send(producerRecord).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // java.lang.Record
    public String toString() {
        return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KafkaMsgProducer.class), KafkaMsgProducer.class, "kafkaProducer;scheduler;timeout", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->kafkaProducer:Lorg/apache/kafka/clients/producer/KafkaProducer;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->scheduler:Lreactor/core/scheduler/Scheduler;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->timeout:Ljava/time/Duration;").dynamicInvoker().invoke(this) /* invoke-custom */;
    }

    @Override // java.lang.Record
    public final int hashCode() {
        return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KafkaMsgProducer.class), KafkaMsgProducer.class, "kafkaProducer;scheduler;timeout", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->kafkaProducer:Lorg/apache/kafka/clients/producer/KafkaProducer;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->scheduler:Lreactor/core/scheduler/Scheduler;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->timeout:Ljava/time/Duration;").dynamicInvoker().invoke(this) /* invoke-custom */;
    }

    @Override // java.lang.Record
    public final boolean equals(Object obj) {
        return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KafkaMsgProducer.class, Object.class), KafkaMsgProducer.class, "kafkaProducer;scheduler;timeout", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->kafkaProducer:Lorg/apache/kafka/clients/producer/KafkaProducer;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->scheduler:Lreactor/core/scheduler/Scheduler;", "FIELD:Lio/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer;->timeout:Ljava/time/Duration;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
    }

    public KafkaProducer<String, String> kafkaProducer() {
        return this.kafkaProducer;
    }

    public Scheduler scheduler() {
        return this.scheduler;
    }

    public Duration timeout() {
        return this.timeout;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 702500009:
                if (implMethodName.equals("lambda$produce$5a441d62$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/CheckedFunction0") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/memoria/jutils/messaging/adapter/kafka/KafkaMsgProducer") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Integer;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return Integer.valueOf(Integer.parseInt(str));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
