package com.wixpress.dst.greyhound.core.consumer;

import com.wixpress.dst.greyhound.core.consumer.EventLoopMetric;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerRecord;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerRecord$;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription;
import com.wixpress.dst.greyhound.core.consumer.domain.RecordHandler;
import com.wixpress.dst.greyhound.core.consumer.domain.TopicPartition;
import com.wixpress.dst.greyhound.core.consumer.domain.TopicPartition$;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics$;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Set;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import zio.CanFail$;
import zio.Chunk;
import zio.Exit;
import zio.Fiber;
import zio.Has;
import zio.Promise;
import zio.Promise$;
import zio.Ref$;
import zio.Runtime;
import zio.UIO$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZManaged;
import zio.ZRef;
import zio.ZRef$;
import zio.ZRef$UnifiedSyntax$;
import zio.blocking.package;
import zio.clock.package;

/* compiled from: EventLoop.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/EventLoop$.class */
public final class EventLoop$ {
    // Singleton instance of this Scala companion object; assigned by the private constructor.
    public static EventLoop$ MODULE$;
    // Effect that succeeds with an empty ConsumerRecords batch. It is built once in the
    // constructor and used by pollAndHandle as the fallback value when a poll fails.
    private final ZIO<Object, Nothing$, ConsumerRecords<Nothing$, Nothing$>> emptyRecords;

    // Instantiates the singleton; the constructor stores the new instance in MODULE$.
    static {
        new EventLoop$();
    }

    /**
     * Builds a managed {@link EventLoop} around the given consumer.
     *
     * <p>Acquisition, in order: reports {@code StartingEventLoop}; creates an {@code Offsets}
     * tracker; wraps {@code recordHandler} so that every handled record is followed by
     * {@code offsets.update(record)}; creates a {@code Dispatcher} with the configured low/high
     * watermarks; creates a ref holding an (initially empty) set and a promise; builds the
     * rebalance listener; subscribes the consumer; creates a boolean ref initialised to
     * {@code true}; forks a daemon fiber that repeats {@code pollOnce} via {@code doWhile} on its
     * boolean result; and finally awaits the promise (which the listener completes in
     * {@code onPartitionsAssigned}) before yielding.
     *
     * <p>Release, in order: reports {@code StoppingEventLoop}; sets the boolean ref to
     * {@code false}; joins the polling fiber and then shuts down the dispatcher, both bounded by
     * {@code drainTimeout}; reports {@code DrainTimeoutExceeded} if that timed out; commits offsets.
     *
     * <p>NOTE(review): this is decompiler output. Nested lambdas reuse parameter names
     * ({@code tuple2}, {@code boxedUnit}, {@code boxedUnit2}, {@code zRef}, {@code consumerRecord}),
     * which is not legal Java and hides which binding each use refers to. Judging by the
     * parameter types of {@code pollOnce}, the first {@code zRef} argument in that call is
     * presumably the boolean "running" ref and the fourth the paused-partitions ref — confirm
     * against the Scala source.
     *
     * @param str                  stored as {@code group} in the returned event loop and passed as
     *                             the second argument of the event-loop metrics
     * @param consumerSubscription subscription handed to {@code subscribe}
     * @param consumer             consumer that is subscribed, polled and committed
     * @param recordHandler        handler applied to each record before the offset update
     * @param str2                 stored as {@code clientId} in the returned event loop and passed
     *                             as the first argument of the event-loop metrics
     * @param eventLoopConfig      supplies watermarks, drain timeout and the user rebalance listener
     * @return a managed resource yielding the event loop
     */
    public <R> ZManaged<Has<package.Clock.Service>, Throwable, EventLoop<Has<GreyhoundMetrics.Service>>> make(String str, ConsumerSubscription consumerSubscription, Consumer consumer, RecordHandler<R, Nothing$, Chunk<Object>, Chunk<Object>> recordHandler, String str2, EventLoopConfig eventLoopConfig) {
        return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.StartingEventLoop(str2, str)).flatMap(boxedUnit -> {
            return Offsets$.MODULE$.make().map(offsets -> {
                // Pair the offsets tracker with a handler function that records the offset
                // after the user handler has run.
                return new Tuple2(offsets, consumerRecord -> {
                    return recordHandler.andThen(consumerRecord -> {
                        return offsets.update(consumerRecord);
                    }).handle(consumerRecord);
                });
            }).flatMap(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Offsets offsets2 = (Offsets) tuple2._1();
                return Dispatcher$.MODULE$.make(str, str2, (Function1) tuple2._2(), eventLoopConfig.lowWatermark(), eventLoopConfig.highWatermark()).flatMap(dispatcher -> {
                    // Ref starting as an empty set; handed to the rebalance listener below.
                    return Ref$.MODULE$.make(Predef$.MODULE$.Set().empty()).flatMap(zRef -> {
                        // Promise completed by the listener's onPartitionsAssigned.
                        return Promise$.MODULE$.make().flatMap(promise -> {
                            return ZIO$.MODULE$.runtime().map(runtime -> {
                                return new Tuple2(runtime, MODULE$.listener(runtime, zRef, eventLoopConfig, dispatcher, promise, str, consumer, str2, offsets2));
                            }).flatMap(tuple2 -> {
                                if (tuple2 == null) {
                                    throw new MatchError(tuple2);
                                }
                                return package$.MODULE$.subscribe(consumerSubscription, (RebalanceListener) tuple2._2(), consumer).flatMap(boxedUnit -> {
                                    // Boolean ref initialised to true; the release action sets it to false.
                                    return Ref$.MODULE$.make(BoxesRunTime.boxToBoolean(true)).flatMap(zRef -> {
                                        // Repeat pollOnce on a daemon fiber; $anonfun$make$13 passes the
                                        // boolean result through unchanged as the doWhile predicate.
                                        return MODULE$.pollOnce(zRef, consumer, dispatcher, zRef, offsets2, eventLoopConfig, str2, str).doWhile(obj -> {
                                            return BoxesRunTime.boxToBoolean($anonfun$make$13(BoxesRunTime.unboxToBoolean(obj)));
                                        }).forkDaemon().flatMap(runtime2 -> {
                                            // Do not finish acquisition until the promise is completed.
                                            return promise.await().map(boxedUnit -> {
                                                return new Tuple4(dispatcher, runtime2, offsets2, zRef);
                                            });
                                        });
                                    });
                                });
                            });
                        });
                    });
                });
            });
        }).toManaged(tuple4 -> {
            // Release action for the managed resource.
            if (tuple4 == null) {
                throw new MatchError(tuple4);
            }
            Dispatcher dispatcher = (Dispatcher) tuple4._1();
            Fiber.Runtime runtime = (Fiber.Runtime) tuple4._2();
            Offsets offsets = (Offsets) tuple4._3();
            ZRef zRef = (ZRef) tuple4._4();
            return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.StoppingEventLoop(str2, str)).flatMap(boxedUnit2 -> {
                // Flip the boolean ref to false, then wait for the fiber and the dispatcher.
                return zRef.set(BoxesRunTime.boxToBoolean(false)).flatMap(boxedUnit2 -> {
                    return runtime.join().$times$greater(() -> {
                        return dispatcher.shutdown();
                    }).timeout(eventLoopConfig.drainTimeout()).flatMap(option -> {
                        // An empty option means the join + shutdown did not finish within drainTimeout.
                        return ZIO$.MODULE$.when(() -> {
                            return option.isEmpty();
                        }, () -> {
                            return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.DrainTimeoutExceeded(str2, str));
                        }).flatMap(boxedUnit2 -> {
                            // Final commit, using the default value of the third commitOffsets argument.
                            return MODULE$.com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(consumer, offsets, MODULE$.commitOffsets$default$3()).map(boxedUnit2 -> {
                                $anonfun$make$24(boxedUnit2);
                                return BoxedUnit.UNIT;
                            });
                        });
                    });
                });
            });
        }).map(tuple42 -> {
            if (tuple42 == null) {
                throw new MatchError(tuple42);
            }
            final Dispatcher dispatcher = (Dispatcher) tuple42._1();
            final Fiber.Runtime runtime = (Fiber.Runtime) tuple42._2();
            // Public handle: delegates pause/resume/state to the dispatcher and liveness to the fiber.
            return new EventLoop<Has<GreyhoundMetrics.Service>>(str2, str, dispatcher, runtime) { // from class: com.wixpress.dst.greyhound.core.consumer.EventLoop$$anon$1
                private final String clientId$1;
                private final String group$1;
                private final Dispatcher dispatcher$3;
                private final Fiber.Runtime fiber$3;

                // NOTE(review): as decompiled this reads as a self-call; presumably it forwards to
                // the Resource trait's own combine implementation — confirm against the Scala source.
                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public final <R1 extends Has<GreyhoundMetrics.Service>> Resource<R1> combine(Resource<R1> resource) {
                    Resource<R1> combine;
                    combine = combine(resource);
                    return combine;
                }

                // Reports PausingEventLoop, then pauses the dispatcher.
                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public ZIO<Has<GreyhoundMetrics.Service>, Nothing$, BoxedUnit> pause() {
                    return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.PausingEventLoop(this.clientId$1, this.group$1)).$times$greater(() -> {
                        return this.dispatcher$3.pause();
                    });
                }

                // Reports ResumingEventLoop, then resumes the dispatcher.
                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public ZIO<Has<GreyhoundMetrics.Service>, Nothing$, BoxedUnit> resume() {
                    return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.ResumingEventLoop(this.clientId$1, this.group$1)).$times$greater(() -> {
                        return this.dispatcher$3.resume();
                    });
                }

                // Polls the fiber and maps the result through $anonfun$isAlive$1.
                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public ZIO<Object, Nothing$, Object> isAlive() {
                    return this.fiber$3.poll().map(option -> {
                        return BoxesRunTime.boxToBoolean($anonfun$isAlive$1(option));
                    });
                }

                // Exposes the dispatcher's state.
                @Override // com.wixpress.dst.greyhound.core.consumer.EventLoop
                public ZIO<Object, Nothing$, DispatcherExposedState> state() {
                    return this.dispatcher$3.expose();
                }

                // Returns false only when the poll result is Some(Exit.Failure); true otherwise.
                public static final /* synthetic */ boolean $anonfun$isAlive$1(Option option) {
                    return ((option instanceof Some) && (((Exit) ((Some) option).value()) instanceof Exit.Failure)) ? false : true;
                }

                {
                    this.clientId$1 = str2;
                    this.group$1 = str;
                    this.dispatcher$3 = dispatcher;
                    this.fiber$3 = runtime;
                    Resource.$init$(this);
                }
            };
        });
    }

    /**
     * Scala-generated accessor for the default value of the sixth parameter of {@code make}.
     *
     * @return {@code EventLoopConfig.Default}
     */
    public <R> EventLoopConfig make$default$6() {
        return EventLoopConfig$.MODULE$.Default();
    }

    /**
     * Runs a single iteration of the event loop.
     *
     * <p>Reads the boolean held by {@code zRef} and delegates to {@code $anonfun$pollOnce$1}
     * with that value; the resulting effect yields a boxed boolean.
     *
     * @param zRef            ref holding the boolean that decides whether this iteration does work
     * @param consumer        consumer forwarded to the iteration body
     * @param dispatcher      dispatcher forwarded to the iteration body
     * @param zRef2           ref holding a set of topic-partitions, forwarded to the iteration body
     * @param offsets         offsets tracker forwarded to the iteration body
     * @param eventLoopConfig configuration forwarded to the iteration body
     * @param str             forwarded as the second argument of the iteration body
     * @param str2            forwarded as the third argument of the iteration body
     * @return effect producing the iteration's boolean result
     */
    private <R2> ZIO<Has<package.Clock.Service>, Nothing$, Object> pollOnce(ZRef<Nothing$, Nothing$, Object, Object> zRef, Consumer consumer, Dispatcher<R2> dispatcher, ZRef<Nothing$, Nothing$, Set<TopicPartition>, Set<TopicPartition>> zRef2, Offsets offsets, EventLoopConfig eventLoopConfig, String str, String str2) {
        return zRef.get().flatMap(runningFlag -> {
            // The ref stores a boxed Boolean; unbox it before handing it to the iteration body.
            boolean running = BoxesRunTime.unboxToBoolean(runningFlag);
            return $anonfun$pollOnce$1(consumer, str, str2, dispatcher, zRef2, eventLoopConfig, offsets, running);
        });
    }

    /**
     * Builds the rebalance listener that {@code make} passes to {@code subscribe}.
     *
     * <p>On revocation it synchronously runs (via {@code runtime.unsafeRun}) the following chain:
     * reset the ref to an empty set, invoke the configured listener's {@code onPartitionsRevoked},
     * then {@code dispatcher.revoke} bounded by {@code drainTimeout} (reporting
     * {@code DrainTimeoutExceeded} on timeout). Afterwards it commits offsets with the third
     * argument set to {@code true}.
     *
     * <p>On assignment it invokes the configured listener's {@code onPartitionsAssigned} and then
     * completes {@code promise}.
     *
     * @param runtime         runtime used to run the revoke chain synchronously
     * @param zRef            ref reset to an empty set on revocation
     * @param eventLoopConfig supplies the user rebalance listener and the drain timeout
     * @param dispatcher      dispatcher whose {@code revoke} is called for revoked partitions
     * @param promise         completed with unit when partitions are assigned
     * @param str             stored as {@code group}; second argument of the timeout metric
     * @param consumer        consumer used for the post-revoke commit
     * @param str2            stored as {@code clientId}; first argument of the timeout metric
     * @param offsets         offsets tracker used for the post-revoke commit
     * @return the listener
     */
    private <R2, R1> RebalanceListener<Has<package.Blocking.Service>> listener(final Runtime<Has<package.Clock.Service>> runtime, final ZRef<Nothing$, Nothing$, Set<TopicPartition>, Set<TopicPartition>> zRef, final EventLoopConfig eventLoopConfig, final Dispatcher<R2> dispatcher, final Promise<Nothing$, BoxedUnit> promise, final String str, final Consumer consumer, final String str2, final Offsets offsets) {
        return new RebalanceListener<Has<package.Blocking.Service>>(runtime, zRef, eventLoopConfig, dispatcher, str2, str, consumer, offsets, promise) { // from class: com.wixpress.dst.greyhound.core.consumer.EventLoop$$anon$2
            private final Runtime runtime$1;
            private final ZRef pausedPartitionsRef$2;
            private final EventLoopConfig config$3;
            private final Dispatcher dispatcher$5;
            private final String clientId$3;
            private final String group$3;
            private final Consumer consumer$3;
            private final Offsets offsets$5;
            private final Promise partitionsAssigned$2;

            // NOTE(review): as decompiled this reads as a self-call; presumably it forwards to the
            // RebalanceListener trait's own *> implementation — confirm against the Scala source.
            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public <R1> RebalanceListener<Has<package.Blocking.Service>> $times$greater(RebalanceListener<R1> rebalanceListener) {
                RebalanceListener<Has<package.Blocking.Service>> $times$greater;
                $times$greater = $times$greater(rebalanceListener);
                return $times$greater;
            }

            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public ZIO<Has<package.Blocking.Service>, Nothing$, Object> onPartitionsRevoked(Set<TopicPartition> set) {
                return ZIO$.MODULE$.effectTotal(() -> {
                    // Run the whole revoke chain synchronously inside this effect.
                    this.runtime$1.unsafeRun(() -> {
                        // 1. Reset the paused-partitions ref to an empty set.
                        return this.pausedPartitionsRef$2.set(Predef$.MODULE$.Set().empty()).$times$greater(() -> {
                            // 2. Notify the user-configured listener.
                            return this.config$3.rebalanceListener().onPartitionsRevoked(set);
                        }).$times$greater(() -> {
                            // 3. Revoke in the dispatcher, bounded by drainTimeout; an empty
                            //    option means the timeout elapsed first.
                            return this.dispatcher$5.revoke(set).timeout(this.config$3.drainTimeout()).flatMap(option -> {
                                return ZIO$.MODULE$.when(() -> {
                                    return option.isEmpty();
                                }, () -> {
                                    return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.DrainTimeoutExceeded(this.clientId$3, this.group$3));
                                });
                            });
                        });
                    });
                }).$times$greater(() -> {
                    // Commit with the third argument true (it is forwarded to consumer.commit).
                    return EventLoop$.MODULE$.com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(this.consumer$3, this.offsets$5, true);
                });
            }

            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public ZIO<Has<package.Blocking.Service>, Nothing$, Object> onPartitionsAssigned(Set<TopicPartition> set) {
                // Notify the user-configured listener, then complete the promise with unit.
                return this.config$3.rebalanceListener().onPartitionsAssigned(set).$times$greater(() -> {
                    return this.partitionsAssigned$2.succeed(BoxedUnit.UNIT);
                });
            }

            {
                this.runtime$1 = runtime;
                this.pausedPartitionsRef$2 = zRef;
                this.config$3 = eventLoopConfig;
                this.dispatcher$5 = dispatcher;
                this.clientId$3 = str2;
                this.group$3 = str;
                this.consumer$3 = consumer;
                this.offsets$5 = offsets;
                this.partitionsAssigned$2 = promise;
                RebalanceListener.$init$(this);
            }
        };
    }

    /**
     * Resumes the paused partitions that the dispatcher reports as resumable.
     *
     * <p>Steps: read the set held by {@code zRef}; ask the dispatcher which of those partitions
     * can be resumed; if any, report {@code LowWatermarkReached}; call {@code consumer.resume}
     * on them (a failure has its stack trace printed and is then ignored); finally remove the
     * resumed partitions from the set held by {@code zRef}.
     *
     * <p>The decompiled original bound every nested lambda parameter to the name {@code set},
     * which is not legal Java and made the final update read as "subtract the set from itself".
     * Each binding now has its own name, so the update removes only the resumable partitions.
     *
     * @param consumer   consumer whose partitions are resumed
     * @param str        first argument of the {@code LowWatermarkReached} metric
     * @param str2       second argument of the {@code LowWatermarkReached} metric
     * @param dispatcher dispatcher queried for resumable partitions
     * @param zRef       ref holding the set of currently paused partitions
     * @return effect completing with unit once the ref has been updated
     */
    private <R1, R2> ZIO<Has<GreyhoundMetrics.Service>, Nothing$, BoxedUnit> resumePartitions(Consumer consumer, String str, String str2, Dispatcher<R2> dispatcher, ZRef<Nothing$, Nothing$, Set<TopicPartition>, Set<TopicPartition>> zRef) {
        return zRef.get().flatMap(paused -> {
            return dispatcher.resumeablePartitions(paused).flatMap(resumable -> {
                return ZIO$.MODULE$.when(() -> {
                    return resumable.nonEmpty();
                }, () -> {
                    return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.LowWatermarkReached(str, str2, resumable));
                }).flatMap(reported -> {
                    // Best effort: a failed resume is printed and otherwise ignored.
                    return consumer.resume(resumable).tapError(resumeFailure -> {
                        return UIO$.MODULE$.apply(() -> {
                            resumeFailure.printStackTrace();
                            // The thunk must produce a value; unit is what the Scala source yields.
                            return BoxedUnit.UNIT;
                        });
                    }, CanFail$.MODULE$.canFail()).ignore().flatMap(resumed -> {
                        // Drop the resumed partitions from whatever the ref currently holds.
                        return ZRef$UnifiedSyntax$.MODULE$.update$extension(ZRef$.MODULE$.UnifiedSyntax(zRef), currentlyPaused -> {
                            return currentlyPaused.$minus$minus(resumable);
                        }).map(updated -> {
                            $anonfun$resumePartitions$10(updated);
                            return BoxedUnit.UNIT;
                        });
                    });
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the cached effect that succeeds with an empty {@code ConsumerRecords} batch.
     *
     * @return the effect stored in the {@code emptyRecords} field
     */
    public ZIO<Object, Nothing$, ConsumerRecords<Nothing$, Nothing$>> emptyRecords() {
        return this.emptyRecords;
    }

    /**
     * Polls the consumer once and submits every returned record to the dispatcher.
     *
     * <p>A failed poll is replaced by an empty batch. The records are folded over the set of
     * paused partitions read from {@code zRef}:
     * <ul>
     *   <li>a record whose partition is already in the set only triggers a
     *       {@code PartitionThrottled} metric;</li>
     *   <li>otherwise the record is submitted; on {@code Submitted} the set is unchanged, on
     *       {@code Rejected} a {@code HighWatermarkReached} metric is reported and the consumer
     *       is paused for that record — the partition is added to the set only if the pause
     *       succeeded.</li>
     * </ul>
     * The set produced by the fold is then written back to {@code zRef}.
     *
     * <p>The decompiled original named both the fold result and the update-lambda parameter
     * {@code set2}, so the write-back read as an identity update that would discard the fold
     * result. The bindings are now distinct and the ref is replaced with the folded set.
     *
     * @param consumer        consumer that is polled and, on rejection, paused
     * @param dispatcher      dispatcher receiving the records
     * @param zRef            ref holding the set of paused partitions
     * @param eventLoopConfig supplies the poll timeout
     * @param str             not used by this method's body
     * @return effect completing once the paused-partitions ref has been updated
     */
    private <R1, R2> ZIO<Has<package.Clock.Service>, Nothing$, BoxedUnit> pollAndHandle(Consumer consumer, Dispatcher<R2> dispatcher, ZRef<Nothing$, Nothing$, Set<TopicPartition>, Set<TopicPartition>> zRef, EventLoopConfig eventLoopConfig, String str) {
        return consumer.poll(eventLoopConfig.pollTimeout()).catchAll(pollFailure -> {
            // Deliberate best effort: a failed poll is treated as an empty batch.
            return MODULE$.emptyRecords();
        }, CanFail$.MODULE$.canFail()).flatMap(consumerRecords -> {
            return zRef.get().flatMap(initiallyPaused -> {
                return ZIO$.MODULE$.foldLeft((Iterable) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords).asScala(), initiallyPaused, (paused, kafkaRecord) -> {
                    ConsumerRecord<?, ?> record = ConsumerRecord$.MODULE$.apply(kafkaRecord);
                    TopicPartition partition = TopicPartition$.MODULE$.apply(record);
                    if (paused.contains(partition)) {
                        // Partition is already paused: report it and leave the set as is.
                        return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.PartitionThrottled(partition, record.offset())).as(() -> {
                            return paused;
                        });
                    }
                    return dispatcher.submit(record).flatMap(submitResult -> {
                        if (SubmitResult$Submitted$.MODULE$.equals(submitResult)) {
                            return ZIO$.MODULE$.succeed(() -> {
                                return paused;
                            });
                        }
                        if (!SubmitResult$Rejected$.MODULE$.equals(submitResult)) {
                            throw new MatchError(submitResult);
                        }
                        // Rejected: report, then pause; only a successful pause adds the partition.
                        return GreyhoundMetrics$.MODULE$.report(new EventLoopMetric.HighWatermarkReached(partition, record.offset())).$times$greater(() -> {
                            return consumer.pause((ConsumerRecord<?, ?>) record).fold(pauseFailure -> {
                                return paused;
                            }, pauseDone -> {
                                return paused.$plus(partition);
                            }, CanFail$.MODULE$.canFail());
                        });
                    });
                }).flatMap(pausedAfterBatch -> {
                    // Replace the ref's content with the fold result, ignoring the previous value.
                    return ZRef$UnifiedSyntax$.MODULE$.update$extension(ZRef$.MODULE$.UnifiedSyntax(zRef), previous -> {
                        return pausedAfterBatch;
                    });
                });
            });
        });
    }

    public <R> ZIO<Has<package.Blocking.Service>, Nothing$, BoxedUnit> com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(Consumer consumer, Offsets offsets, boolean z) {
        return offsets.committable().flatMap(map -> {
            return consumer.commit(map, z).catchAll(th -> {
                return ZIO$.MODULE$.foreach_(map, tuple2 -> {
                    if (tuple2 != null) {
                        return offsets.update((TopicPartition) tuple2._1(), tuple2._2$mcJ$sp());
                    }
                    throw new MatchError(tuple2);
                });
            }, CanFail$.MODULE$.canFail());
        });
    }

    /**
     * Scala-generated accessor for the default value of the third parameter of
     * {@code commitOffsets}.
     *
     * @return {@code false}
     */
    private <R> boolean commitOffsets$default$3() {
        return false;
    }

    // Synthetic lambda body used as the doWhile predicate in make: returns its argument
    // unchanged, so the loop continues exactly while pollOnce yields true.
    public static final /* synthetic */ boolean $anonfun$make$13(boolean z) {
        return z;
    }

    // Synthetic lambda body with no effect; called from the release action in make after the
    // final offset commit.
    public static final /* synthetic */ void $anonfun$make$24(BoxedUnit boxedUnit) {
    }

    // Synthetic lambda body: ignores its argument and returns true. $anonfun$pollOnce$1 uses it
    // to turn the unit result of the offset commit into the iteration's boolean result.
    public static final /* synthetic */ boolean $anonfun$pollOnce$4(BoxedUnit boxedUnit) {
        return true;
    }

    /**
     * Body of one event-loop iteration, selected by the boolean read in {@code pollOnce}.
     *
     * <p>When {@code z} is {@code false} the result is an effect yielding {@code false} and no
     * other work is done. When {@code z} is {@code true} the iteration resumes resumable
     * partitions, polls and handles records, commits offsets with the default third argument,
     * and yields {@code true}.
     *
     * <p>The decompiled original also carried a {@code MatchError} branch for a boolean that is
     * neither {@code true} nor {@code false}; that branch was unreachable and has been removed.
     * The three nested lambdas all bound {@code boxedUnit}; they now use distinct names.
     *
     * @param consumer        consumer to resume, poll and commit
     * @param str             forwarded to {@code resumePartitions} and {@code pollAndHandle}
     * @param str2            forwarded to {@code resumePartitions}
     * @param dispatcher      dispatcher receiving the records
     * @param zRef            ref holding the set of paused partitions
     * @param eventLoopConfig configuration forwarded to {@code pollAndHandle}
     * @param offsets         offsets tracker used for the commit
     * @param z               whether this iteration should do any work
     * @return effect yielding a boxed boolean: {@code true} after a full iteration, else {@code false}
     */
    public static final /* synthetic */ ZIO $anonfun$pollOnce$1(Consumer consumer, String str, String str2, Dispatcher dispatcher, ZRef zRef, EventLoopConfig eventLoopConfig, Offsets offsets, boolean z) {
        if (!z) {
            // Flag is off: do no work and report false.
            return UIO$.MODULE$.apply(() -> {
                return false;
            });
        }
        return MODULE$.resumePartitions(consumer, str, str2, dispatcher, zRef).flatMap(resumed -> {
            return MODULE$.pollAndHandle(consumer, dispatcher, zRef, eventLoopConfig, str).flatMap(handled -> {
                return MODULE$.com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(consumer, offsets, MODULE$.commitOffsets$default$3()).map(committed -> {
                    return BoxesRunTime.boxToBoolean($anonfun$pollOnce$4(committed));
                });
            });
        });
    }

    // Synthetic lambda body with no effect; called from resumePartitions after the
    // paused-partitions ref has been updated.
    public static final /* synthetic */ void $anonfun$resumePartitions$10(BoxedUnit boxedUnit) {
    }

    /**
     * Creates the singleton: publishes this instance through {@code MODULE$} and builds the
     * cached effect that succeeds with an empty {@code ConsumerRecords} batch.
     */
    private EventLoop$() {
        // Publish the instance first, matching the order of the generated initializer.
        MODULE$ = this;
        this.emptyRecords = ZIO$.MODULE$.succeed(() -> ConsumerRecords.empty());
    }
}
