package tech.illuin.pipeline.execution.phase.impl;

import io.micrometer.core.instrument.MeterRegistry;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.illuin.pipeline.Pipeline;
import tech.illuin.pipeline.context.ComponentContext;
import tech.illuin.pipeline.context.Context;
import tech.illuin.pipeline.execution.phase.PipelinePhase;
import tech.illuin.pipeline.execution.phase.PipelineStrategy;
import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.PipelineSinkMetrics;
import tech.illuin.pipeline.metering.marker.LogMarker;
import tech.illuin.pipeline.metering.tag.MetricTags;
import tech.illuin.pipeline.output.ComponentFamily;
import tech.illuin.pipeline.output.ComponentTag;
import tech.illuin.pipeline.output.Output;
import tech.illuin.pipeline.output.PipelineTag;
import tech.illuin.pipeline.sink.builder.SinkDescriptor;

/* loaded from: input_file:tech/illuin/pipeline/execution/phase/impl/SinkPhase.class */
/**
 * Pipeline phase that hands the pipeline {@link Output} to every configured sink.
 *
 * <p>Each sink is described by a {@link SinkDescriptor}. Descriptors reporting {@code isAsync()}
 * are submitted to an {@link ExecutorService}; all others run on the calling thread. Every sink
 * run gets its own {@link ComponentTag} and {@link PipelineSinkMetrics}, and its duration and
 * total count are recorded exactly once, whatever the outcome.
 *
 * @param <I> type of the pipeline input
 */
public class SinkPhase<I> implements PipelinePhase<I> {
    private static final Logger logger = LoggerFactory.getLogger(SinkPhase.class);

    private final Pipeline<I> pipeline;
    private final List<SinkDescriptor> sinks;
    /** Executor for asynchronous sinks; {@code null} when none of the sinks is asynchronous. */
    private final ExecutorService sinkExecutor;
    /** Maximum time, in seconds, that {@link #close()} waits for the executor to terminate. */
    private final int closeTimeout;
    private final UIDGenerator uidGenerator;
    private final MeterRegistry meterRegistry;

    /**
     * Creates the phase.
     *
     * @param pipeline      owning pipeline; its id is used in the close log message
     * @param list          sinks to run, in iteration order
     * @param supplier      provides the executor; only invoked if at least one sink is asynchronous
     * @param i             close timeout, in seconds
     * @param uIDGenerator  generator for component uids
     * @param meterRegistry registry the per-sink metrics are created against
     */
    public SinkPhase(Pipeline<I> pipeline, List<SinkDescriptor> list, Supplier<ExecutorService> supplier, int i, UIDGenerator uIDGenerator, MeterRegistry meterRegistry) {
        this.pipeline = pipeline;
        this.sinks = list;
        // Must come after the sinks assignment: initExecutor inspects this.sinks.
        this.sinkExecutor = initExecutor(supplier);
        this.closeTimeout = i;
        this.uidGenerator = uIDGenerator;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Runs every sink, synchronously or asynchronously depending on its descriptor.
     *
     * @param i          pipeline input, forwarded to each sink's component context
     * @param output     pipeline output handed to each sink
     * @param context    pipeline context
     * @param metricTags tags applied to the per-sink metrics
     * @return always {@link PipelineStrategy#CONTINUE}
     * @throws Exception whatever a synchronous sink's exception handler lets escape
     * @throws IllegalStateException if a sink is asynchronous but no executor is available
     */
    @Override
    public PipelineStrategy run(I i, Output output, Context context, MetricTags metricTags) throws Exception {
        for (SinkDescriptor sink : this.sinks) {
            ComponentTag tag = createTag(output.tag(), sink);
            PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.meterRegistry, tag, metricTags);
            if (sink.isAsync()) {
                runSinkAsynchronously(sink, tag, i, output, context, metrics);
            } else {
                runSinkSynchronously(sink, tag, i, output, context, metrics);
            }
        }
        return PipelineStrategy.CONTINUE;
    }

    /**
     * Executes a sink on the calling thread. A failure is logged, counted and then delegated to
     * {@code SinkDescriptor.handleException}; anything that handler throws propagates to the caller.
     */
    private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, I input, Output output, Context context, PipelineSinkMetrics metrics) throws Exception {
        ComponentContext componentContext = wrapContext(input, context, tag.pipelineTag(), tag, metrics);
        long start = System.nanoTime();
        try {
            logger.trace(metrics.mark(), "{}#{} launching sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id());
            sink.execute(output, componentContext);
            metrics.successCounter().increment();
        } catch (Exception e) {
            reportFailure(tag, metrics, e);
            // NOTE(review): the synchronous handler receives the raw pipeline context, whereas the
            // asynchronous path passes the component context - confirm this asymmetry is intended.
            sink.handleException(e, output, context);
        } finally {
            // In a finally block so the run is recorded exactly once on every exit path, and so a
            // failing metric call can never be mistaken for a failure of the sink itself.
            recordCompletion(metrics, start);
        }
    }

    /**
     * Submits a sink to the executor. A failure is logged, counted and delegated to
     * {@code SinkDescriptor.handleExceptionThenSwallow}. The resulting future is not retained, so
     * anything escaping the task is not observed by this class.
     *
     * @throws IllegalStateException if there is no executor
     */
    private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, I input, Output output, Context context, PipelineSinkMetrics metrics) {
        if (this.sinkExecutor == null) {
            throw new IllegalStateException("An asynchronous run has been initiated but there is no active executor");
        }
        ComponentContext componentContext = wrapContext(input, context, tag.pipelineTag(), tag, metrics);
        logger.trace(metrics.mark(), "{}#{} queuing sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id());
        CompletableFuture.runAsync(() -> {
            // Timed from the moment the task starts running, so queueing delay is excluded.
            long start = System.nanoTime();
            try {
                logger.trace(metrics.mark(), "{}#{} launching sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id());
                sink.execute(output, componentContext);
                metrics.successCounter().increment();
            } catch (Exception e) {
                reportFailure(tag, metrics, e);
                sink.handleExceptionThenSwallow(e, output, componentContext);
            } finally {
                recordCompletion(metrics, start);
            }
        }, this.sinkExecutor);
    }

    /** Logs a sink failure and increments the failure counter and the per-exception error counter. */
    private static void reportFailure(ComponentTag tag, PipelineSinkMetrics metrics, Exception e) {
        logger.error(metrics.mark(e), "{}#{} sink {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage());
        metrics.failureCounter().increment();
        metrics.errorCounter(e).increment();
    }

    /** Records the elapsed time since {@code startNanos} on the run timer and increments the total counter. */
    private static void recordCompletion(PipelineSinkMetrics metrics, long startNanos) {
        metrics.runTimer().record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
        metrics.totalCounter().increment();
    }

    /** Builds the tag identifying one run of the given sink, with a freshly generated uid. */
    private ComponentTag createTag(PipelineTag pipelineTag, SinkDescriptor sinkDescriptor) {
        return new ComponentTag(this.uidGenerator.generate(), pipelineTag, sinkDescriptor.id(), ComponentFamily.SINK);
    }

    /** Wraps the pipeline context into the component-level context handed to a sink. */
    private ComponentContext wrapContext(I input, Context context, PipelineTag pipelineTag, ComponentTag componentTag, LogMarker logMarker) {
        return new ComponentContext(context, input, pipelineTag, componentTag, this.uidGenerator, logMarker);
    }

    /**
     * Obtains an executor from the supplier if at least one sink is asynchronous.
     *
     * @return the supplied executor, or {@code null} when no sink is asynchronous
     */
    private ExecutorService initExecutor(Supplier<ExecutorService> supplier) {
        boolean hasAsyncSink = this.sinks.stream().anyMatch(SinkDescriptor::isAsync);
        return hasAsyncSink ? supplier.get() : null;
    }

    /**
     * Shuts the executor down, if there is one and it is not already shut down, then waits up to
     * the close timeout (seconds) for queued sinks to finish and logs the outcome.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    @Override
    public void close() throws Exception {
        if (this.sinkExecutor == null || this.sinkExecutor.isShutdown()) {
            return;
        }
        this.sinkExecutor.shutdown();
        boolean terminated = this.sinkExecutor.awaitTermination(this.closeTimeout, TimeUnit.SECONDS);
        String status = terminated ? "done" : "timeout after " + this.closeTimeout + " seconds";
        logger.info("{} closed (executor termination status: {})", this.pipeline.id(), status);
    }
}
