package com.wixpress.dst.greyhound.core.consumer;

import com.wixpress.dst.greyhound.core.consumer.EventLoopMetric;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetric;
import com.wixpress.dst.greyhound.core.metrics.Metrics;
import com.wixpress.dst.greyhound.core.metrics.Metrics$;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Set;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import zio.CanFail$;
import zio.Chunk;
import zio.Exit;
import zio.Fiber;
import zio.Promise;
import zio.Promise$;
import zio.Ref;
import zio.Ref$;
import zio.Runtime;
import zio.ZIO;
import zio.ZIO$;
import zio.ZManaged;
import zio.blocking.Blocking;
import zio.clock.Clock;

/* compiled from: EventLoop.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/EventLoop$.class */
public final class EventLoop$ {
    public static EventLoop$ MODULE$;
    private final ZIO<Object, Nothing$, ConsumerRecords<Nothing$, Nothing$>> emptyRecords;

    static {
        new EventLoop$();
    }

    public <R1, R2> ZManaged<R1, Throwable, EventLoop<R2>> make(String str, Set<String> set, Consumer<R1> consumer, RecordHandler<R2, Nothing$, Chunk<Object>, Chunk<Object>> recordHandler, EventLoopConfig eventLoopConfig) {
        return Metrics$.MODULE$.report(EventLoopMetric$StartingEventLoop$.MODULE$).flatMap(boxedUnit -> {
            return Offsets$.MODULE$.make().map(offsets -> {
                return new Tuple2(offsets, consumerRecord -> {
                    return recordHandler.andThen(consumerRecord -> {
                        return offsets.update(consumerRecord);
                    }).handle(consumerRecord);
                });
            }).flatMap(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Offsets offsets2 = (Offsets) tuple2._1();
                return Dispatcher$.MODULE$.make(str, (Function1) tuple2._2(), eventLoopConfig.lowWatermark(), eventLoopConfig.highWatermark()).flatMap(dispatcher -> {
                    return Promise$.MODULE$.make().flatMap(promise -> {
                        return ZIO$.MODULE$.runtime().flatMap(runtime -> {
                            return consumer.subscribe(set, new RebalanceListener<Blocking>(consumer, eventLoopConfig, offsets2, dispatcher, promise, runtime) { // from class: com.wixpress.dst.greyhound.core.consumer.EventLoop$$anon$2
                                private final Consumer consumer$1;
                                private final EventLoopConfig config$1;
                                private final Offsets offsets$1;
                                private final Dispatcher dispatcher$5;
                                private final Promise partitionsAssigned$1;
                                private final Runtime runtime$1;

                                @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
                                public <R1> RebalanceListener<Blocking> $times$greater(RebalanceListener<R1> rebalanceListener) {
                                    RebalanceListener<Blocking> $times$greater;
                                    $times$greater = $times$greater(rebalanceListener);
                                    return $times$greater;
                                }

                                @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
                                public ZIO<Blocking, Nothing$, Object> onPartitionsRevoked(Set<TopicPartition> set2) {
                                    return ZIO$.MODULE$.effectTotal(() -> {
                                        this.runtime$1.unsafeRun(() -> {
                                            return this.config$1.rebalanceListener().onPartitionsRevoked(set2).$times$greater(() -> {
                                                return this.dispatcher$5.revoke(set2).timeout(this.config$1.drainTimeout()).flatMap(option -> {
                                                    return ZIO$.MODULE$.when(option.isEmpty(), Metrics$.MODULE$.report(EventLoopMetric$DrainTimeoutExceeded$.MODULE$));
                                                });
                                            });
                                        });
                                    }).$times$greater(() -> {
                                        return EventLoop$.MODULE$.com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(this.consumer$1, this.offsets$1, true);
                                    });
                                }

                                @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
                                public ZIO<Blocking, Nothing$, Object> onPartitionsAssigned(Set<TopicPartition> set2) {
                                    return this.config$1.rebalanceListener().onPartitionsAssigned(set2).$times$greater(() -> {
                                        return this.partitionsAssigned$1.succeed(BoxedUnit.UNIT);
                                    });
                                }

                                {
                                    this.consumer$1 = consumer;
                                    this.config$1 = eventLoopConfig;
                                    this.offsets$1 = offsets2;
                                    this.dispatcher$5 = dispatcher;
                                    this.partitionsAssigned$1 = promise;
                                    this.runtime$1 = runtime;
                                    RebalanceListener.$init$(this);
                                }
                            }).flatMap(boxedUnit -> {
                                return Ref$.MODULE$.make(BoxesRunTime.boxToBoolean(true)).flatMap(obj -> {
                                    return $anonfun$make$10(consumer, eventLoopConfig, offsets2, dispatcher, promise, ((Ref) obj).zio$Ref$$value());
                                });
                            });
                        });
                    });
                });
            });
        }).toManaged(tuple4 -> {
            if (tuple4 == null) {
                throw new MatchError(tuple4);
            }
            Dispatcher dispatcher = (Dispatcher) tuple4._1();
            Fiber fiber = (Fiber) tuple4._2();
            Offsets offsets = (Offsets) tuple4._3();
            AtomicReference zio$Ref$$value = ((Ref) tuple4._4()).zio$Ref$$value();
            return Metrics$.MODULE$.report(EventLoopMetric$StoppingEventLoop$.MODULE$).flatMap(boxedUnit2 -> {
                return Ref$.MODULE$.set$extension(zio$Ref$$value, BoxesRunTime.boxToBoolean(false)).flatMap(boxedUnit2 -> {
                    return fiber.join().$times$greater(() -> {
                        return dispatcher.shutdown();
                    }).timeout(eventLoopConfig.drainTimeout()).flatMap(option -> {
                        return ZIO$.MODULE$.when(option.isEmpty(), Metrics$.MODULE$.report(EventLoopMetric$DrainTimeoutExceeded$.MODULE$)).flatMap(boxedUnit2 -> {
                            return MODULE$.com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(consumer, offsets, MODULE$.commitOffsets$default$3()).map(boxedUnit2 -> {
                                $anonfun$make$19(boxedUnit2);
                                return BoxedUnit.UNIT;
                            });
                        });
                    });
                });
            });
        }).map(tuple42 -> {
            if (tuple42 == null) {
                throw new MatchError(tuple42);
            }
            final Dispatcher dispatcher = (Dispatcher) tuple42._1();
            final Fiber fiber = (Fiber) tuple42._2();
            return new EventLoop<R2>(dispatcher, fiber) { // from class: com.wixpress.dst.greyhound.core.consumer.EventLoop$$anon$1
                private final Dispatcher dispatcher$3;
                private final Fiber fiber$2;

                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public final <R1 extends R2 & Metrics<GreyhoundMetric> & Clock> Resource<R1> combine(Resource<R1> resource) {
                    Resource<R1> combine;
                    combine = combine(resource);
                    return combine;
                }

                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public ZIO<R2, Nothing$, BoxedUnit> pause() {
                    return Metrics$.MODULE$.report(EventLoopMetric$PausingEventLoop$.MODULE$).$times$greater(() -> {
                        return this.dispatcher$3.pause();
                    });
                }

                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public ZIO<R2, Nothing$, BoxedUnit> resume() {
                    return Metrics$.MODULE$.report(EventLoopMetric$ResumingEventLoop$.MODULE$).$times$greater(() -> {
                        return this.dispatcher$3.resume();
                    });
                }

                @Override // com.wixpress.dst.greyhound.core.consumer.Resource
                public ZIO<Object, Nothing$, Object> isAlive() {
                    return this.fiber$2.poll().map(option -> {
                        return BoxesRunTime.boxToBoolean($anonfun$isAlive$1(option));
                    });
                }

                @Override // com.wixpress.dst.greyhound.core.consumer.EventLoop
                public ZIO<R2, Nothing$, DispatcherExposedState> state() {
                    return this.dispatcher$3.expose();
                }

                public static final /* synthetic */ boolean $anonfun$isAlive$1(Option option) {
                    return ((option instanceof Some) && (((Exit) ((Some) option).value()) instanceof Exit.Failure)) ? false : true;
                }

                {
                    this.dispatcher$3 = dispatcher;
                    this.fiber$2 = fiber;
                    Resource.$init$(this);
                }
            };
        });
    }

    public <R1, R2> EventLoopConfig make$default$5() {
        return EventLoopConfig$.MODULE$.Default();
    }

    private <R1, R2> ZIO<R1, Nothing$, BoxedUnit> loop(AtomicReference<Object> atomicReference, Consumer<R1> consumer, Dispatcher<R2> dispatcher, Set<TopicPartition> set, Offsets offsets, EventLoopConfig eventLoopConfig) {
        return Ref$.MODULE$.get$extension(atomicReference).flatMap(obj -> {
            return $anonfun$loop$1(atomicReference, consumer, dispatcher, set, offsets, eventLoopConfig, BoxesRunTime.unboxToBoolean(obj));
        });
    }

    private <R1, R2> ZIO<R2, Nothing$, Set<TopicPartition>> resumePartitions(Consumer<R1> consumer, Dispatcher<R2> dispatcher, Set<TopicPartition> set) {
        return dispatcher.resumeablePartitions(set).flatMap(set2 -> {
            return consumer.resume(set2).ignore().map(boxedUnit -> {
                return set.diff(set2);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ZIO<Object, Nothing$, ConsumerRecords<Nothing$, Nothing$>> emptyRecords() {
        return this.emptyRecords;
    }

    private <R1, R2> ZIO<Metrics<EventLoopMetric.PartitionThrottled>, Nothing$, Set<TopicPartition>> pollAndHandle(Consumer<R1> consumer, Dispatcher<R2> dispatcher, Set<TopicPartition> set, EventLoopConfig eventLoopConfig) {
        return consumer.poll(eventLoopConfig.pollTimeout()).catchAll(th -> {
            return MODULE$.emptyRecords();
        }, CanFail$.MODULE$.canFail()).flatMap(consumerRecords -> {
            return ZIO$.MODULE$.foldLeft((Iterable) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords).asScala(), set, (set2, consumerRecord) -> {
                ConsumerRecord<?, ?> apply = ConsumerRecord$.MODULE$.apply(consumerRecord);
                TopicPartition apply2 = TopicPartition$.MODULE$.apply(apply);
                return set2.contains(apply2) ? Metrics$.MODULE$.report(new EventLoopMetric.PartitionThrottled(apply2)).as(() -> {
                    return set2;
                }) : dispatcher.submit(apply).flatMap(submitResult -> {
                    ZIO $times$greater;
                    if (SubmitResult$Submitted$.MODULE$.equals(submitResult)) {
                        $times$greater = ZIO$.MODULE$.succeed(set2);
                    } else {
                        if (!SubmitResult$Rejected$.MODULE$.equals(submitResult)) {
                            throw new MatchError(submitResult);
                        }
                        $times$greater = Metrics$.MODULE$.report(new EventLoopMetric.HighWatermarkReached(apply2)).$times$greater(() -> {
                            return consumer.pause((ConsumerRecord<?, ?>) apply).fold(illegalStateException -> {
                                return set2;
                            }, boxedUnit -> {
                                return set2.$plus(apply2);
                            }, CanFail$.MODULE$.canFail());
                        });
                    }
                    return $times$greater;
                });
            });
        });
    }

    public <R> ZIO<R, Nothing$, BoxedUnit> com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(Consumer<R> consumer, Offsets offsets, boolean z) {
        return offsets.committable().flatMap(map -> {
            return consumer.commit(map, z).catchAll(th -> {
                return ZIO$.MODULE$.foreach_(map, tuple2 -> {
                    if (tuple2 != null) {
                        return offsets.update((TopicPartition) tuple2._1(), tuple2._2$mcJ$sp());
                    }
                    throw new MatchError(tuple2);
                });
            }, CanFail$.MODULE$.canFail());
        });
    }

    private <R> boolean commitOffsets$default$3() {
        return false;
    }

    public static final /* synthetic */ ZIO $anonfun$make$10(Consumer consumer, EventLoopConfig eventLoopConfig, Offsets offsets, Dispatcher dispatcher, Promise promise, AtomicReference atomicReference) {
        return MODULE$.loop(atomicReference, consumer, dispatcher, Predef$.MODULE$.Set().empty(), offsets, eventLoopConfig).fork().flatMap(fiber -> {
            return promise.await().map(boxedUnit -> {
                return new Tuple4(dispatcher, fiber, offsets, new Ref(atomicReference));
            });
        });
    }

    public static final /* synthetic */ void $anonfun$make$19(BoxedUnit boxedUnit) {
    }

    public static final /* synthetic */ void $anonfun$loop$5(BoxedUnit boxedUnit) {
    }

    public static final /* synthetic */ ZIO $anonfun$loop$1(AtomicReference atomicReference, Consumer consumer, Dispatcher dispatcher, Set set, Offsets offsets, EventLoopConfig eventLoopConfig, boolean z) {
        ZIO unit;
        if (true == z) {
            unit = MODULE$.resumePartitions(consumer, dispatcher, set).flatMap(set2 -> {
                return MODULE$.pollAndHandle(consumer, dispatcher, set2, eventLoopConfig).flatMap(set2 -> {
                    return MODULE$.com$wixpress$dst$greyhound$core$consumer$EventLoop$$commitOffsets(consumer, offsets, MODULE$.commitOffsets$default$3()).flatMap(boxedUnit -> {
                        return MODULE$.loop(atomicReference, consumer, dispatcher, set2, offsets, eventLoopConfig).map(boxedUnit -> {
                            $anonfun$loop$5(boxedUnit);
                            return BoxedUnit.UNIT;
                        });
                    });
                });
            });
        } else {
            if (false != z) {
                throw new MatchError(BoxesRunTime.boxToBoolean(z));
            }
            unit = ZIO$.MODULE$.unit();
        }
        return unit;
    }

    private EventLoop$() {
        MODULE$ = this;
        this.emptyRecords = ZIO$.MODULE$.succeed(ConsumerRecords.empty());
    }
}
