package datadog.trace.instrumentation.lettuce.rx;

import datadog.slf4j.LoggerFactory;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.api.AgentTracer;
import datadog.trace.instrumentation.lettuce.LettuceClientDecorator;
import io.lettuce.core.protocol.RedisCommand;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:agent-tooling-and-instrumentation.isolated/datadog/trace/instrumentation/lettuce/rx/LettuceFluxTerminationRunnable.classdata */
public class LettuceFluxTerminationRunnable implements Consumer<Signal>, Runnable {
    private AgentSpan span = null;
    private int numResults = 0;
    private FluxOnSubscribeConsumer onSubscribeConsumer;

    /* loaded from: input_file:agent-tooling-and-instrumentation.isolated/datadog/trace/instrumentation/lettuce/rx/LettuceFluxTerminationRunnable$FluxOnSubscribeConsumer.classdata */
    public static class FluxOnSubscribeConsumer implements Consumer<Subscription> {
        private final LettuceFluxTerminationRunnable owner;
        private final RedisCommand command;
        private final boolean finishSpanOnClose;

        public FluxOnSubscribeConsumer(LettuceFluxTerminationRunnable lettuceFluxTerminationRunnable, RedisCommand redisCommand, boolean z) {
            this.owner = lettuceFluxTerminationRunnable;
            this.command = redisCommand;
            this.finishSpanOnClose = z;
        }

        @Override // java.util.function.Consumer
        public void accept(Subscription subscription) {
            AgentSpan startSpan = AgentTracer.startSpan("redis.query");
            this.owner.span = startSpan;
            LettuceClientDecorator.DECORATE.afterStart(startSpan);
            LettuceClientDecorator.DECORATE.onCommand(startSpan, this.command);
            if (this.finishSpanOnClose) {
                LettuceClientDecorator.DECORATE.beforeFinish(startSpan);
                startSpan.finish();
            }
        }
    }

    public LettuceFluxTerminationRunnable(RedisCommand redisCommand, boolean z) {
        this.onSubscribeConsumer = null;
        this.onSubscribeConsumer = new FluxOnSubscribeConsumer(this, redisCommand, z);
    }

    public FluxOnSubscribeConsumer getOnSubscribeConsumer() {
        return this.onSubscribeConsumer;
    }

    private void finishSpan(boolean z, Throwable th) {
        if (this.span == null) {
            LoggerFactory.getLogger((Class<?>) Flux.class).error("Failed to finish this.span, LettuceFluxTerminationRunnable cannot find this.span because it probably wasn't started.");
            return;
        }
        this.span.setTag("db.command.results.count", this.numResults);
        if (z) {
            this.span.setTag("db.command.cancelled", true);
        }
        LettuceClientDecorator.DECORATE.onError(this.span, th);
        LettuceClientDecorator.DECORATE.beforeFinish(this.span);
        this.span.finish();
    }

    @Override // java.util.function.Consumer
    public void accept(Signal signal) {
        if (SignalType.ON_COMPLETE.equals(signal.getType()) || SignalType.ON_ERROR.equals(signal.getType())) {
            finishSpan(false, signal.getThrowable());
        } else if (SignalType.ON_NEXT.equals(signal.getType())) {
            this.numResults++;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.span != null) {
            finishSpan(true, null);
        } else {
            LoggerFactory.getLogger((Class<?>) Flux.class).error("Failed to finish this.span to indicate cancellation, LettuceFluxTerminationRunnable cannot find this.span because it probably wasn't started.");
        }
    }
}
