package com.wixpress.dst.greyhound.core.consumer;

import com.wixpress.dst.greyhound.core.consumer.Dispatcher;
import com.wixpress.dst.greyhound.core.consumer.DispatcherMetric;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerRecord;
import com.wixpress.dst.greyhound.core.consumer.domain.TopicPartition;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics$;
import scala.Function1;
import scala.MatchError;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import zio.Chunk;
import zio.Fiber;
import zio.Has;
import zio.Promise;
import zio.Queue$;
import zio.Ref$;
import zio.UIO$;
import zio.ZIO;
import zio.ZQueue;
import zio.ZRef;
import zio.ZRef$;
import zio.ZRef$UnifiedSyntax$;
import zio.clock.package;

/* compiled from: Dispatcher.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/Dispatcher$Worker$.class */
/**
 * Singleton holder (exposed through {@link #MODULE$}) that builds {@code Dispatcher.Worker} instances.
 *
 * <p>A worker built by {@link #make} owns a queue of consumer records and a daemon fiber that
 * repeatedly runs {@code pollOnce} until that step yields {@code false}.
 */
public class Dispatcher$Worker$ {
    /** The single shared instance; assigned by the constructor when the static initializer runs. */
    public static Dispatcher$Worker$ MODULE$;

    static {
        new Dispatcher$Worker$();
    }

    /** Publishes the instance under construction as {@link #MODULE$}. */
    public Dispatcher$Worker$() {
        MODULE$ = this;
    }

    /** Loop predicate used by {@link #make}: returns its argument unchanged. */
    public static final /* synthetic */ boolean $anonfun$make$5(boolean z) {
        return z;
    }

    /**
     * Describes the creation of a worker: allocates its queue and internal-state ref, forks the
     * polling loop as a daemon fiber, and wraps all three in a {@code Dispatcher.Worker}.
     *
     * @param zRef shared dispatcher state, read at the start of every polling step
     * @param function1 handler applied to each record taken from the queue
     * @param i capacity passed to {@code Queue.dropping}
     * @param str first identifier forwarded to the reported metrics (presumably the consumer group; confirm against Dispatcher.scala)
     * @param str2 second identifier forwarded to the reported metrics (presumably the client id; confirm against Dispatcher.scala)
     * @param topicPartition partition reported in the waiting-for-resume metric
     * @return an effect producing the new worker
     */
    public <R> ZIO<Has<package.Clock.Service>, Nothing$, Dispatcher.Worker> make(ZRef<Nothing$, Nothing$, Dispatcher.State, Dispatcher.State> zRef, Function1<ConsumerRecord<Chunk<Object>, Chunk<Object>>, ZIO<R, Nothing$, Object>> function1, int i, String str, String str2, TopicPartition topicPartition) {
        return Queue$.MODULE$.dropping(i).flatMap(pending ->
            Ref$.MODULE$.make(WorkerInternalState$.MODULE$.empty()).flatMap(internalState ->
                MODULE$.pollOnce(zRef, internalState, function1, pending, str, str2, topicPartition)
                    .interruptible()
                    // Keep polling for as long as the previous step yielded true.
                    .doWhile(keepPolling -> BoxesRunTime.boxToBoolean($anonfun$make$5(BoxesRunTime.unboxToBoolean(keepPolling))))
                    .forkDaemon()
                    .map(pollingFiber -> newWorker(pending, internalState, pollingFiber))));
    }

    /**
     * Wraps a queue, an internal-state ref and the polling fiber in a {@code Dispatcher.Worker}.
     */
    private static Dispatcher.Worker newWorker(ZQueue queue, ZRef internalState, Fiber.Runtime fiber) {
        return new Dispatcher.Worker() {
            /** Offers the record to the worker's queue and returns the result of that offer. */
            @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
            public ZIO<Object, Nothing$, Object> submit(ConsumerRecord<Chunk<Object>, Chunk<Object>> consumerRecord) {
                return queue.offer(consumerRecord);
            }

            /**
             * Snapshots the queue size (floored at zero) together with the milliseconds elapsed
             * since the current execution started, when the internal state records a start time.
             */
            @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
            public ZIO<Object, Nothing$, WorkerExposedState> expose() {
                return queue.size().zip(internalState.get()).map(sizeAndState -> {
                    if (sizeAndState == null) {
                        throw new MatchError(sizeAndState);
                    }
                    int queued = Math.max(0, sizeAndState._1$mcI$sp());
                    return new WorkerExposedState(queued, ((WorkerInternalState) sizeAndState._2()).currentExecutionStarted().map(startedAt -> {
                        return System.currentTimeMillis() - startedAt;
                    }));
                });
            }

            /** Interrupts the polling fiber and discards the interruption result. */
            @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
            public ZIO<Object, Nothing$, BoxedUnit> shutdown() {
                return fiber.interrupt().unit();
            }
        };
    }

    /**
     * One step of the polling loop. Clears the worker's internal state, reads the dispatcher state
     * and then:
     * <ul>
     *   <li>Running: takes one record from the queue and handles it, yielding {@code true};</li>
     *   <li>Paused: waits on the resume promise with a 30 second timeout, yielding {@code true};</li>
     *   <li>ShuttingDown: yields {@code false};</li>
     *   <li>anything else: throws {@code MatchError}.</li>
     * </ul>
     */
    private <R> ZIO<Has<package.Clock.Service>, Nothing$, Object> pollOnce(ZRef<Nothing$, Nothing$, Dispatcher.State, Dispatcher.State> zRef, ZRef<Nothing$, Nothing$, WorkerInternalState, WorkerInternalState> zRef2, Function1<ConsumerRecord<Chunk<Object>, Chunk<Object>>, ZIO<R, Nothing$, Object>> function1, ZQueue<Object, Object, Nothing$, Nothing$, ConsumerRecord<Chunk<Object>, Chunk<Object>>, ConsumerRecord<Chunk<Object>, Chunk<Object>>> zQueue, String str, String str2, TopicPartition topicPartition) {
        return ZRef$UnifiedSyntax$.MODULE$
            .update$extension(ZRef$.MODULE$.UnifiedSyntax(zRef2), previous -> previous.cleared())
            .$times$greater(() -> zRef.get().flatMap(state -> {
                if (Dispatcher$State$Running$.MODULE$.equals(state)) {
                    return zQueue.take().flatMap(taken -> handleTaken(taken, zRef2, function1, str, str2));
                }
                if (state instanceof Dispatcher.State.Paused) {
                    return awaitResume(((Dispatcher.State.Paused) state).resume(), str, str2, topicPartition);
                }
                if (Dispatcher$State$ShuttingDown$.MODULE$.equals(state)) {
                    return UIO$.MODULE$.apply(() -> false);
                }
                throw new MatchError(state);
            }));
    }

    /**
     * Running-state step: reports that a record left the queue, marks the internal state as
     * started, then runs the handler uninterruptibly and replaces its result with {@code true}.
     */
    private static <R> ZIO handleTaken(ConsumerRecord<Chunk<Object>, Chunk<Object>> taken, ZRef<Nothing$, Nothing$, WorkerInternalState, WorkerInternalState> internalState, Function1<ConsumerRecord<Chunk<Object>, Chunk<Object>>, ZIO<R, Nothing$, Object>> handler, String str, String str2) {
        return GreyhoundMetrics$.MODULE$
            .report(new DispatcherMetric.TookRecordFromQueue(taken, str, str2))
            .$times$greater(() -> ZRef$UnifiedSyntax$.MODULE$.update$extension(ZRef$.MODULE$.UnifiedSyntax(internalState), current -> current.started()))
            .$times$greater(() -> ((ZIO) handler.apply(taken)).uninterruptible().as(() -> true));
    }

    /**
     * Paused-state step: reports that the worker is waiting, then awaits the resume promise with
     * a 30 second timeout and replaces the outcome with {@code true}.
     */
    private static ZIO awaitResume(Promise<Nothing$, BoxedUnit> resume, String str, String str2, TopicPartition topicPartition) {
        return GreyhoundMetrics$.MODULE$
            .report(new DispatcherMetric.WorkerWaitingForResume(str, str2, topicPartition))
            .$times$greater(() -> resume.await().timeout(zio.duration.package$.MODULE$.durationInt(30).seconds()).as(() -> true));
    }
}
