package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.micrometer;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateEventStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.EventStoreInterceptor;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.EventStoreInterceptorChain;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.AppendToStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.FetchStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadEventsByGlobalOrder;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadLastPersistedEventRelatedTo;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.EventMetaData;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.PersistableEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.table_per_aggregate_type.PersistableEventEnricher;
import dk.cloudcreate.essentials.shared.FailFast;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.transport.Kind;
import io.micrometer.observation.transport.ReceiverContext;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/interceptor/micrometer/MicrometerTracingEventStoreInterceptor.class */
/**
 * {@link EventStoreInterceptor} and {@link PersistableEventEnricher} that integrates the event store with
 * Micrometer Observation/Tracing:
 * <ul>
 *     <li>Event store operations are wrapped in an {@link Observation} named
 *     {@code "<Operation>:<AggregateType>"}.</li>
 *     <li>When an event is enriched before being persisted, the current trace context (if any) is injected
 *     into the event's {@link EventMetaData} using the configured {@link Propagator}.</li>
 *     <li>{@link #restoreTraceContext(PersistedEvent, String)} starts a consumer-side {@link Observation}
 *     whose {@link ReceiverContext} carrier is the persisted event's deserialized meta-data.</li>
 * </ul>
 */
public class MicrometerTracingEventStoreInterceptor implements EventStoreInterceptor, PersistableEventEnricher {
    public static final String AGGREGATE_TYPE = "AggregateType";
    public static final String AGGREGATE_ID = "AggregateId";
    public static final String EVENT_TYPE = "EventType";
    public static final String EVENT_ID = "EventId";

    private final Tracer tracer;
    private final Propagator propagator;
    private final ObservationRegistry observationRegistry;
    /**
     * When {@code false} the {@link LoadLastPersistedEventRelatedTo} and {@link LoadEvent} operations
     * are passed straight through without creating an {@link Observation}.
     */
    private final boolean verboseTracing;
    /**
     * The scope opened by the most recent {@link #restoreTraceContext(PersistedEvent, String)} call on the
     * current thread. It is closed (and its observation stopped) by the next such call on the same thread.
     */
    private final ThreadLocal<Observation.Scope> activeObservationScope = new ThreadLocal<>();

    /**
     * @param tracer              the tracer used to look up the current trace context (required)
     * @param propagator          the propagator used to inject the trace context into event meta-data (required)
     * @param observationRegistry the registry that all observations are created against (required)
     * @param verboseTracing      {@code true} to also observe {@link LoadLastPersistedEventRelatedTo} and
     *                            {@link LoadEvent} operations
     */
    public MicrometerTracingEventStoreInterceptor(Tracer tracer, Propagator propagator, ObservationRegistry observationRegistry, boolean verboseTracing) {
        this.tracer = FailFast.requireNonNull(tracer, "No tracer instance provided");
        this.propagator = FailFast.requireNonNull(propagator, "No propagator instance provided");
        this.observationRegistry = FailFast.requireNonNull(observationRegistry, "No observationRegistry instance provided");
        this.verboseTracing = verboseTracing;
    }

    /**
     * Injects the current trace context (if one exists) into the event's meta-data.
     *
     * @param persistableEvent the event about to be persisted
     * @return the same {@code persistableEvent} instance
     */
    @Override
    public PersistableEvent enrich(PersistableEvent persistableEvent) {
        storeTraceContext(persistableEvent.metaData());
        return persistableEvent;
    }

    /**
     * Observes the append operation as {@code "PersistEvents:<AggregateType>"}, tagged with the aggregate
     * type, aggregate id and the number of events being appended.
     */
    @Override
    public <ID> AggregateEventStream<ID> intercept(AppendToStream<ID> appendToStream, EventStoreInterceptorChain<AppendToStream<ID>, AggregateEventStream<ID>> eventStoreInterceptorChain) {
        return newObservation("PersistEvents", appendToStream.aggregateType.toString())
                .highCardinalityKeyValue(AGGREGATE_ID, appendToStream.aggregateId.toString())
                .highCardinalityKeyValue("EventCount", Integer.toString(appendToStream.getEventsToAppend().size()))
                .observe(eventStoreInterceptorChain::proceed);
    }

    /**
     * Observes the operation as {@code "LoadLastPersistedEventRelatedTo:<AggregateType>"}, but only when
     * verbose tracing is enabled; otherwise the chain is invoked directly.
     */
    @Override
    public <ID> Optional<PersistedEvent> intercept(LoadLastPersistedEventRelatedTo<ID> loadLastPersistedEventRelatedTo, EventStoreInterceptorChain<LoadLastPersistedEventRelatedTo<ID>, Optional<PersistedEvent>> eventStoreInterceptorChain) {
        if (!verboseTracing) {
            return eventStoreInterceptorChain.proceed();
        }
        return newObservation("LoadLastPersistedEventRelatedTo", loadLastPersistedEventRelatedTo.aggregateType.toString())
                .highCardinalityKeyValue(AGGREGATE_ID, loadLastPersistedEventRelatedTo.aggregateId.toString())
                .observe(eventStoreInterceptorChain::proceed);
    }

    /**
     * Observes the operation as {@code "LoadEvent:<AggregateType>"}, but only when verbose tracing is
     * enabled; otherwise the chain is invoked directly.
     */
    @Override
    public Optional<PersistedEvent> intercept(LoadEvent loadEvent, EventStoreInterceptorChain<LoadEvent, Optional<PersistedEvent>> eventStoreInterceptorChain) {
        if (!verboseTracing) {
            return eventStoreInterceptorChain.proceed();
        }
        return newObservation("LoadEvent", loadEvent.aggregateType.toString())
                .highCardinalityKeyValue(EVENT_ID, loadEvent.eventId.toString())
                .observe(eventStoreInterceptorChain::proceed);
    }

    /**
     * Observes the operation as {@code "FetchStream:<AggregateType>"}, tagged with the aggregate type,
     * aggregate id and the requested event-order range.
     */
    @Override
    public <ID> Optional<AggregateEventStream<ID>> intercept(FetchStream<ID> fetchStream, EventStoreInterceptorChain<FetchStream<ID>, Optional<AggregateEventStream<ID>>> eventStoreInterceptorChain) {
        return newObservation("FetchStream", fetchStream.aggregateType.toString())
                .highCardinalityKeyValue(AGGREGATE_ID, fetchStream.aggregateId.toString())
                .highCardinalityKeyValue("EventOrderRange", fetchStream.getEventOrderRange().toString())
                .observe(eventStoreInterceptorChain::proceed);
    }

    /**
     * Observes the operation as {@code "LoadEventsByGlobalOrder:<AggregateType>"}, tagged with the aggregate
     * type, the global-event-order range and the additional global orders to include.
     * <p>
     * The observation spans only the call to {@code proceed()}.
     * NOTE(review): if the returned {@link Stream} is lazily evaluated, its consumption happens after the
     * observation has stopped - confirm whether that is intended.
     */
    @Override
    public Stream<PersistedEvent> intercept(LoadEventsByGlobalOrder loadEventsByGlobalOrder, EventStoreInterceptorChain<LoadEventsByGlobalOrder, Stream<PersistedEvent>> eventStoreInterceptorChain) {
        // Previously named "FetchStream:", which made this operation indistinguishable from FetchStream
        return newObservation("LoadEventsByGlobalOrder", loadEventsByGlobalOrder.aggregateType.toString())
                .highCardinalityKeyValue("GlobalEventOrderRange", loadEventsByGlobalOrder.getGlobalEventOrderRange().toString())
                .highCardinalityKeyValue("IncludeAdditionalGlobalOrders", loadEventsByGlobalOrder.getIncludeAdditionalGlobalOrders().toString())
                .observe(eventStoreInterceptorChain::proceed);
    }

    /**
     * Injects the tracer's current trace context into {@code eventMetaData} via the {@link Propagator}.
     * Does nothing when {@code eventMetaData} is {@code null} or when there is no current trace context.
     *
     * @param eventMetaData the meta-data that acts as carrier for the propagated trace entries (may be {@code null})
     */
    protected void storeTraceContext(EventMetaData eventMetaData) {
        if (eventMetaData == null) {
            return;
        }
        CurrentTraceContext currentTraceContext = tracer.currentTraceContext();
        if (currentTraceContext == null || currentTraceContext.context() == null) {
            return;
        }
        propagator.inject(currentTraceContext.context(),
                          eventMetaData,
                          (carrier, key, value) -> carrier.put(key, value));
    }

    /**
     * Starts a new observation named {@code "<contextDescription>:<AggregateType>"} for the given event,
     * using the event's deserialized meta-data as the {@link ReceiverContext} carrier, and opens a scope
     * for it on the current thread.
     * <p>
     * Any scope previously opened by this method on the current thread is closed, and its observation
     * stopped, before the new observation is started. The newly opened scope stays open until the next
     * call to this method on the same thread.
     *
     * @param persistedEvent     the event whose trace context should be restored (required)
     * @param contextDescription prefix used for the observation name (required)
     * @return the same {@code persistedEvent} instance
     */
    protected PersistedEvent restoreTraceContext(PersistedEvent persistedEvent, String contextDescription) {
        FailFast.requireNonNull(persistedEvent, "No persistedEvent provided");
        FailFast.requireNonNull(contextDescription, "No contextDescription provided");
        closeAnyActiveObservationScope();

        String aggregateType = persistedEvent.aggregateType().toString();
        Observation observation = Observation.start(contextDescription + ":" + aggregateType,
                                                    () -> createTraceContextForEvent(persistedEvent),
                                                    observationRegistry);
        observation.lowCardinalityKeyValue(AGGREGATE_TYPE, aggregateType)
                   .lowCardinalityKeyValue(EVENT_TYPE, persistedEvent.event().getEventTypeOrNamePersistenceValue())
                   .highCardinalityKeyValue(EVENT_ID, persistedEvent.eventId().toString())
                   .highCardinalityKeyValue(AGGREGATE_ID, persistedEvent.aggregateId().toString());
        activeObservationScope.set(observation.openScope());
        return persistedEvent;
    }

    /**
     * Creates a not-yet-started observation named {@code "<operationName>:<aggregateType>"} that is tagged
     * with the aggregate type as a low-cardinality key-value.
     */
    private Observation newObservation(String operationName, String aggregateType) {
        return Observation.createNotStarted(operationName + ":" + aggregateType, observationRegistry)
                          .lowCardinalityKeyValue(AGGREGATE_TYPE, aggregateType);
    }

    /**
     * Creates a {@link Kind#CONSUMER} {@link ReceiverContext} whose carrier is the event's deserialized
     * meta-data and whose getter reads entries from that meta-data.
     */
    private ReceiverContext<EventMetaData> createTraceContextForEvent(PersistedEvent persistedEvent) {
        FailFast.requireNonNull(persistedEvent, "No event provided");
        ReceiverContext<EventMetaData> receiverContext = new ReceiverContext<>((carrier, key) -> carrier.get(key), Kind.CONSUMER);
        receiverContext.setCarrier((EventMetaData) persistedEvent.metaData().deserialize());
        return receiverContext;
    }

    /**
     * Closes the scope (and stops the observation) registered for the current thread, if any.
     */
    private void closeAnyActiveObservationScope() {
        Observation.Scope scope = activeObservationScope.get();
        if (scope == null) {
            return;
        }
        // Clear the thread-local first so a failure below can never leave a stale scope behind
        // that would be closed/stopped a second time on the next call.
        activeObservationScope.remove();
        try {
            scope.close();
        } finally {
            scope.getCurrentObservation().stop();
        }
    }
}
