package io.memoria.jutils.messaging.adapter.nats;

import io.memoria.jutils.core.utils.functional.ReactorVavrUtils;
import io.memoria.jutils.core.utils.yaml.YamlConfigMap;
import io.memoria.jutils.messaging.domain.entity.Msg;
import io.memoria.jutils.messaging.domain.port.MsgConsumer;
import io.nats.client.Connection;
import io.nats.client.Subscription;
import io.vavr.control.Try;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/nats/NatsMsgConsumer.class */
/**
 * {@link MsgConsumer} implementation that reads messages from a NATS {@link Connection}.
 *
 * <p>Each element of the consumed stream is the outcome of one blocking poll on a NATS
 * {@link Subscription}, wrapped in a {@link Try} so that poll failures are delivered as
 * values instead of terminating the stream.
 */
public class NatsMsgConsumer implements MsgConsumer {
    /** Scheduler on which the blocking poll and close calls are executed. */
    private final Scheduler scheduler;
    /** Maximum time a single poll waits for a message. */
    private final Duration timeout;
    /** NATS connection owned by this consumer; released by {@link #close()}. */
    private final Connection nc;

    /**
     * Creates a consumer and opens its NATS connection.
     *
     * @param yamlConfigMap configuration; the poll timeout is read, in milliseconds, from the
     *                      {@code consumer.request.timeout} entry of the {@code reactorNats}
     *                      section, and the whole map is handed to {@code NatsConnection.create}
     * @param scheduler     scheduler used for blocking NATS calls
     * @throws IOException          declared for failures while creating the connection
     * @throws InterruptedException declared for interruption while creating the connection
     */
    public NatsMsgConsumer(YamlConfigMap yamlConfigMap, Scheduler scheduler) throws IOException, InterruptedException {
        this.scheduler = scheduler;
        long timeoutMillis = yamlConfigMap.asYamlConfigMap("reactorNats").asLong("consumer.request.timeout").longValue();
        this.timeout = Duration.ofMillis(timeoutMillis);
        this.nc = NatsConnection.create(yamlConfigMap);
    }

    /**
     * Subscribes to the NATS subject {@code str + "." + str2} and returns an unbounded stream
     * of poll results.
     *
     * <p>The NATS subscription is created immediately when this method is called; polling
     * only starts once the returned {@link Flux} is subscribed to, and runs on the configured
     * scheduler.
     *
     * @param str  first segment of the subject
     * @param str2 second segment of the subject
     * @param j    number of leading stream elements to drop; failed polls count as elements
     * @return a stream emitting one {@link Try} per poll
     */
    public Flux<Try<Msg>> consume(String str, String str2, long j) {
        Subscription subscription = this.nc.subscribe(str + "." + str2);
        // Every downstream request triggers exactly one blocking poll.
        Flux<Try<Msg>> polled = Flux.generate(sink -> sink.next(pollOnce(subscription)));
        return Flux.defer(() -> polled.subscribeOn(this.scheduler).skip(j));
    }

    /**
     * Closes the underlying NATS connection on the configured scheduler.
     *
     * @return a {@link Mono} producing the outcome of the close call; an exception thrown by
     *         {@code Connection.close()} is captured in the {@link Try}
     */
    public Mono<Try<Void>> close() {
        return ReactorVavrUtils.blockingToMono(() -> Try.run(this.nc::close), this.scheduler);
    }

    /**
     * Performs one blocking poll on the subscription, waiting at most {@link #timeout}.
     *
     * @param subscription subscription to read from
     * @return a success holding the message (subscription id plus UTF-8 decoded payload), or a
     *         failure holding the thrown exception, or a {@link TimeoutException} when no
     *         message arrived in time
     */
    private Try<Msg> pollOnce(Subscription subscription) {
        return Try.of(() -> subscription.nextMessage(this.timeout))
                  // Per the NATS client documentation nextMessage returns null on timeout;
                  // report that explicitly rather than through an incidental NullPointerException.
                  .filter(Objects::nonNull,
                          () -> new TimeoutException("No NATS message received within " + this.timeout))
                  // Decode with a fixed charset so the result does not depend on the platform default.
                  .map(message -> new Msg(message.getSID(), new String(message.getData(), StandardCharsets.UTF_8)));
    }
}
