package com.wixpress.dst.greyhound.core.consumer;

import com.wixpress.dst.greyhound.core.TopicPartition;
import com.wixpress.dst.greyhound.core.consumer.Dispatcher;
import com.wixpress.dst.greyhound.core.consumer.DispatcherMetric;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerRecord;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics$;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import zio.Chunk;
import zio.Clock$;
import zio.DurationSyntax$;
import zio.Fiber;
import zio.Promise;
import zio.Queue;
import zio.Queue$;
import zio.Ref;
import zio.ZIO;
import zio.ZIO$;
import zio.Zippable$;
import zio.stm.STM$;
import zio.stm.TRef;
import zio.stm.TRef$;

/* compiled from: Dispatcher.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/Dispatcher$Worker$.class */
public class Dispatcher$Worker$ {
    public static Dispatcher$Worker$ MODULE$;

    static {
        new Dispatcher$Worker$();
    }

    public <R> ZIO<R, Nothing$, Dispatcher.Worker> make(Ref<Dispatcher.DispatcherState> ref, Function1<ConsumerRecord<Chunk<Object>, Chunk<Object>>, ZIO<R, Nothing$, Object>> function1, int i, String str, String str2, TopicPartition topicPartition, Duration duration, Map<String, String> map, Object obj) {
        return Queue$.MODULE$.dropping(() -> {
            return i;
        }, obj).flatMap(queue -> {
            return TRef$.MODULE$.make(() -> {
                return WorkerInternalState$.MODULE$.empty();
            }).commit(obj).flatMap(tRef -> {
                return MODULE$.pollOnce(ref, tRef, function1, queue, str, str2, topicPartition, map, obj).repeatWhile(obj2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$make$10(BoxesRunTime.unboxToBoolean(obj2)));
                }, obj).forkDaemon(obj).map(runtime -> {
                    return new Dispatcher.Worker(queue, obj, tRef, runtime, duration) { // from class: com.wixpress.dst.greyhound.core.consumer.Dispatcher$Worker$$anon$2
                        private final Queue queue$1;
                        private final Object trace$2;
                        private final TRef internalState$1;
                        private final Fiber.Runtime fiber$1;
                        private final Duration drainTimeout$2;

                        @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
                        public ZIO<Object, Nothing$, Object> submit(ConsumerRecord<Chunk<Object>, Chunk<Object>> consumerRecord) {
                            return this.queue$1.offer(consumerRecord, this.trace$2).tap(obj3 -> {
                                return $anonfun$submit$5(this, BoxesRunTime.unboxToBoolean(obj3));
                            }, this.trace$2);
                        }

                        @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
                        public ZIO<Object, Nothing$, WorkerExposedState> expose() {
                            return this.queue$1.size(this.trace$2).zip(() -> {
                                return this.internalState$1.get().commit(this.trace$2);
                            }, Zippable$.MODULE$.Zippable2(), this.trace$2).flatMap(tuple2 -> {
                                if (tuple2 == null) {
                                    throw new MatchError(tuple2);
                                }
                                int _1$mcI$sp = tuple2._1$mcI$sp();
                                WorkerInternalState workerInternalState = (WorkerInternalState) tuple2._2();
                                return Clock$.MODULE$.currentTime(() -> {
                                    return TimeUnit.MILLISECONDS;
                                }, this.trace$2).map(obj3 -> {
                                    return $anonfun$expose$10(_1$mcI$sp, workerInternalState, BoxesRunTime.unboxToLong(obj3));
                                }, this.trace$2);
                            }, this.trace$2);
                        }

                        @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
                        public ZIO<Object, Nothing$, BoxedUnit> shutdown() {
                            return this.internalState$1.update(workerInternalState -> {
                                return workerInternalState.shutdown();
                            }).commit(this.trace$2).flatMap(boxedUnit -> {
                                return this.fiber$1.join(this.trace$2).ignore(this.trace$2).disconnect(this.trace$2).timeout(() -> {
                                    return this.drainTimeout$2;
                                }, this.trace$2).flatMap(option -> {
                                    return ZIO$.MODULE$.when(() -> {
                                        return option.isEmpty();
                                    }, () -> {
                                        return this.fiber$1.interruptFork(this.trace$2);
                                    }, this.trace$2).map(option -> {
                                        $anonfun$shutdown$11(option);
                                        return BoxedUnit.UNIT;
                                    }, this.trace$2);
                                }, this.trace$2);
                            }, this.trace$2);
                        }

                        @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
                        public ZIO<Object, Nothing$, BoxedUnit> clearPausedPartitionDuration() {
                            return this.internalState$1.update(workerInternalState -> {
                                return workerInternalState.clearReachedHighWatermark();
                            }).commit(this.trace$2);
                        }

                        @Override // com.wixpress.dst.greyhound.core.consumer.Dispatcher.Worker
                        public ZIO<Object, Nothing$, BoxedUnit> waitForCurrentExecutionCompletion() {
                            return this.internalState$1.get().flatMap(workerInternalState -> {
                                return STM$.MODULE$.check(() -> {
                                    return workerInternalState.currentExecutionStarted().isEmpty();
                                });
                            }).commit(this.trace$2);
                        }

                        public static final /* synthetic */ ZIO $anonfun$submit$9(Dispatcher$Worker$$anon$2 dispatcher$Worker$$anon$2, long j) {
                            return dispatcher$Worker$$anon$2.internalState$1.update(workerInternalState -> {
                                return workerInternalState.reachedHighWatermarkSince().nonEmpty() ? workerInternalState : workerInternalState.reachedHighWatermark(j);
                            }).commit(dispatcher$Worker$$anon$2.trace$2);
                        }

                        public static final /* synthetic */ ZIO $anonfun$submit$5(Dispatcher$Worker$$anon$2 dispatcher$Worker$$anon$2, boolean z) {
                            return ZIO$.MODULE$.when(() -> {
                                return !z;
                            }, () -> {
                                return Clock$.MODULE$.currentTime(() -> {
                                    return TimeUnit.MILLISECONDS;
                                }, dispatcher$Worker$$anon$2.trace$2).flatMap(obj3 -> {
                                    return $anonfun$submit$9(dispatcher$Worker$$anon$2, BoxesRunTime.unboxToLong(obj3));
                                }, dispatcher$Worker$$anon$2.trace$2);
                            }, dispatcher$Worker$$anon$2.trace$2);
                        }

                        public static final /* synthetic */ WorkerExposedState $anonfun$expose$10(int i2, WorkerInternalState workerInternalState, long j) {
                            return new WorkerExposedState(Math.max(0, i2), workerInternalState.currentExecutionStarted().map(j2 -> {
                                return j - j2;
                            }), workerInternalState.reachedHighWatermarkSince().map(j3 -> {
                                return j - j3;
                            }));
                        }

                        public static final /* synthetic */ void $anonfun$shutdown$11(Option option) {
                        }

                        {
                            this.queue$1 = queue;
                            this.trace$2 = obj;
                            this.internalState$1 = tRef;
                            this.fiber$1 = runtime;
                            this.drainTimeout$2 = duration;
                        }
                    };
                }, obj);
            }, obj);
        }, obj);
    }

    private <R> ZIO<R, Nothing$, Object> pollOnce(Ref<Dispatcher.DispatcherState> ref, TRef<WorkerInternalState> tRef, Function1<ConsumerRecord<Chunk<Object>, Chunk<Object>>, ZIO<R, Nothing$, Object>> function1, Queue<ConsumerRecord<Chunk<Object>, Chunk<Object>>> queue, String str, String str2, TopicPartition topicPartition, Map<String, String> map, Object obj) {
        return tRef.update(workerInternalState -> {
            return workerInternalState.cleared();
        }).commit(obj).$times$greater(() -> {
            return ref.get(obj).flatMap(dispatcherState -> {
                ZIO succeed;
                if (Dispatcher$DispatcherState$Running$.MODULE$.equals(dispatcherState)) {
                    succeed = queue.poll(obj).flatMap(option -> {
                        ZIO delay;
                        if (option instanceof Some) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) ((Some) option).value();
                            delay = GreyhoundMetrics$.MODULE$.report(new DispatcherMetric.TookRecordFromQueue(consumerRecord, str, str2, map)).$times$greater(() -> {
                                return tRef.update(workerInternalState2 -> {
                                    return workerInternalState2.started();
                                }).commit(obj);
                            }, obj).$times$greater(() -> {
                                return ((ZIO) function1.apply(consumerRecord)).interruptible(obj).ignore(obj);
                            }, obj).$times$greater(() -> {
                                return Dispatcher$.MODULE$.com$wixpress$dst$greyhound$core$consumer$Dispatcher$$isActive(tRef, obj);
                            }, obj);
                        } else {
                            if (!None$.MODULE$.equals(option)) {
                                throw new MatchError(option);
                            }
                            delay = Dispatcher$.MODULE$.com$wixpress$dst$greyhound$core$consumer$Dispatcher$$isActive(tRef, obj).delay(() -> {
                                return DurationSyntax$.MODULE$.millis$extension(zio.package$.MODULE$.durationInt(5));
                            }, obj);
                        }
                        return delay;
                    }, obj);
                } else if (dispatcherState instanceof Dispatcher.DispatcherState.Paused) {
                    Promise<Nothing$, BoxedUnit> resume = ((Dispatcher.DispatcherState.Paused) dispatcherState).resume();
                    succeed = GreyhoundMetrics$.MODULE$.report(new DispatcherMetric.WorkerWaitingForResume(str, str2, topicPartition, map)).$times$greater(() -> {
                        return resume.await(obj).timeout(() -> {
                            return DurationSyntax$.MODULE$.seconds$extension(zio.package$.MODULE$.durationInt(30));
                        }, obj);
                    }, obj).$times$greater(() -> {
                        return Dispatcher$.MODULE$.com$wixpress$dst$greyhound$core$consumer$Dispatcher$$isActive(tRef, obj);
                    }, obj);
                } else {
                    if (!Dispatcher$DispatcherState$ShuttingDown$.MODULE$.equals(dispatcherState)) {
                        throw new MatchError(dispatcherState);
                    }
                    succeed = ZIO$.MODULE$.succeed(() -> {
                        return false;
                    }, obj);
                }
                return succeed;
            }, obj);
        }, obj);
    }

    public static final /* synthetic */ boolean $anonfun$make$10(boolean z) {
        return z;
    }

    public Dispatcher$Worker$() {
        MODULE$ = this;
    }
}
