package io.memoria.jutils.messaging.adapter.pulsar;

import io.memoria.jutils.core.utils.functional.VavrUtils;
import io.memoria.jutils.core.utils.yaml.YamlConfigMap;
import io.memoria.jutils.messaging.domain.entity.Msg;
import io.memoria.jutils.messaging.domain.port.MsgConsumer;
import io.vavr.control.Try;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.function.Function;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/pulsar/PulsarMsgConsumer.class */
/**
 * {@link MsgConsumer} implementation that reads string messages from Apache Pulsar and exposes
 * them as a Reactor {@link Flux}.
 *
 * <p>Configuration keys read by the constructor:
 * <ul>
 *   <li>{@code pulsar} / {@code serviceUrl} - URL handed to the Pulsar client builder</li>
 *   <li>{@code reactorPulsar} / {@code request.timeout} - timeout in milliseconds applied to the
 *       consumed stream</li>
 * </ul>
 */
public class PulsarMsgConsumer implements MsgConsumer {
    private final PulsarClient client;
    private final Duration timeout;

    /**
     * Builds the underlying Pulsar client from the given configuration.
     *
     * @param yamlConfigMap configuration holding the {@code pulsar} and {@code reactorPulsar} sections
     * @throws PulsarClientException if the Pulsar client cannot be built
     */
    public PulsarMsgConsumer(YamlConfigMap yamlConfigMap) throws PulsarClientException {
        // Read every setting before building the client, so that a bad timeout entry cannot
        // leave an already-built client behind with nothing holding a reference to close it.
        String serviceUrl = yamlConfigMap.asYamlConfigMap("pulsar").asString("serviceUrl");
        long timeoutMillis = yamlConfigMap.asYamlConfigMap("reactorPulsar").asLong("request.timeout").longValue();
        this.timeout = Duration.ofMillis(timeoutMillis);
        this.client = PulsarClient.builder().serviceUrl(serviceUrl).build();
    }

    /**
     * Subscribes to a topic and streams its messages.
     *
     * <p>If the consumer cannot be created, the returned flux emits a single failed {@link Try}
     * carrying the cause. The configured timeout is applied to the resulting flux.
     *
     * @param str  topic name; the subscription name is this value with {@code "subscription"} appended
     * @param str2 not used by this implementation
     * @param j    value passed to {@code Consumer.seek(long)} right after subscribing
     *             (NOTE(review): the Pulsar API treats this as a publish timestamp - confirm callers)
     * @return stream of received messages, each wrapped in a {@link Try}
     */
    public Flux<Try<Msg>> consume(String str, String str2, long j) {
        return createConsumer(this.client, str, j)
                .map(PulsarMsgConsumer::consumeFrom)
                .getOrElseGet(th -> Flux.just(Try.<Msg>failure(th)))
                .timeout(this.timeout);
    }

    /**
     * Closes the underlying Pulsar client asynchronously.
     *
     * @return a mono whose value is the outcome of the close operation as mapped by
     *         {@code VavrUtils.handle()}
     */
    public Mono<Try<Void>> close() {
        return Mono.fromFuture(this.client.closeAsync().handle(VavrUtils.handle()));
    }

    /**
     * Subscribes a string consumer to {@code str} and seeks it to {@code j}.
     *
     * @return the ready consumer, or a failure holding whatever subscribe/seek threw
     */
    private static Try<Consumer<String>> createConsumer(PulsarClient pulsarClient, String str, long j) {
        return Try.of(() -> {
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(str)
                    .subscriptionName(str + "subscription")
                    .subscribe();
            try {
                consumer.seek(j);
            } catch (Exception e) {
                // The subscription is already open at this point; release it (best effort)
                // before reporting the failure, otherwise the consumer would be leaked.
                consumer.closeAsync();
                throw e;
            }
            return consumer;
        });
    }

    /**
     * Turns a consumer into an unbounded stream of receive attempts and closes the consumer
     * when the stream terminates for any reason.
     */
    private static Flux<Try<Msg>> consumeFrom(Consumer<String> consumer) {
        // The explicit type witness is required: without it generate() is inferred as
        // Flux<Object> and the flatMap over Function.identity() does not type-check.
        return Flux.<Flux<Try<Msg>>>generate(sink -> sink.next(receive(consumer)))
                .flatMap(Function.identity())
                // close(consumer) invokes closeAsync() right away; the returned Mono is not subscribed.
                .doFinally(signalType -> close(consumer));
    }

    /**
     * Requests one message from the consumer and converts the outcome into a {@link Msg},
     * built from the message key and value.
     */
    private static Flux<Try<Msg>> receive(Consumer<String> consumer) {
        return Mono.fromFuture(consumer.receiveAsync().handle(VavrUtils.handle()))
                .map(received -> received.map(message -> new Msg(message.getKey(), message.getValue())))
                .flux();
    }

    /**
     * Starts closing the given consumer.
     *
     * @return a mono wrapping the future returned by {@code closeAsync()}
     */
    public static Mono<Void> close(Consumer<String> consumer) {
        return Mono.fromFuture(consumer.closeAsync());
    }
}
