package fr.maif.kafka.reactor.consumer;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;

/* loaded from: input_file:fr/maif/kafka/reactor/consumer/ResilientKafkaConsumer.class */
public abstract class ResilientKafkaConsumer<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResilientKafkaConsumer.class);
    /** Topics handed to the receiver options as the subscription. */
    protected final Collection<String> topics;
    /** Value written to the {@code group.id} consumer property. */
    protected final String groupId;
    /** Maximum number of retry attempts given to the backoff retry in {@link #start()}. */
    protected final Long maxRestarts;
    /** Minimum backoff given to the retry in {@link #start()}. */
    protected final Duration minBackoff;
    /** Upper bound given to the retry backoff in {@link #start()}. */
    protected final Duration maxBackoff;
    /** Value passed to {@code ReceiverOptions.commitBatchSize}. */
    protected final Integer commitSize;
    /** Receiver options derived from the configuration (subscription, group id, auto-commit disabled). */
    protected final ReceiverOptions<K, V> receiverOptions;
    /** Callback invoked from {@code onStart} with the stored disposable and the current restart counter. */
    protected final BiFunction<Disposable, Integer, Mono<Void>> onStarted;
    /** Callback subscribed at the beginning of {@link #start()}. */
    protected final Supplier<Mono<Void>> onStarting;
    /** Callback subscribed from {@code onComplete}. */
    protected final Supplier<Mono<Void>> onStopped;
    /** Callback subscribed at the beginning of {@link #stop()}. */
    protected final Supplier<Mono<Void>> onStopping;
    /** Callback subscribed from {@code onError} with the failure. */
    protected final Function<Throwable, Mono<Void>> onFailed;
    /** Disposable of the connected stream; disposed by {@link #stop()}. */
    protected final AtomicReference<Disposable> disposableKafkaRef = new AtomicReference<>();
    /** Latch counted down when the stream subscriber receives an error or completion; awaited by {@link #stop()}. */
    protected final AtomicReference<CountDownLatch> cdlRef = new AtomicReference<>();
    /** Current lifecycle status; initially {@code Status.Stopped}. */
    protected final AtomicReference<Status> innerStatus = new AtomicReference<>(Status.Stopped);
    /** Receiver created by the most recent call to {@link #start()}. */
    protected final AtomicReference<KafkaReceiver<K, V>> consumerRef = new AtomicReference<>();

    /* loaded from: input_file:fr/maif/kafka/reactor/consumer/ResilientKafkaConsumer$Config.class */
    /**
     * Immutable settings of a {@link ResilientKafkaConsumer}.
     *
     * <p>Only {@code topics}, {@code groupId} and {@code receiverOptions} are set by
     * {@link #create(Collection, String, ReceiverOptions)}; every other component may be
     * {@code null}, in which case the consumer constructor substitutes its own default
     * (unlimited restarts, 30 seconds / 30 minutes backoff bounds, commit batch of 10,
     * no-op callbacks).
     *
     * <p>NOTE(review): {@code randomFactor} is carried by this record but is not read by the
     * consumer code in this file - confirm whether jitter was meant to be applied to the retry.
     *
     * @param topics          topics to subscribe to
     * @param groupId         Kafka consumer group id
     * @param receiverOptions base reactor-kafka receiver options
     * @param maxRestarts     maximum number of restarts after a failure, or {@code null}
     * @param minBackoff      minimum delay between restarts, or {@code null}
     * @param maxBackoff      maximum delay between restarts, or {@code null}
     * @param randomFactor    backoff random factor, or {@code null}
     * @param commitSize      commit batch size, or {@code null}
     * @param onStarted       callback receiving the stream disposable and the restart count, or {@code null}
     * @param onStarting      callback run when a start is requested, or {@code null}
     * @param onStopped       callback run once the stream has stopped, or {@code null}
     * @param onStopping      callback run when a stop is requested, or {@code null}
     * @param onFailed        callback receiving the terminal failure, or {@code null}
     * @param <K>             record key type
     * @param <V>             record value type
     */
    public record Config<K, V>(
            Collection<String> topics,
            String groupId,
            ReceiverOptions<K, V> receiverOptions,
            Integer maxRestarts,
            Duration minBackoff,
            Duration maxBackoff,
            Double randomFactor,
            Integer commitSize,
            BiFunction<Disposable, Integer, Mono<Void>> onStarted,
            Supplier<Mono<Void>> onStarting,
            Supplier<Mono<Void>> onStopped,
            Supplier<Mono<Void>> onStopping,
            Function<Throwable, Mono<Void>> onFailed) {

        /**
         * Mutable builder for {@link Config}. Every property left unset stays {@code null}.
         */
        public static class ConfigBuilder<K, V> {
            Collection<String> topics;
            String groupId;
            ReceiverOptions<K, V> consumerSettings;
            Integer maxRestarts;
            Duration minBackoff;
            Duration maxBackoff;
            Double randomFactor;
            Integer commitSize;
            BiFunction<Disposable, Integer, Mono<Void>> onStarted;
            Supplier<Mono<Void>> onStarting;
            Supplier<Mono<Void>> onStopped;
            Supplier<Mono<Void>> onStopping;
            Function<Throwable, Mono<Void>> onFailed;

            public ConfigBuilder<K, V> topics(Collection<String> topics) {
                this.topics = topics;
                return this;
            }

            public ConfigBuilder<K, V> groupId(String groupId) {
                this.groupId = groupId;
                return this;
            }

            public ConfigBuilder<K, V> receiverOptions(ReceiverOptions<K, V> receiverOptions) {
                this.consumerSettings = receiverOptions;
                return this;
            }

            public ConfigBuilder<K, V> maxRestarts(Integer maxRestarts) {
                this.maxRestarts = maxRestarts;
                return this;
            }

            public ConfigBuilder<K, V> minBackoff(Duration minBackoff) {
                this.minBackoff = minBackoff;
                return this;
            }

            public ConfigBuilder<K, V> maxBackoff(Duration maxBackoff) {
                this.maxBackoff = maxBackoff;
                return this;
            }

            public ConfigBuilder<K, V> randomFactor(Double randomFactor) {
                this.randomFactor = randomFactor;
                return this;
            }

            public ConfigBuilder<K, V> commitSize(Integer commitSize) {
                this.commitSize = commitSize;
                return this;
            }

            public ConfigBuilder<K, V> onStarted(BiFunction<Disposable, Integer, Mono<Void>> onStarted) {
                this.onStarted = onStarted;
                return this;
            }

            public ConfigBuilder<K, V> onStarting(Supplier<Mono<Void>> onStarting) {
                this.onStarting = onStarting;
                return this;
            }

            public ConfigBuilder<K, V> onStopped(Supplier<Mono<Void>> onStopped) {
                this.onStopped = onStopped;
                return this;
            }

            public ConfigBuilder<K, V> onStopping(Supplier<Mono<Void>> onStopping) {
                this.onStopping = onStopping;
                return this;
            }

            public ConfigBuilder<K, V> onFailed(Function<Throwable, Mono<Void>> onFailed) {
                this.onFailed = onFailed;
                return this;
            }

            /** Creates the immutable configuration from the values collected so far. */
            public Config<K, V> build() {
                return new Config<>(
                        this.topics,
                        this.groupId,
                        this.consumerSettings,
                        this.maxRestarts,
                        this.minBackoff,
                        this.maxBackoff,
                        this.randomFactor,
                        this.commitSize,
                        this.onStarted,
                        this.onStarting,
                        this.onStopped,
                        this.onStopping,
                        this.onFailed);
            }
        }

        /**
         * Creates a configuration holding only the mandatory settings.
         *
         * @param collection      topics to subscribe to
         * @param str             consumer group id
         * @param receiverOptions base receiver options
         * @return a configuration whose optional components are all {@code null}
         */
        public static <K, V> Config<K, V> create(Collection<String> collection, String str, ReceiverOptions<K, V> receiverOptions) {
            // Explicit type witness: without it the chain is typed ConfigBuilder<Object, Object>.
            return Config.<K, V>builder()
                    .receiverOptions(receiverOptions)
                    .groupId(str)
                    .topics(collection)
                    .build();
        }

        /** Returns an empty builder. */
        public static <K, V> ConfigBuilder<K, V> builder() {
            return new ConfigBuilder<>();
        }

        /**
         * Returns a builder pre-populated with every component of this configuration,
         * including {@code maxRestarts}, so that the {@code withXxx} copies only change
         * the component they name.
         */
        public ConfigBuilder<K, V> toBuilder() {
            return Config.<K, V>builder()
                    .topics(this.topics)
                    .groupId(this.groupId)
                    .receiverOptions(this.receiverOptions)
                    .maxRestarts(this.maxRestarts)
                    .minBackoff(this.minBackoff)
                    .maxBackoff(this.maxBackoff)
                    .randomFactor(this.randomFactor)
                    .commitSize(this.commitSize)
                    .onStarted(this.onStarted)
                    .onStarting(this.onStarting)
                    .onStopped(this.onStopped)
                    .onStopping(this.onStopping)
                    .onFailed(this.onFailed);
        }

        /** Returns a copy with {@code maxRestarts} replaced. */
        public Config<K, V> withMaxRestarts(Integer num) {
            return toBuilder().maxRestarts(num).build();
        }

        /** Returns a copy with {@code minBackoff} replaced. */
        public Config<K, V> withMinBackoff(Duration duration) {
            return toBuilder().minBackoff(duration).build();
        }

        /** Returns a copy with {@code maxBackoff} replaced. */
        public Config<K, V> withMaxBackoff(Duration duration) {
            return toBuilder().maxBackoff(duration).build();
        }

        /** Returns a copy with {@code randomFactor} replaced. */
        public Config<K, V> withRandomFactor(Double d) {
            return toBuilder().randomFactor(d).build();
        }

        /** Returns a copy with {@code commitSize} replaced. */
        public Config<K, V> withCommitSize(Integer num) {
            return toBuilder().commitSize(num).build();
        }

        /** Returns a copy with {@code onStarted} replaced. */
        public Config<K, V> withOnStarted(BiFunction<Disposable, Integer, Mono<Void>> biFunction) {
            return toBuilder().onStarted(biFunction).build();
        }

        /** Returns a copy with {@code onStarting} replaced. */
        public Config<K, V> withOnStarting(Supplier<Mono<Void>> supplier) {
            return toBuilder().onStarting(supplier).build();
        }

        /** Returns a copy with {@code onStopped} replaced. */
        public Config<K, V> withOnStopped(Supplier<Mono<Void>> supplier) {
            return toBuilder().onStopped(supplier).build();
        }

        /** Returns a copy with {@code onStopping} replaced. */
        public Config<K, V> withOnStopping(Supplier<Mono<Void>> supplier) {
            return toBuilder().onStopping(supplier).build();
        }

        /** Returns a copy with {@code onFailed} replaced. */
        public Config<K, V> withOnFailed(Function<Throwable, Mono<Void>> function) {
            return toBuilder().onFailed(function).build();
        }
    }

    public ResilientKafkaConsumer(Config<K, V> config) {
        this.topics = ((Config) config).topics;
        this.groupId = ((Config) config).groupId;
        this.maxRestarts = Long.valueOf(((Number) Objects.requireNonNullElse(((Config) config).maxRestarts, Long.MAX_VALUE)).longValue());
        this.minBackoff = Objects.isNull(((Config) config).minBackoff) ? Duration.ofSeconds(30L) : ((Config) config).minBackoff;
        this.maxBackoff = Objects.isNull(((Config) config).maxBackoff) ? Duration.ofMinutes(30L) : ((Config) config).maxBackoff;
        this.commitSize = Integer.valueOf(Objects.isNull(((Config) config).commitSize) ? 10 : ((Config) config).commitSize.intValue());
        this.receiverOptions = ((Config) config).receiverOptions.commitBatchSize(this.commitSize.intValue()).subscription(this.topics).consumerProperty("enable.auto.commit", "false").consumerProperty("group.id", ((Config) config).groupId);
        this.onStarted = (BiFunction) defaultIfNull(((Config) config).onStarted, (disposable, num) -> {
            return Mono.just("").then();
        });
        this.onStarting = (Supplier) defaultIfNull(((Config) config).onStarting, () -> {
            return Mono.just("").then();
        });
        this.onStopped = (Supplier) defaultIfNull(((Config) config).onStopped, () -> {
            return Mono.just("").then();
        });
        this.onStopping = (Supplier) defaultIfNull(((Config) config).onStopping, () -> {
            return Mono.just("").then();
        });
        this.onFailed = (Function) defaultIfNull(((Config) config).onFailed, th -> {
            return Mono.just("").then();
        });
        start();
    }

    /**
     * Creates (and thereby starts) a consumer whose per-partition record handling is the
     * given flux transformation.
     *
     * @param str      name reported by {@link #name()}
     * @param config   consumer settings
     * @param function transformation returned as-is by {@link #messageHandling()}
     * @return the new consumer
     */
    public static <K, V> ResilientKafkaConsumer<K, V> createFromFlow(final String str, Config<K, V> config, final Function<Flux<ReceiverRecord<K, V>>, Flux<ReceiverRecord<K, V>>> function) {
        return new ResilientKafkaConsumer<K, V>(config) {

            /** Hands back the caller-supplied transformation untouched. */
            @Override
            public Function<Flux<ReceiverRecord<K, V>>, Flux<ReceiverRecord<K, V>>> messageHandling() {
                return function;
            }

            /** Hands back the caller-supplied name. */
            @Override
            protected String name() {
                return str;
            }
        };
    }

    /**
     * Creates (and thereby starts) a consumer that handles records one at a time, in order,
     * with an asynchronous handler.
     *
     * <p>Each record is forwarded downstream once the {@code Mono} returned by the handler has
     * completed, which is what lets the record be acknowledged afterwards. A handler error is
     * propagated to the stream.
     *
     * @param str      name reported by {@link #name()}
     * @param config   consumer settings
     * @param function asynchronous handler invoked for every record
     * @return the new consumer
     */
    public static <K, V> ResilientKafkaConsumer<K, V> create(final String str, Config<K, V> config, final Function<ReceiverRecord<K, V>, Mono<Void>> function) {
        return new ResilientKafkaConsumer<K, V>(config) {
            @Override
            protected String name() {
                return str;
            }

            @Override
            public Function<Flux<ReceiverRecord<K, V>>, Flux<ReceiverRecord<K, V>>> messageHandling() {
                // A Mono<Void> never emits a value, so map(...) would never run and the record
                // would never reach the acknowledge step; thenReturn emits it on completion.
                return records -> records.concatMap(received -> function.apply(received).thenReturn(received));
            }
        };
    }

    /**
     * Creates (and thereby starts) a consumer that handles records one at a time, in order,
     * with a synchronous callback.
     *
     * @param str      name reported by {@link #name()}
     * @param config   consumer settings
     * @param consumer callback invoked for every record
     * @return the new consumer
     */
    public static <K, V> ResilientKafkaConsumer<K, V> create(final String str, Config<K, V> config, final Consumer<ReceiverRecord<K, V>> consumer) {
        return new ResilientKafkaConsumer<K, V>(config) {
            @Override
            protected String name() {
                return str;
            }

            @Override
            public Function<Flux<ReceiverRecord<K, V>>, Flux<ReceiverRecord<K, V>>> messageHandling() {
                return records -> records.concatMap(this::consumeThenEmit);
            }

            /**
             * Runs the callback on {@code received}, then re-emits the record; the result is
             * published on the parallel scheduler.
             */
            private Mono<ReceiverRecord<K, V>> consumeThenEmit(ReceiverRecord<K, V> received) {
                return Mono
                        .fromCallable(() -> {
                            consumer.accept(received);
                            // Placeholder value: a callable-backed Mono needs something to emit.
                            return "";
                        })
                        .publishOn(Schedulers.parallel())
                        .map(ignored -> received);
            }
        };
    }

    /**
     * Returns {@code t} unless it is {@code null}, in which case {@code t2} is returned.
     *
     * @param t  preferred value, may be {@code null}
     * @param t2 fallback value, returned as-is (it may itself be {@code null})
     * @return {@code t} when non-null, otherwise {@code t2}
     */
    protected static <T> T defaultIfNull(T t, T t2) {
        if (t != null) {
            return t;
        }
        return t2;
    }

    /**
     * Returns the name identifying this consumer in its log messages.
     *
     * <p>Called from {@link #start()}, which itself runs from the constructor.
     */
    protected abstract String name();

    /**
     * Returns the transformation applied by {@link #start()} to the records of each
     * topic-partition group. Every record emitted by the returned flux is acknowledged
     * afterwards.
     */
    protected abstract Function<Flux<ReceiverRecord<K, V>>, Flux<ReceiverRecord<K, V>>> messageHandling();

    /**
     * Stores {@code status} as the current lifecycle status.
     *
     * @param status the status to record
     * @return the same {@code status}, for call chaining
     */
    private Status updateStatus(Status status) {
        innerStatus.set(status);
        return status;
    }

    /**
     * Returns the lifecycle status currently recorded for this consumer.
     *
     * @return the value held by the status reference at the time of the call
     */
    public Status status() {
        final Status current = innerStatus.get();
        return current;
    }

    /**
     * Starts consuming unless the consumer is already starting or started.
     *
     * <p>The receiver's records are grouped by topic partition, each group is passed through
     * {@link #messageHandling()}, and every record emitted by the handling is acknowledged.
     * A failure restarts the stream with a backoff bounded by {@code minBackoff} and
     * {@code maxBackoff}, at most {@code maxRestarts} times; once retries are exhausted the
     * last failure is propagated unchanged.
     *
     * @return the status observed after the stream has been connected, or the current status
     *         when the consumer was already starting or started
     */
    public Status start() {
        Status current = status();
        if (Status.Starting.equals(current) || Status.Started.equals(current)) {
            LOGGER.info("{} already started", name());
            return current;
        }

        updateStatus(Status.Starting);
        this.onStarting.get().subscribe();

        // Counted down on termination so that stop() can wait for the stream to end.
        CountDownLatch terminated = new CountDownLatch(1);
        this.cdlRef.set(terminated);
        LOGGER.info("Starting {} on topic '{}' with group id '{}'", name(), this.topics, this.groupId);

        AtomicInteger restartCount = new AtomicInteger(0);
        KafkaReceiver<K, V> receiver = KafkaReceiver.create(this.receiverOptions);
        this.consumerRef.set(receiver);

        Retry restartPolicy = Retry.backoff(this.maxRestarts.longValue(), this.minBackoff)
                .maxBackoff(this.maxBackoff)
                // Surface the original failure rather than a "retries exhausted" wrapper.
                .onRetryExhaustedThrow((spec, lastSignal) -> lastSignal.failure())
                .doBeforeRetry(signal -> {
                    // The counter is incremented first, so it is at least 1 here.
                    int restarts = restartCount.incrementAndGet();
                    LOGGER.info("Stream for %s is restarting for the %s time".formatted(name(), restarts), signal.failure());
                });

        ConnectableFlux<ReceiverRecord<K, V>> stream = receiver.receive()
                .doOnSubscribe(subscription -> onStart(restartCount))
                .groupBy(received -> received.receiverOffset().topicPartition())
                .flatMap(partitionRecords -> partitionRecords
                        .transform(messageHandling())
                        .doOnNext(handled -> handled.receiverOffset().acknowledge()))
                .retryWhen(restartPolicy)
                .publish();

        stream.subscribe(
                ignored -> {
                },
                failure -> {
                    onError(failure);
                    terminated.countDown();
                },
                () -> {
                    onComplete();
                    terminated.countDown();
                });

        // Publish the connection's disposable through the connect callback instead of after
        // connect() has returned: the first subscription happens during the connect, and
        // onStart reads disposableKafkaRef at that moment.
        stream.connect(this.disposableKafkaRef::set);
        return status();
    }

    /**
     * Marks the consumer as started and fires the {@code onStarted} callback with the
     * disposable currently held in {@code disposableKafkaRef} and the current counter value.
     *
     * @param atomicInteger restart counter whose current value is passed to the callback
     */
    private void onStart(AtomicInteger atomicInteger) {
        LOGGER.info("Starting {}", name());
        updateStatus(Status.Started);
        final Disposable connection = this.disposableKafkaRef.get();
        final Integer restarts = Integer.valueOf(atomicInteger.get());
        final Mono<Void> callback = this.onStarted.apply(connection, restarts);
        callback.subscribe();
    }

    /**
     * Handles the terminal error of the stream.
     *
     * <p>A {@link CancellationException} is handled exactly like a normal completion; any
     * other failure is logged, recorded as {@code Status.Failed} and passed to the
     * {@code onFailed} callback.
     *
     * @param th the error that terminated the stream
     */
    private void onError(Throwable th) {
        final boolean cancelled = th instanceof CancellationException;
        if (!cancelled) {
            LOGGER.error("Error during " + name(), th);
            updateStatus(Status.Failed);
            this.onFailed.apply(th).subscribe();
        } else {
            onComplete();
        }
    }

    /**
     * Records that the stream has ended: logs it, sets the status to {@code Status.Stopped}
     * and fires the {@code onStopped} callback.
     */
    private void onComplete() {
        LOGGER.info("{} is stopped", name());
        updateStatus(Status.Stopped);
        final Mono<Void> stoppedCallback = this.onStopped.get();
        stoppedCallback.subscribe();
    }

    /**
     * Returns a {@code Mono} that, when subscribed, stops the consumer.
     *
     * <p>On subscription the status becomes {@code Status.Stopping}, the {@code onStopping}
     * callback is fired, the stream connection is disposed if it is still active, and the
     * subscribing thread then blocks until the stream subscriber has been notified of the
     * termination.
     *
     * @return a {@code Mono} completing once the stream has terminated, or failing with
     *         {@link InterruptedException} if the wait was interrupted
     */
    public Mono<Void> stop() {
        return Mono.create(sink -> {
            LOGGER.info("Stopping {}", name());
            updateStatus(Status.Stopping);
            this.onStopping.get().subscribe();

            Disposable connection = this.disposableKafkaRef.get();
            if (connection != null && !connection.isDisposed()) {
                connection.dispose();
            }

            CountDownLatch terminated = this.cdlRef.get();
            if (terminated == null) {
                // No stream was ever started: nothing to wait for, so do not leave the
                // subscriber hanging without a signal.
                sink.success();
                return;
            }
            try {
                terminated.await();
                sink.success();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
                sink.error(e);
            }
        });
    }
}
