package polynote.kernel.task;

import cats.effect.concurrent.Ref;
import fs2.concurrent.SignallingRef$;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import polynote.kernel.DoneStatus;
import polynote.kernel.ErrorStatus$;
import polynote.kernel.KernelStatusUpdate;
import polynote.kernel.Queued$;
import polynote.kernel.Running$;
import polynote.kernel.TaskInfo;
import polynote.kernel.TaskInfo$;
import polynote.kernel.logging.package$Logging$Service;
import polynote.kernel.task.package$TaskManager$Service;
import polynote.kernel.util.Publish;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.math.Ordering$Long$;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import zio.Cause;
import zio.Fiber;
import zio.Has;
import zio.Promise;
import zio.Semaphore;
import zio.ZIO;
import zio.ZIO$;
import zio.ZIO$AccessMPartiallyApplied$;
import zio.ZQueue;
import zio.blocking.package;
import zio.interop.catz$;

/* compiled from: package.scala */
/* loaded from: input_file:polynote/kernel/task/package$TaskManager$Impl.class */
/**
 * Concrete {@code package$TaskManager$Service} backed by a semaphore, a ready queue, a
 * processing fiber, a task counter, a concurrent task map and a {@code TaskInfo} publisher.
 *
 * <p>This is the Java rendering of a class compiled from Scala. The {@code $}-mangled member
 * names, the {@code $default$N} methods and the {@code Cclass} forwarders are produced by the
 * Scala compiler and are referenced by the generated {@code $anonfun$} closure classes, so none
 * of them may be renamed or narrowed in visibility.
 *
 * <p>Most of the behaviour lives in those closure classes, which are not part of this file;
 * the methods here only assemble the effect pipelines that invoke them.
 */
public class package$TaskManager$Impl implements package$TaskManager$Service {
    /** Semaphore whose permit wraps the whole effect built by {@link #queue}. */
    private final Semaphore queueing;
    /** Queue of promise pairs; drained by {@link #cancelAll()}. Public for the generated closures. */
    public final ZQueue<Object, Nothing$, Object, Nothing$, Tuple2<Promise<Throwable, BoxedUnit>, Promise<Throwable, BoxedUnit>>, Tuple2<Promise<Throwable, BoxedUnit>, Promise<Throwable, BoxedUnit>>> polynote$kernel$task$TaskManager$Impl$$readyQueue;
    /** Fiber handed in at construction time. Public for the generated closures. */
    public final Fiber<Throwable, Nothing$> polynote$kernel$task$TaskManager$Impl$$process;
    /** Counter created at zero in the constructor; consumed by the generated closures. */
    private final AtomicLong polynote$kernel$task$TaskManager$Impl$$taskCounter;
    /** Task table keyed by string; each entry holds a TaskInfo ref, a fiber and one further value. */
    private final ConcurrentHashMap<String, Tuple3<Ref<ZIO, TaskInfo>, Fiber<Throwable, Object>, Object>> polynote$kernel$task$TaskManager$Impl$$tasks;
    /** {@code TaskInfo} publisher derived from the kernel-status publisher via {@code contramap}. */
    private final Publish<ZIO, TaskInfo> polynote$kernel$task$TaskManager$Impl$$updates;

    /**
     * Stores the collaborators, runs the trait initialiser and then creates the internal state.
     *
     * @param semaphore stored as the queueing semaphore
     * @param publish   kernel-status publisher; adapted to a {@code TaskInfo} publisher
     * @param zQueue    stored as the ready queue
     * @param fiber     stored as the processing fiber
     */
    public package$TaskManager$Impl(Semaphore semaphore, Publish<ZIO, KernelStatusUpdate> publish, ZQueue<Object, Nothing$, Object, Nothing$, Tuple2<Promise<Throwable, BoxedUnit>, Promise<Throwable, BoxedUnit>>, Tuple2<Promise<Throwable, BoxedUnit>, Promise<Throwable, BoxedUnit>>> zQueue, Fiber<Throwable, Nothing$> fiber) {
        this.queueing = semaphore;
        this.polynote$kernel$task$TaskManager$Impl$$readyQueue = zQueue;
        this.polynote$kernel$task$TaskManager$Impl$$process = fiber;
        // The trait initialiser runs after the constructor arguments are stored and before
        // the remaining fields exist; keep this ordering.
        package$TaskManager$Service.Cclass.$init$(this);
        this.polynote$kernel$task$TaskManager$Impl$$taskCounter = new AtomicLong(0L);
        this.polynote$kernel$task$TaskManager$Impl$$tasks = new ConcurrentHashMap<>();
        this.polynote$kernel$task$TaskManager$Impl$$updates = publish.contramap(new package$TaskManager$Impl$$anonfun$1(this));
    }

    /** Forwards to the trait's shared {@code queue_} implementation. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <A, R extends Has<?>> ZIO<R, Throwable, ZIO<Object, Throwable, A>> queue_(String str, String str2, String str3, ZIO<R, Throwable, A> zio) {
        return package$TaskManager$Service.Cclass.queue_(this, str, str2, str3, zio);
    }

    /** Forwards to the trait for the fourth-parameter default of {@code queue}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> queue$default$4() {
        return package$TaskManager$Service.Cclass.queue$default$4(this);
    }

    /** Forwards to the trait for the second-parameter default of {@code queue_}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <A, R extends Has<?>> String queue_$default$2() {
        return package$TaskManager$Service.Cclass.queue_$default$2(this);
    }

    /** Forwards to the trait for the third-parameter default of {@code queue_}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <A, R extends Has<?>> String queue_$default$3() {
        return package$TaskManager$Service.Cclass.queue_$default$3(this);
    }

    /** Forwards to the trait for the fourth-parameter default of {@code run}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> run$default$4() {
        return package$TaskManager$Service.Cclass.run$default$4(this);
    }

    /** Forwards to the trait for the second-parameter default of {@code runSubtask}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A> String runSubtask$default$2() {
        return package$TaskManager$Service.Cclass.runSubtask$default$2(this);
    }

    /** Forwards to the trait for the third-parameter default of {@code runSubtask}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A> String runSubtask$default$3() {
        return package$TaskManager$Service.Cclass.runSubtask$default$3(this);
    }

    /** Forwards to the trait for the fourth-parameter default of {@code runSubtask}. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A> Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> runSubtask$default$4() {
        return package$TaskManager$Service.Cclass.runSubtask$default$4(this);
    }

    /** Fourth-parameter default of {@code register}: the empty option. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public Option<String> register$default$4() {
        return None$.MODULE$;
    }

    /** Fifth-parameter default of {@code register}: the error status singleton. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public DoneStatus register$default$5() {
        return ErrorStatus$.MODULE$;
    }

    /** Accessor for the task counter, used by the generated closures. */
    public AtomicLong polynote$kernel$task$TaskManager$Impl$$taskCounter() {
        return polynote$kernel$task$TaskManager$Impl$$taskCounter;
    }

    /** Accessor for the task table, used here and by the generated closures. */
    public ConcurrentHashMap<String, Tuple3<Ref<ZIO, TaskInfo>, Fiber<Throwable, Object>, Object>> polynote$kernel$task$TaskManager$Impl$$tasks() {
        return polynote$kernel$task$TaskManager$Impl$$tasks;
    }

    /** Accessor for the {@code TaskInfo} publisher, used by the generated closures. */
    public Publish<ZIO, TaskInfo> polynote$kernel$task$TaskManager$Impl$$updates() {
        return polynote$kernel$task$TaskManager$Impl$$updates;
    }

    /**
     * Picks the text to use as the second {@code TaskInfo} argument.
     *
     * @param str  fallback, returned when {@code str2} is empty
     * @param str2 preferred text
     * @return {@code str2} unless it is empty, in which case {@code str}
     */
    private String lbl(String str, String str2) {
        if (str2.isEmpty()) {
            return str;
        }
        return str2;
    }

    /**
     * Builds an effect that creates a {@code Queued} task record and hands it to the generated
     * {@code queue$1}/{@code queue$2} closures, all under a permit of the queueing semaphore.
     *
     * <p>The strings are truncated with {@code truncateTinyString}/{@code truncateShortString}
     * before being stored; the last two {@code TaskInfo} arguments use that class's defaults.
     */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> ZIO<R1, Throwable, ZIO<Object, Throwable, A>> queue(String str, String str2, String str3, Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> function1, ZIO<R, Throwable, A> zio, Predef$.less.colon.less<R1, R> lessVar) {
        TaskInfo queuedInfo = new TaskInfo(
                polynote.messages.package$.MODULE$.truncateTinyString(str),
                polynote.messages.package$.MODULE$.truncateTinyString(lbl(str, str2)),
                polynote.messages.package$.MODULE$.truncateShortString(str3),
                Queued$.MODULE$,
                TaskInfo$.MODULE$.apply$default$5(),
                TaskInfo$.MODULE$.apply$default$6());
        // Raw ZIO on purpose: the decompiled pipeline carries no usable type arguments.
        ZIO statusRef = (ZIO) SignallingRef$.MODULE$.apply(queuedInfo, catz$.MODULE$.taskConcurrentInstance());
        ZIO queued = statusRef
                .map(new package$TaskManager$Impl$$anonfun$queue$1(this, str, function1))
                .flatMap(new package$TaskManager$Impl$$anonfun$queue$2(this, str, zio, lessVar));
        return this.queueing.withPermit(queued);
    }

    /** Second-parameter default of {@code queue}: the empty string. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> String queue$default$2() {
        return "";
    }

    /** Third-parameter default of {@code queue}: the empty string. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> String queue$default$3() {
        return "";
    }

    /**
     * Shared body of {@link #run}: creates a {@code Running} task record (byte field 0, with
     * the supplied {@code option} as the final {@code TaskInfo} argument) and passes it through
     * the generated {@code runImpl$1}/{@code runImpl$2} closures. No semaphore is involved.
     */
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> ZIO<R1, Throwable, A> polynote$kernel$task$TaskManager$Impl$$runImpl(String str, String str2, String str3, Option<String> option, Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> function1, ZIO<R, Throwable, A> zio, Predef$.less.colon.less<R1, R> lessVar) {
        TaskInfo runningInfo = new TaskInfo(
                polynote.messages.package$.MODULE$.truncateTinyString(str),
                polynote.messages.package$.MODULE$.truncateTinyString(lbl(str, str2)),
                polynote.messages.package$.MODULE$.truncateShortString(str3),
                Running$.MODULE$,
                (byte) 0,
                option);
        ZIO statusRef = (ZIO) SignallingRef$.MODULE$.apply(runningInfo, catz$.MODULE$.taskConcurrentInstance());
        return statusRef
                .map(new package$TaskManager$Impl$$anonfun$polynote$kernel$task$TaskManager$Impl$$runImpl$1(this, str))
                .flatMap(new package$TaskManager$Impl$$anonfun$polynote$kernel$task$TaskManager$Impl$$runImpl$2(this, str, function1, zio, lessVar));
    }

    /** Runs via {@code runImpl}, passing the empty option as its fourth argument. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> ZIO<R1, Throwable, A> run(String str, String str2, String str3, Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> function1, ZIO<R, Throwable, A> zio, Predef$.less.colon.less<R1, R> lessVar) {
        return polynote$kernel$task$TaskManager$Impl$$runImpl(str, str2, str3, None$.MODULE$, function1, zio, lessVar);
    }

    /** Second-parameter default of {@code run}: the empty string. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> String run$default$2() {
        return "";
    }

    /** Third-parameter default of {@code run}: the empty string. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A, R1 extends Has<?>> String run$default$3() {
        return "";
    }

    /**
     * Builds an effect that starts from an environment access ({@code accessM} with the
     * generated {@code runSubtask$1} closure), maps it with {@code runSubtask$2} and continues
     * in {@code runSubtask$3}, which receives all of this method's arguments.
     */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public <R extends Has<Ref<ZIO, TaskInfo>>, A> ZIO<R, Throwable, A> runSubtask(String str, String str2, String str3, Function1<Cause<Throwable>, Function1<TaskInfo, TaskInfo>> function1, ZIO<R, Throwable, A> zio) {
        ZIO accessed = ZIO$AccessMPartiallyApplied$.MODULE$.apply$extension(
                ZIO$.MODULE$.accessM(),
                new package$TaskManager$Impl$$anonfun$runSubtask$1(this));
        return accessed
                .map(new package$TaskManager$Impl$$anonfun$runSubtask$2(this))
                .flatMap(new package$TaskManager$Impl$$anonfun$runSubtask$3(this, str, str2, str3, function1, zio));
    }

    /**
     * Builds an effect that creates a {@code Running} task record (byte field 0; final
     * argument is {@code option} mapped through the generated {@code register$1} closure) and
     * passes it through {@code register$2}/{@code register$3}.
     */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public ZIO<Has<package.Blocking.Service>, Throwable, Fiber<Throwable, BoxedUnit>> register(String str, String str2, String str3, Option<String> option, DoneStatus doneStatus, Function1<Function1<Function1<TaskInfo, TaskInfo>, BoxedUnit>, ZIO<Has<package$Logging$Service>, Nothing$, BoxedUnit>> function1) {
        // option.map stays inside the argument list so it is still evaluated after the
        // three truncate calls, exactly as before.
        TaskInfo runningInfo = new TaskInfo(
                polynote.messages.package$.MODULE$.truncateTinyString(str),
                polynote.messages.package$.MODULE$.truncateTinyString(lbl(str, str2)),
                polynote.messages.package$.MODULE$.truncateShortString(str3),
                Running$.MODULE$,
                (byte) 0,
                option.map(new package$TaskManager$Impl$$anonfun$register$1(this)));
        ZIO statusRef = (ZIO) SignallingRef$.MODULE$.apply(runningInfo, catz$.MODULE$.taskConcurrentInstance());
        return statusRef
                .map(new package$TaskManager$Impl$$anonfun$register$2(this))
                .flatMap(new package$TaskManager$Impl$$anonfun$register$3(this, str, doneStatus, function1));
    }

    /** Second-parameter default of {@code register}: the empty string. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public String register$default$2() {
        return "";
    }

    /** Third-parameter default of {@code register}: the empty string. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public String register$default$3() {
        return "";
    }

    /**
     * Builds an effect that applies the generated {@code cancelAll$1} closure to every task
     * table entry (snapshot taken now, processed in reverse list order) and, combined through
     * {@code zipPar}, drains the ready queue into the {@code cancelAll$2} closure. The combined
     * result is discarded.
     */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public ZIO<Object, Nothing$, BoxedUnit> cancelAll() {
        List snapshot = ((TraversableOnce) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(polynote$kernel$task$TaskManager$Impl$$tasks().values()).asScala()).toList();
        Iterable perTask = (Iterable) snapshot.reverse().map(new package$TaskManager$Impl$$anonfun$cancelAll$1(this), List$.MODULE$.canBuildFrom());
        ZIO tasksEffect = ZIO$.MODULE$.collectAll(perTask);
        ZIO queueEffect = this.polynote$kernel$task$TaskManager$Impl$$readyQueue.takeAll().flatMap(new package$TaskManager$Impl$$anonfun$cancelAll$2(this));
        return tasksEffect.zipPar(queueEffect).unit();
    }

    /**
     * Builds an effect yielding one {@code TaskInfo} per task table entry. The snapshot is
     * taken now, sorted by the {@code long} key that the generated {@code list$1} closure
     * extracts, and each entry is turned into an effect by {@code list$2}.
     */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public ZIO<Object, Nothing$, List<TaskInfo>> list() {
        List snapshot = ((TraversableOnce) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(polynote$kernel$task$TaskManager$Impl$$tasks().values()).asScala()).toList();
        List ordered = (List) snapshot.sortBy(new package$TaskManager$Impl$$anonfun$list$1(this), Ordering$Long$.MODULE$);
        Iterable perTask = (Iterable) ordered.map(new package$TaskManager$Impl$$anonfun$list$2(this), List$.MODULE$.canBuildFrom());
        return ZIO$.MODULE$.collectAll(perTask);
    }

    /** Runs {@link #cancelAll()} and then the effect supplied by the generated {@code shutdown$1} closure. */
    @Override // polynote.kernel.task.package$TaskManager$Service
    public ZIO<Object, Nothing$, BoxedUnit> shutdown() {
        ZIO<Object, Nothing$, BoxedUnit> cancelled = cancelAll();
        return cancelled.$times$greater(new package$TaskManager$Impl$$anonfun$shutdown$1(this));
    }
}
