package reactor.core.publisher;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/reactor-core-3.2.10.RELEASE.jar:reactor/core/publisher/FluxMetrics.class */
public final class FluxMetrics<T> extends FluxOperator<T, T> {
    // Logger used to warn when metrics are activated on an upstream that cannot be scanned.
    private static final Logger LOGGER = Loggers.getLogger((Class<?>) FluxMetrics.class);

    // ---- Meter names ----

    // Sequence name used when no distinct name could be resolved from the upstream.
    // Subscribers created with this name do not register the "requested" summary.
    static final String REACTOR_DEFAULT_NAME = "reactor";
    // Counter incremented when a signal arrives after the sequence already terminated.
    static final String METER_MALFORMED = "reactor.malformed.source";
    // Counter incremented each time a subscription is accepted in onSubscribe.
    static final String METER_SUBSCRIBED = "reactor.subscribed";
    // Timer measuring the time from onSubscribe to completion, error or cancellation.
    static final String METER_FLOW_DURATION = "reactor.flow.duration";
    // Timer measuring the delay between consecutive onNext signals (or onSubscribe and the first onNext).
    static final String METER_ON_NEXT_DELAY = "reactor.onNext.delay";
    // Distribution summary of the amounts passed to request(long), registered for named sequences only.
    static final String METER_REQUESTED = "reactor.requested";

    // ---- Tag keys ----

    // Key of the tag telling how the flow ended (see the TAGVALUE_ON_ERROR/ON_COMPLETE/CANCEL values).
    static final String TAG_STATUS = "status";
    // Key of the tag carrying the exception class name on error timers (empty string otherwise).
    static final String TAG_EXCEPTION = "exception";
    // Key of the tag carrying the resolved sequence name.
    static final String TAG_SEQUENCE_NAME = "flow";
    // Key of the tag carrying the kind of publisher being measured.
    static final String TAG_SEQUENCE_TYPE = "type";

    // ---- Tag values ----

    // Status value recorded when the flow ends with onError.
    static final String TAGVALUE_ON_ERROR = "error";
    // Status value recorded when the flow ends with onComplete.
    static final String TAGVALUE_ON_COMPLETE = "completed";
    // Status value recorded when the flow is cancelled by the downstream.
    static final String TAGVALUE_CANCEL = "cancelled";
    // Sequence type value used by the subscribers declared in this class.
    static final String TAGVALUE_FLUX = "Flux";
    // Not referenced in this file; presumably used by the Mono counterpart of this operator -- TODO confirm.
    static final String TAGVALUE_MONO = "Mono";

    // ---- Per-operator state, resolved once at assembly time ----

    // Sequence name resolved from the source publisher by resolveNameAndTags.
    final String name;
    // Tags resolved from the source publisher by resolveNameAndTags.
    final List<Tag> tags;
    // Registry the meters are registered against: the one given to the constructor, or Metrics.globalRegistry.
    final MeterRegistry registryCandidate;

    /* loaded from: input_file:WEB-INF/lib/reactor-core-3.2.10.RELEASE.jar:reactor/core/publisher/FluxMetrics$MicrometerFluxMetricsFuseableSubscriber.class */
    /**
     * Fusion-aware variant of {@link MicrometerFluxMetricsSubscriber}.
     *
     * <p>It exposes itself as a {@link Fuseable.QueueSubscription} and delegates every queue
     * operation to the upstream queue subscription ({@code qs}) captured in
     * {@code onSubscribe}, while recording the same meters as the parent class for values
     * that are pulled through {@link #poll()} instead of being pushed through
     * {@link #onNext(Object)}.
     *
     * @param <T> the type of the values flowing through the sequence
     */
    static final class MicrometerFluxMetricsFuseableSubscriber<T> extends MicrometerFluxMetricsSubscriber<T> implements Fuseable, Fuseable.QueueSubscription<T> {
        /**
         * Fusion mode last returned by the upstream in {@link #requestFusion(int)}.
         * Keeps the default {@code 0} ({@link Fuseable#NONE}) until fusion is negotiated.
         */
        private int fusionMode;

        /**
         * Creates the subscriber; all arguments are handed to the parent constructor unchanged.
         *
         * @param coreSubscriber the downstream subscriber
         * @param meterRegistry the registry in which meters are registered
         * @param clock the clock used for timing samples and onNext intervals
         * @param str the sequence name
         * @param list the additional tags applied to every meter
         */
        public MicrometerFluxMetricsFuseableSubscriber(CoreSubscriber<? super T> coreSubscriber, MeterRegistry meterRegistry, Clock clock, String str, List<Tag> list) {
            super(coreSubscriber, meterRegistry, clock, str, list);
        }

        /**
         * Forwards an element downstream.
         *
         * <p>When fusion was negotiated as {@link Fuseable#ASYNC}, {@code null} is forwarded
         * and nothing is recorded here: the interval is recorded in {@link #poll()} when a
         * value is actually retrieved. In any other mode the parent implementation is used.
         */
        @Override
        public void onNext(T t) {
            if (this.fusionMode == Fuseable.ASYNC) {
                this.actual.onNext(null);
                return;
            }
            // Same done-check / interval recording / forwarding as the non-fused subscriber.
            super.onNext(t);
        }

        /**
         * Negotiates fusion by delegating to the upstream queue subscription and remembers
         * the granted mode.
         *
         * @param i the fusion mode requested by the downstream
         * @return the mode granted by the upstream, or {@link Fuseable#NONE} when no upstream
         * queue subscription has been captured
         */
        @Override
        public int requestFusion(int i) {
            if (this.qs == null) {
                return Fuseable.NONE;
            }
            this.fusionMode = this.qs.requestFusion(i);
            return this.fusionMode;
        }

        /**
         * Polls the upstream queue and records the matching meter.
         *
         * <ul>
         * <li>a non-null value is treated like an onNext: the delay since the previous
         * event is recorded;</li>
         * <li>a {@code null} value in {@link Fuseable#SYNC} mode stops the flow duration
         * sample against the "completed" timer;</li>
         * <li>any {@link Throwable} thrown by the upstream stops the sample against the
         * error timer for that exception type and is rethrown as is.</li>
         * </ul>
         *
         * @return the polled value, or {@code null} if the upstream returned none or no
         * upstream queue subscription has been captured
         */
        @Override
        @Nullable
        public T poll() {
            if (this.qs == null) {
                return null;
            }
            try {
                T polled = this.qs.poll();
                if (polled != null) {
                    long previousEventNanos = this.lastNextEventNanos;
                    this.lastNextEventNanos = this.clock.monotonicTime();
                    this.onNextIntervalTimer.record(this.lastNextEventNanos - previousEventNanos, TimeUnit.NANOSECONDS);
                } else if (this.fusionMode == Fuseable.SYNC) {
                    this.subscribeToTerminateSample.stop(this.subscribeToCompleteTimer);
                }
                return polled;
            } catch (Throwable error) {
                this.subscribeToTerminateSample.stop(this.subscribeToErrorTimerFactory.apply(error));
                throw error;
            }
        }

        /** Clears the upstream queue, if one has been captured. */
        @Override
        public void clear() {
            if (this.qs != null) {
                this.qs.clear();
            }
        }

        /**
         * @return {@code true} when there is no upstream queue subscription or it reports
         * itself empty
         */
        @Override
        public boolean isEmpty() {
            return this.qs == null || this.qs.isEmpty();
        }

        /**
         * @return the size reported by the upstream queue subscription, or {@code 0} when
         * none has been captured
         */
        @Override
        public int size() {
            if (this.qs == null) {
                return 0;
            }
            return this.qs.size();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/reactor-core-3.2.10.RELEASE.jar:reactor/core/publisher/FluxMetrics$MicrometerFluxMetricsSubscriber.class */
    /**
     * Subscriber placed between the source and the downstream subscriber that records
     * Micrometer meters about the sequence: number of subscriptions, delay between
     * onNext signals, duration from subscription to termination (complete, error or
     * cancel), requested amounts, and signals received after termination.
     *
     * <p>It also acts as the {@link Subscription} handed to the downstream, so that
     * {@link #request(long)} and {@link #cancel()} can be observed before being forwarded.
     *
     * @param <T> the type of the values flowing through the sequence
     */
    static class MicrometerFluxMetricsSubscriber<T> implements InnerOperator<T, T> {
        /** The downstream subscriber every signal is forwarded to. */
        final CoreSubscriber<? super T> actual;
        /** Clock used both for the flow duration sample and the onNext interval. */
        final Clock clock;
        /** Incremented when onNext/onError/onComplete arrives after termination. */
        final Counter malformedSourceCounter;
        /** Incremented once per accepted subscription. */
        final Counter subscribedCounter;
        /** Records requested amounts; {@code null} when the sequence uses the default name. */
        @Nullable
        final DistributionSummary requestedCounter;
        /** Sample started in onSubscribe and stopped against one of the flow duration timers. */
        Timer.Sample subscribeToTerminateSample;
        /** Monotonic time of the previous onNext (or of onSubscribe before the first onNext). */
        long lastNextEventNanos = -1;
        /** Set once onError or onComplete has been processed. */
        boolean done;

        /** The upstream subscription viewed as a queue subscription, when it is one. */
        @Nullable
        Fuseable.QueueSubscription<T> qs;
        /** The upstream subscription. */
        Subscription s;
        final Timer onNextIntervalTimer;
        final Timer subscribeToCompleteTimer;
        /** Resolves the flow duration timer tagged with the class name of a given error. */
        final Function<Throwable, Timer> subscribeToErrorTimerFactory;
        final Timer subscribeToCancelTimer;

        /**
         * Registers all the meters for one subscription.
         *
         * <p>Every meter carries the sequence name tag, the sequence type tag and the
         * caller supplied tags. The "requested" summary is only registered when the
         * sequence name differs from {@link FluxMetrics#REACTOR_DEFAULT_NAME}.
         *
         * @param coreSubscriber the downstream subscriber
         * @param meterRegistry the registry in which meters are registered
         * @param clock the clock used for timing samples and onNext intervals
         * @param str the sequence name
         * @param list the additional tags applied to every meter
         */
        MicrometerFluxMetricsSubscriber(CoreSubscriber<? super T> coreSubscriber, MeterRegistry meterRegistry, Clock clock, String str, List<Tag> list) {
            this.actual = coreSubscriber;
            this.clock = clock;

            List<Tag> commonTags = new ArrayList<>();
            commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_NAME, str));
            commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_TYPE, FluxMetrics.TAGVALUE_FLUX));
            commonTags.addAll(list);

            this.subscribeToCompleteTimer = Timer.builder(FluxMetrics.METER_FLOW_DURATION)
                    .tags(commonTags)
                    .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_COMPLETE)
                    .tag(FluxMetrics.TAG_EXCEPTION, "")
                    .description("Times the duration elapsed between a subscription and the onComplete termination of the sequence")
                    .register(meterRegistry);
            this.subscribeToCancelTimer = Timer.builder(FluxMetrics.METER_FLOW_DURATION)
                    .tags(commonTags)
                    .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_CANCEL)
                    .tag(FluxMetrics.TAG_EXCEPTION, "")
                    .description("Times the duration elapsed between a subscription and the cancellation of the sequence")
                    .register(meterRegistry);

            // The error timer depends on the exception type, so only the builder is prepared
            // here; the exception tag is added and the timer registered when an error occurs.
            Timer.Builder errorTimerBuilder = Timer.builder(FluxMetrics.METER_FLOW_DURATION)
                    .tags(commonTags)
                    .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_ERROR)
                    .description("Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag");
            this.subscribeToErrorTimerFactory = error ->
                    errorTimerBuilder.tag(FluxMetrics.TAG_EXCEPTION, error.getClass().getName()).register(meterRegistry);

            this.onNextIntervalTimer = Timer.builder(FluxMetrics.METER_ON_NEXT_DELAY)
                    .tags(commonTags)
                    .description("Measures delays between onNext signals (or between onSubscribe and first onNext)")
                    .register(meterRegistry);
            this.subscribedCounter = Counter.builder(FluxMetrics.METER_SUBSCRIBED)
                    .tags(commonTags)
                    .baseUnit("subscribers")
                    .description("Counts how many Reactor sequences have been subscribed to")
                    .register(meterRegistry);
            this.malformedSourceCounter = meterRegistry.counter(FluxMetrics.METER_MALFORMED, commonTags);

            if (FluxMetrics.REACTOR_DEFAULT_NAME.equals(str)) {
                // Sequences using the default name do not get a "requested" summary.
                this.requestedCounter = null;
            } else {
                this.requestedCounter = DistributionSummary.builder(FluxMetrics.METER_REQUESTED)
                        .tags(commonTags)
                        .description("Counts the amount requested to a named Flux by all subscribers, until at least one requests an unbounded amount")
                        .baseUnit("requested amount")
                        .register(meterRegistry);
            }
        }

        /** @return the downstream subscriber */
        @Override
        public CoreSubscriber<? super T> actual() {
            return this.actual;
        }

        /**
         * Records the delay since the previous event and forwards the value downstream.
         * A value received after termination is counted as malformed and dropped through
         * {@code Operators.onNextDropped} instead.
         */
        @Override
        public void onNext(T t) {
            if (this.done) {
                this.malformedSourceCounter.increment();
                Operators.onNextDropped(t, this.actual.currentContext());
                return;
            }
            long previousEventNanos = this.lastNextEventNanos;
            this.lastNextEventNanos = this.clock.monotonicTime();
            this.onNextIntervalTimer.record(this.lastNextEventNanos - previousEventNanos, TimeUnit.NANOSECONDS);
            this.actual.onNext(t);
        }

        /**
         * Stops the flow duration sample against the error timer for the exception type
         * and forwards the error. An error received after termination is counted as
         * malformed and dropped through {@code Operators.onErrorDropped} instead.
         */
        @Override
        public void onError(Throwable th) {
            if (this.done) {
                this.malformedSourceCounter.increment();
                Operators.onErrorDropped(th, this.actual.currentContext());
                return;
            }
            this.done = true;
            this.subscribeToTerminateSample.stop(this.subscribeToErrorTimerFactory.apply(th));
            this.actual.onError(th);
        }

        /**
         * Stops the flow duration sample against the "completed" timer and forwards the
         * completion. A completion received after termination is only counted as malformed.
         */
        @Override
        public void onComplete() {
            if (this.done) {
                this.malformedSourceCounter.increment();
                return;
            }
            this.done = true;
            this.subscribeToTerminateSample.stop(this.subscribeToCompleteTimer);
            this.actual.onComplete();
        }

        /**
         * Accepts the upstream subscription if {@code Operators.validate} allows it:
         * counts the subscription, starts the flow duration sample, initialises the
         * reference time for the first onNext delay, and passes this instance downstream
         * as the subscription.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            if (!Operators.validate(this.s, subscription)) {
                return;
            }
            this.subscribedCounter.increment();
            this.subscribeToTerminateSample = Timer.start(this.clock);
            this.lastNextEventNanos = this.clock.monotonicTime();
            if (subscription instanceof Fuseable.QueueSubscription) {
                // The element type cannot be checked at runtime; the upstream is subscribed
                // with this subscriber, so it is expected to produce T.
                @SuppressWarnings("unchecked")
                Fuseable.QueueSubscription<T> queueSubscription = (Fuseable.QueueSubscription<T>) subscription;
                this.qs = queueSubscription;
            }
            this.s = subscription;
            this.actual.onSubscribe(this);
        }

        /**
         * Records the requested amount (when the summary exists) and forwards the request
         * upstream. Nothing happens when {@code Operators.validate} rejects the amount.
         */
        @Override
        public void request(long j) {
            if (!Operators.validate(j)) {
                return;
            }
            if (this.requestedCounter != null) {
                this.requestedCounter.record(j);
            }
            this.s.request(j);
        }

        /**
         * Stops the flow duration sample against the "cancelled" timer, then cancels the
         * upstream. The sample is stopped unconditionally: there is no check on {@code done}.
         */
        @Override
        public void cancel() {
            this.subscribeToTerminateSample.stop(this.subscribeToCancelTimer);
            this.s.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resolves the sequence name and tags to use for the meters of a publisher.
     *
     * <p>When the publisher can be scanned, its tags are converted to Micrometer
     * {@link Tag}s, and its {@code name()} is used unless it is equal to its
     * {@code stepName()}, in which case {@link #REACTOR_DEFAULT_NAME} is used.
     * When the publisher cannot be scanned, a warning is logged and the default name
     * with no tags is returned.
     *
     * @param publisher the publisher to inspect
     * @return a tuple of the resolved sequence name and the resolved tags
     */
    public static Tuple2<String, List<Tag>> resolveNameAndTags(Publisher<?> publisher) {
        Scannable scannable = Scannable.from(publisher);
        if (!scannable.isScanAvailable()) {
            LOGGER.warn("Attempting to activate metrics but the upstream is not Scannable. You might want to use `name()` (and optionally `tags()`) right before `metrics()`");
            return Tuples.of(REACTOR_DEFAULT_NAME, Collections.<Tag>emptyList());
        }

        String scannedName = scannable.name();
        String sequenceName = scannable.stepName().equals(scannedName) ? REACTOR_DEFAULT_NAME : scannedName;
        List<Tag> sequenceTags = scannable.tags()
                .map(keyValue -> Tag.of(keyValue.getT1(), keyValue.getT2()))
                .collect(Collectors.toList());
        return Tuples.of(sequenceName, sequenceTags);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a metrics operator over {@code flux} without an explicit registry.
     *
     * <p>Delegates to the two-argument constructor with a {@code null} registry, which
     * makes it fall back to {@link Metrics#globalRegistry}.
     *
     * @param flux the source sequence to instrument
     */
    public FluxMetrics(Flux<? extends T> flux) {
        this(flux, null);
    }

    /**
     * Creates a metrics operator over {@code flux}.
     *
     * <p>The sequence name and tags are resolved once, here, from the source. Meters are
     * later registered against {@code meterRegistry}, or against
     * {@link Metrics#globalRegistry} when {@code meterRegistry} is {@code null}.
     *
     * @param flux the source sequence to instrument
     * @param meterRegistry the registry to use, or {@code null} for the global registry
     */
    FluxMetrics(Flux<? extends T> flux, @Nullable MeterRegistry meterRegistry) {
        super(flux);
        Tuple2<String, List<Tag>> nameAndTags = resolveNameAndTags(flux);
        this.name = nameAndTags.getT1();
        this.tags = nameAndTags.getT2();
        this.registryCandidate = (meterRegistry != null) ? meterRegistry : Metrics.globalRegistry;
    }

    @Override // reactor.core.publisher.Flux
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        this.source.subscribe((CoreSubscriber<? super Object>) new MicrometerFluxMetricsSubscriber(coreSubscriber, this.registryCandidate, Clock.SYSTEM, this.name, this.tags));
    }
}
