/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.rpc.metrics.om;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.rsocket.Payload;
import io.rsocket.internal.SwitchTransformFlux;
import io.rsocket.rpc.AbstractRSocketService;
import io.rsocket.rpc.annotations.internal.Generated;
import io.rsocket.rpc.annotations.internal.ResourceType;
import io.rsocket.rpc.frames.Metadata;
import io.rsocket.rpc.metrics.Metrics;
import io.rsocket.rpc.metrics.om.MetricsSnapshot;
import io.rsocket.rpc.metrics.om.MetricsSnapshotHandler;
import io.rsocket.rpc.tracing.Tag;
import io.rsocket.rpc.tracing.Tracing;
import io.rsocket.util.ByteBufPayload;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.inject.Inject;
import javax.inject.Named;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Generated(type=ResourceType.SERVICE, idlClass=MetricsSnapshotHandler.class)
@Named(value="MetricsSnapshotHandlerServer")
public final class MetricsSnapshotHandlerServer
extends AbstractRSocketService {
    private final MetricsSnapshotHandler service;
    private final Tracer tracer;
    private final Function<? super Publisher<Payload>, ? extends Publisher<Payload>> streamMetrics;
    private final Function<SpanContext, Function<? super Publisher<Payload>, ? extends Publisher<Payload>>> streamMetricsTrace;
    private static final Function<MessageLite, Payload> serializer = new Function<MessageLite, Payload>(){

        @Override
        public Payload apply(MessageLite message) {
            int length = message.getSerializedSize();
            ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(length);
            try {
                message.writeTo(CodedOutputStream.newInstance((ByteBuffer)byteBuf.internalNioBuffer(0, length)));
                byteBuf.writerIndex(length);
                return ByteBufPayload.create((ByteBuf)byteBuf);
            }
            catch (Throwable t) {
                byteBuf.release();
                throw new RuntimeException(t);
            }
        }
    };

    @Inject
    public MetricsSnapshotHandlerServer(MetricsSnapshotHandler service, Optional<MeterRegistry> registry, Optional<Tracer> tracer) {
        this.service = service;
        this.streamMetrics = !registry.isPresent() ? Function.identity() : Metrics.timed(registry.get(), "rsocket.server", "service", "io.rsocket.rpc.metrics.om.MetricsSnapshotHandler", "method", "StreamMetrics");
        if (!tracer.isPresent()) {
            this.tracer = null;
            this.streamMetricsTrace = Tracing.traceAsChild();
        } else {
            this.tracer = tracer.get();
            this.streamMetricsTrace = Tracing.traceAsChild(this.tracer, "StreamMetrics", Tag.of("rsocket.service", "io.rsocket.rpc.metrics.om.MetricsSnapshotHandler"), Tag.of("rsocket.rpc.role", "server"), Tag.of("rsocket.rpc.version", ""));
        }
    }

    @Override
    public String getService() {
        return "io.rsocket.rpc.metrics.om.MetricsSnapshotHandler";
    }

    @Override
    public Class<?> getServiceClass() {
        return this.service.getClass();
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return Mono.error((Throwable)new UnsupportedOperationException("Fire and forget not implemented."));
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.error((Throwable)new UnsupportedOperationException("Request-Response not implemented."));
    }

    public Flux<Payload> requestStream(Payload payload) {
        return Flux.error((Throwable)new UnsupportedOperationException("Request-Stream not implemented."));
    }

    @Override
    public Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher) {
        try {
            ByteBuf metadata = payload.sliceMetadata();
            SpanContext spanContext = Tracing.deserializeTracingMetadata(this.tracer, metadata);
            switch (Metadata.getMethod(metadata)) {
                case "StreamMetrics": {
                    Flux messages = publisher.map(MetricsSnapshotHandlerServer.deserializer(MetricsSnapshot.parser()));
                    return this.service.streamMetrics((Publisher<MetricsSnapshot>)messages, metadata).map(serializer).transform(this.streamMetrics).transform(this.streamMetricsTrace.apply(spanContext));
                }
            }
            return Flux.error((Throwable)new UnsupportedOperationException());
        }
        catch (Throwable t) {
            return Flux.error((Throwable)t);
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return new SwitchTransformFlux(payloads, (BiFunction)new BiFunction<Payload, Flux<Payload>, Publisher<? extends Payload>>(){

            @Override
            public Publisher<Payload> apply(Payload payload, Flux<Payload> publisher) {
                return MetricsSnapshotHandlerServer.this.requestChannel(payload, publisher);
            }
        });
    }

    private static <T> Function<Payload, T> deserializer(final Parser<T> parser) {
        return new Function<Payload, T>(){

            @Override
            public T apply(Payload payload) {
                try {
                    CodedInputStream is = CodedInputStream.newInstance((ByteBuffer)payload.getData());
                    Object object = parser.parseFrom(is);
                    return object;
                }
                catch (Throwable t) {
                    throw new RuntimeException(t);
                }
                finally {
                    payload.release();
                }
            }
        };
    }
}

