package co.touchlab.skie.phases.runtime;

import co.touchlab.skie.phases.SirPhase;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;

/* compiled from: FlowCombineConversionGenerator.kt */
@Metadata(mv = {2, 0, 0}, k = 1, xi = 48, d1 = {"��\u0018\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0006\bÆ\u0002\u0018��2\u00020\u0001B\t\b\u0002¢\u0006\u0004\b\u0002\u0010\u0003J\u000f\u0010\u0004\u001a\u00020\u0005R\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0011\u0010\b\u001a\u00020\u0005H\u0002R\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0011\u0010\t\u001a\u00020\u0005H\u0002R\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0011\u0010\n\u001a\u00020\u0005H\u0002R\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0011\u0010\u000b\u001a\u00020\u0005H\u0002R\u00020\u0006¢\u0006\u0002\u0010\u0007¨\u0006\f"}, d2 = {"Lco/touchlab/skie/phases/runtime/FlowCombineConversionGenerator;", "", "<init>", "()V", "generate", "", "Lco/touchlab/skie/phases/SirPhase$Context;", "(Lco/touchlab/skie/phases/SirPhase$Context;)V", "generateSkieCombineSubscription", "generateSkieFlowPublisher", "generateToPublisherExtension", "generatePublisherSinkExtension", "kotlin-compiler-core"})
/* loaded from: input_file:co/touchlab/skie/phases/runtime/FlowCombineConversionGenerator.class */
public final class FlowCombineConversionGenerator {

    @NotNull
    public static final FlowCombineConversionGenerator INSTANCE = new FlowCombineConversionGenerator();

    private FlowCombineConversionGenerator() {
    }

    public final void generate(@NotNull SirPhase.Context context) {
        Intrinsics.checkNotNullParameter(context, "$context_receiver_0");
        generateSkieCombineSubscription(context);
        generateSkieFlowPublisher(context);
        generateToPublisherExtension(context);
        generatePublisherSinkExtension(context);
    }

    private final void generateSkieCombineSubscription(SirPhase.Context context) {
        context.getNamespaceProvider().getSkieNamespaceWrittenSourceFile("SkieCombineSubscription").setContent("import Combine\n\ninternal final actor SkieCombineSubscription<F: SkieSwiftFlowProtocol<S.Input>, S: Combine.Subscriber>: Combine.Subscription where S.Failure == _Concurrency.CancellationError {\n    private var currentDemand: Combine.Subscribers.Demand = .none\n    private var onDemandUpdated: _Concurrency.CheckedContinuation<Swift.Void, Swift.Never>? = nil\n    private var collectingTask: _Concurrency.Task<Swift.Void, Swift.Never>?\n\n    init(flow: F, subscriber: S) {\n        _Concurrency.Task {\n            await self.startCollectingTask(flow: flow, subscriber: subscriber)\n        }\n    }\n\n    nonisolated func request(_ demand: Combine.Subscribers.Demand) {\n        _Concurrency.Task {\n            await add(demand: demand)\n        }\n    }\n\n    nonisolated func cancel() {\n        _Concurrency.Task {\n            _ = await collectingTask?.cancel()\n        }\n    }\n\n    private func startCollectingTask(flow: F, subscriber: S) {\n        collectingTask = _Concurrency.Task {\n            await _Concurrency.withTaskCancellationHandler {\n                await waitForDemand()\n\n                // If we get cancelled, tell subscriber we failed with CancellationError\n                guard !_Concurrency.Task.isCancelled else {\n                    subscriber.receive(completion: .failure(_Concurrency.CancellationError()))\n                    return\n                }\n\n                do {\n                    for try await element in flow {\n                        let newDemand = subscriber.receive(element)\n                        add(demand: newDemand)\n                        await waitForDemand()\n                    }\n                } catch {\n                    Swift.fatalError(\"Collecting a SKIE flow threw an error. This isn't expected to happen and is a bug in SKIE. Error: \\(error)\")\n                }\n\n                // If we get cancelled, tell subscriber we failed with CancellationError\n                guard !_Concurrency.Task.isCancelled else {\n                    subscriber.receive(completion: .failure(_Concurrency.CancellationError()))\n                    return\n                }\n\n                // We should only get here if the underlying flow has completed\n                subscriber.receive(completion: .finished)\n            } onCancel: { [weak self] in\n                guard let self else { return }\n                _Concurrency.Task.detached {\n                    await self.notifyDemandUpdated()\n                }\n            }\n        }\n    }\n\n    private func waitForDemand() async {\n        // If we're cancelled, we'll just return.\n        while !_Concurrency.Task.isCancelled {\n            // If there's still demand, we decrement it and return right away.\n            if currentDemand > 0 {\n                currentDemand -= 1\n                return\n            }\n\n            // Otherwise we store continuation and wait for it to be invoked.\n            await _Concurrency.withCheckedContinuation { continuation in\n                onDemandUpdated = continuation\n            }\n\n            // Once  we get notified, we the next loop will decrement the demand, or keep waiting if there's no demand\n        }\n    }\n\n    private func add(demand: Combine.Subscribers.Demand) {\n        currentDemand += demand\n        notifyDemandUpdated()\n    }\n\n    private func notifyDemandUpdated() {\n        let onDemandUpdated = self.onDemandUpdated\n        self.onDemandUpdated = nil\n        onDemandUpdated?.resume()\n    }\n\n    deinit {\n        /*\n         * We shouldn't need this, as the Task strongly references this instance,\n         * so it either doesn't exist anymore (is canceled), or this deinit can't be called.\n         */\n        onDemandUpdated?.resume()\n    }\n}");
    }

    private final void generateSkieFlowPublisher(SirPhase.Context context) {
        context.getNamespaceProvider().getSkieNamespaceWrittenSourceFile("SkieFlowPublisher").setContent("import Combine\n\ninternal struct SkieFlowPublisher<Output, F: SkieSwiftFlowProtocol<Output>>: Combine.Publisher {\n    typealias Failure = _Concurrency.CancellationError\n\n    let flow: F\n\n    func receive<S>(\n        subscriber: S\n    ) where S : Combine.Subscriber, _Concurrency.CancellationError == S.Failure, Output == S.Input {\n        let subscription = SkieCombineSubscription(flow: flow, subscriber: subscriber)\n        subscriber.receive(subscription: subscription)\n    }\n}");
    }

    private final void generateToPublisherExtension(SirPhase.Context context) {
        context.getNamespaceProvider().getSkieNamespaceWrittenSourceFile("SkieSwiftFlowProtocol+toPublisher").setContent("import Combine\n\nextension SkieSwiftFlowProtocol {\n    /**\n     Returns a Published from this Flow. This publisher can fail with a ``CancellationError`` when the underlying flow is cancelled from Kotlin.\n\n     - Returns: A publisher instance, which you can use Combine operators with. It's cold and won't start collecting the backing flow until a subscriber is attached.\n    */\n    public func toPublisher() -> some Combine.Publisher<Element, _Concurrency.CancellationError> {\n        SkieFlowPublisher(flow: self)\n    }\n}");
    }

    private final void generatePublisherSinkExtension(SirPhase.Context context) {
        context.getNamespaceProvider().getSkieNamespaceWrittenSourceFile("Combine.Publisher+sink").setContent("import Combine\n\nextension Combine.Publisher where Self.Failure == _Concurrency.CancellationError {\n\n    /**\n     Attaches a subscriber with closure-based behavior to a publisher that fails with ``CancellationError``.\n\n     Use ``Publisher/sink(receiveValue:)`` to observe values received by the publisher and print them to the console.\n     This operator is meant to be used mainly with SKIE Flows, which can fail with ``CancellationError``.\n     That means the publisher’s ``Publisher/Failure`` type is ``CancellationError``.\n\n     This method creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber.\n     The return value should be held, otherwise the stream will be canceled.\n\n     - parameter receiveValue: The closure to execute on receipt of a value.\n     - Returns: A cancellable instance, which you use when you end assignment of the received value. Deallocation of the result will tear down the subscription stream.\n    */\n    public func sink(receiveValue: @escaping ((Self.Output) -> Swift.Void)) -> Combine.AnyCancellable {\n        self.sink(receiveCompletion: { _ in }, receiveValue: receiveValue)\n    }\n}");
    }
}
