package io.memoria.jutils.messaging.adapter.pulsar;

import io.memoria.jutils.core.utils.functional.VavrUtils;
import io.memoria.jutils.core.utils.yaml.YamlConfigMap;
import io.memoria.jutils.messaging.domain.entity.Msg;
import io.memoria.jutils.messaging.domain.port.MsgProducer;
import io.vavr.Function1;
import io.vavr.control.Try;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.function.Function;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/pulsar/PulsarMsgProducer.class */
public class PulsarMsgProducer implements MsgProducer {
    private final PulsarClient client;
    private final Duration timeout;

    public PulsarMsgProducer(YamlConfigMap yamlConfigMap) throws PulsarClientException {
        this.client = PulsarClient.builder().serviceUrl(yamlConfigMap.asYamlConfigMap("pulsar").asString("serviceUrl")).build();
        this.timeout = Duration.ofMillis(yamlConfigMap.asYamlConfigMap("reactorPulsar").asLong("request.timeout").longValue());
    }

    public Flux<Try<Void>> produce(String str, String str2, Flux<Msg> flux) {
        return ((Flux) createProducer(this.client, str).map(produceFrom(flux, str2)).getOrElseGet(th -> {
            return Flux.just(Try.failure(th));
        })).timeout(this.timeout);
    }

    private Function<Producer<String>, Flux<Try<Void>>> produceFrom(Flux<Msg> flux, String str) {
        return producer -> {
            return flux.flatMap(send(producer, str)).doFinally(signalType -> {
                close(producer).subscribe();
            });
        };
    }

    public Mono<Try<Void>> close() {
        return Mono.fromFuture(this.client.closeAsync().handle(VavrUtils.handle()));
    }

    private static Try<Producer<String>> createProducer(PulsarClient pulsarClient, String str) {
        return Try.of(() -> {
            return pulsarClient.newProducer(Schema.STRING).topic(str).create();
        });
    }

    private static Function1<Msg, Mono<Try<Void>>> send(Producer<String> producer, String str) {
        return msg -> {
            return Mono.fromFuture(producer.newMessage().key(str).value((String) msg.value).sendAsync().handle(VavrUtils.handleToVoid()));
        };
    }

    private static Mono<Void> close(Producer<String> producer) {
        return Mono.fromFuture(producer.flushAsync()).then(Mono.fromFuture(producer.closeAsync()));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -978152688:
                if (implMethodName.equals("lambda$send$69115666$1")) {
                    z = true;
                    break;
                }
                break;
            case 1610185881:
                if (implMethodName.equals("lambda$createProducer$18ba8eaa$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/CheckedFunction0") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/memoria/jutils/messaging/adapter/pulsar/PulsarMsgProducer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/PulsarClient;Ljava/lang/String;)Lorg/apache/pulsar/client/api/Producer;")) {
                    PulsarClient pulsarClient = (PulsarClient) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return pulsarClient.newProducer(Schema.STRING).topic(str).create();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/Function1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/memoria/jutils/messaging/adapter/pulsar/PulsarMsgProducer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Producer;Ljava/lang/String;Lio/memoria/jutils/messaging/domain/entity/Msg;)Lreactor/core/publisher/Mono;")) {
                    Producer producer = (Producer) serializedLambda.getCapturedArg(0);
                    String str2 = (String) serializedLambda.getCapturedArg(1);
                    return msg -> {
                        return Mono.fromFuture(producer.newMessage().key(str2).value((String) msg.value).sendAsync().handle(VavrUtils.handleToVoid()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
