/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.benchmarks.service;

import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.benchmarks.service.BenchmarkService;
import java.util.concurrent.Callable;
import java.util.stream.LongStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BenchmarkServiceImpl
implements BenchmarkService {
    private Flux<Long> sharedSource = Flux.fromStream(LongStream.range(0L, Long.MAX_VALUE).boxed()).share();

    @Override
    public Mono<Void> requestVoid(ServiceMessage request) {
        return Mono.empty().subscribeOn(Schedulers.parallel());
    }

    @Override
    public Mono<ServiceMessage> requestOne(ServiceMessage message) {
        Callable<ServiceMessage> callable = () -> {
            long value = System.currentTimeMillis();
            return ServiceMessage.from((ServiceMessage)message).header("service-recv-time", (Object)value).header("service-send-time", (Object)value).build();
        };
        return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel());
    }

    @Override
    public Flux<ServiceMessage> sharedStream(ServiceMessage message) {
        return Flux.defer(() -> this.sharedSource.subscribeOn(Schedulers.parallel()).map(i -> ServiceMessage.from((ServiceMessage)message).header("service-send-time", (Object)System.currentTimeMillis()).build()));
    }

    @Override
    public Flux<ServiceMessage> infiniteStream(ServiceMessage message) {
        return Flux.defer(() -> {
            Callable<ServiceMessage> callable = () -> ServiceMessage.from((ServiceMessage)message).header("service-send-time", (Object)System.currentTimeMillis()).build();
            return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).repeat();
        });
    }
}

