package com.wixpress.dst.greyhound.core.consumer.batched;

import com.wixpress.dst.greyhound.core.TopicPartition;
import com.wixpress.dst.greyhound.core.consumer.Consumer;
import com.wixpress.dst.greyhound.core.consumer.Consumer$;
import com.wixpress.dst.greyhound.core.consumer.ConsumerConfig;
import com.wixpress.dst.greyhound.core.consumer.DelayedRebalanceEffect;
import com.wixpress.dst.greyhound.core.consumer.DelayedRebalanceEffect$;
import com.wixpress.dst.greyhound.core.consumer.RebalanceListener;
import com.wixpress.dst.greyhound.core.consumer.ReportingConsumer;
import com.wixpress.dst.greyhound.core.consumer.domain.BatchRecordHandler;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.Nothing$;
import zio.Chunk;
import zio.Ref;
import zio.Ref$;
import zio.ZEnvironment;
import zio.ZIO;
import zio.package;

/* compiled from: BatchConsumer.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer$.class */
/**
 * Singleton holder for the batch-consumer factory (decompiled from the Scala source
 * {@code BatchConsumer.scala}; the {@code $}-suffixed name and {@link #MODULE$} field follow the
 * Scala object encoding).
 *
 * <p>{@link #make} wires together a subscription {@code Ref}, an assignment-tracking
 * {@code Ref}, a {@code Consumer}, and a {@code BatchEventLoop} into a {@code BatchConsumer}.
 */
public final class BatchConsumer$ {
    /** The single instance; assigned by the private constructor, which the static initializer runs. */
    public static BatchConsumer$ MODULE$;

    static {
        new BatchConsumer$();
    }

    /**
     * Builds the effect that creates a {@code BatchConsumer}.
     *
     * <p>Steps, in order: create a Ref seeded with {@code batchConsumerConfig.initialSubscription()};
     * create a Ref seeded with an empty set and a rebalance listener that tracks assignments in it;
     * create the underlying {@code Consumer} from {@link #consumerConfig}; create a
     * {@code BatchEventLoop} over a {@code ReportingConsumer} wrapper; finally assemble the
     * {@code BatchConsumer$$anon$1} instance.
     *
     * @param batchConsumerConfig source of group/client ids, subscription, retry and event-loop settings
     * @param batchRecordHandler  handler handed to the event loop
     * @param obj                 passed unchanged to every effect-building call
     *                            (NOTE(review): presumably the implicit ZIO trace - confirm)
     * @param <R>                 environment required by the handler
     * @return an effect yielding the assembled batch consumer
     */
    public <R> ZIO<R, Throwable, BatchConsumer<R>> make(BatchConsumerConfig batchConsumerConfig, BatchRecordHandler<R, Object, Chunk<Object>, Chunk<Object>> batchRecordHandler, Object obj) {
        return Ref$.MODULE$.make(() -> {
            return batchConsumerConfig.initialSubscription();
        }, obj).flatMap(subscriptionRef -> {
            // Distinct parameter names: the decompiled text reused one name for both Refs,
            // which Java rejects and which obscured which Ref is used where.
            return Ref$.MODULE$.make(() -> {
                return Predef$.MODULE$.Set().empty();
            }, obj).map(assignmentsRef -> {
                // Pair the assignments Ref with the listener that keeps it up to date.
                return new Tuple2(assignmentsRef, MODULE$.trackAssignments(assignmentsRef));
            }, obj).flatMap(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Ref trackedAssignments = (Ref) tuple2._1();
                return Consumer$.MODULE$.make(MODULE$.consumerConfig(batchConsumerConfig, (RebalanceListener) tuple2._2(), ClassTag$.MODULE$.Nothing()), obj).flatMap(consumer -> {
                    return BatchEventLoop$.MODULE$.make(batchConsumerConfig.groupId(), batchConsumerConfig.initialSubscription(), new ReportingConsumer(batchConsumerConfig.clientId(), batchConsumerConfig.groupId(), consumer), batchRecordHandler, batchConsumerConfig.clientId(), batchConsumerConfig.retryConfig(), batchConsumerConfig.eventLoopConfig(), obj).map(batchEventLoop -> {
                        // Argument order as in the original: assignments Ref second, subscription Ref fourth.
                        return new BatchConsumer$$anon$1(batchConsumerConfig, trackedAssignments, batchEventLoop, subscriptionRef, consumer);
                    }, obj);
                }, obj);
            }, obj);
        }, obj);
    }

    /**
     * Translates a {@code BatchConsumerConfig} into the {@code ConsumerConfig} used for the
     * underlying consumer.
     *
     * <p>The rebalance listener placed in the result is {@code rebalanceListener} combined with
     * {@code batchConsumerConfig.userProvidedListener()} through {@code $times$greater}
     * (the Scala {@code *>} operator); all other values are copied from the batch config.
     *
     * @param batchConsumerConfig configuration to copy from
     * @param rebalanceListener   listener to combine with the user-provided one
     * @param classTag            not read by this method
     */
    private <R> ConsumerConfig consumerConfig(BatchConsumerConfig batchConsumerConfig, RebalanceListener<Object> rebalanceListener, ClassTag<R> classTag) {
        return new ConsumerConfig(batchConsumerConfig.bootstrapServers(), batchConsumerConfig.groupId(), batchConsumerConfig.clientId(), batchConsumerConfig.offsetReset(), batchConsumerConfig.extraProperties(), rebalanceListener.$times$greater(batchConsumerConfig.userProvidedListener()), batchConsumerConfig.initialOffsetsSeek(), batchConsumerConfig.consumerAttributes(), batchConsumerConfig.decryptor(), batchConsumerConfig.commitMetadataString());
    }

    /**
     * Creates a rebalance listener that mirrors partition ownership into {@code ref}: revoked
     * partitions are removed from the stored set and assigned partitions are added to it.
     *
     * @param ref the Ref holding the currently assigned partitions
     */
    private RebalanceListener<Object> trackAssignments(final Ref<Set<TopicPartition>> ref) {
        // Compiled as com.wixpress.dst.greyhound.core.consumer.batched.BatchConsumer$$anon$3.
        // No constructor argument: the captured Ref is stored by the instance initializer below.
        return new RebalanceListener<Object>() {
            private final Ref assignments$2;

            // The three forwarders below delegate to the inherited implementations. The
            // decompiled text showed each method calling itself, which would never terminate.
            // NOTE(review): assumes RebalanceListener is an interface with default methods
            // (suggested by the static RebalanceListener.$init$ call) - confirm.

            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public <R1> RebalanceListener<Object> $times$greater(RebalanceListener<R1> rebalanceListener) {
                return RebalanceListener.super.$times$greater(rebalanceListener);
            }

            // NOTE(review): `package.Tag` is decompiler output for a Scala package-object type and
            // is not a valid Java type name; left untouched because the real type is not visible here.
            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public <R1> RebalanceListener<Object> provide(R1 r1, package.Tag<R1> tag, Object obj) {
                return RebalanceListener.super.provide(r1, tag, obj);
            }

            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public <R1> RebalanceListener<Object> provideEnvironment(ZEnvironment<R1> zEnvironment, Object obj) {
                return RebalanceListener.super.provideEnvironment(zEnvironment, obj);
            }

            /** Removes the revoked partitions from the tracked set, then yields the unit delayed effect. */
            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public ZIO<Object, Nothing$, DelayedRebalanceEffect> onPartitionsRevoked(Consumer consumer, Set<TopicPartition> set, Object obj) {
                return this.assignments$2.update(set2 -> {
                    return set2.$minus$minus(set);
                }, obj).as(() -> {
                    return DelayedRebalanceEffect$.MODULE$.unit();
                }, obj);
            }

            /** Adds the newly assigned partitions to the tracked set. */
            @Override // com.wixpress.dst.greyhound.core.consumer.RebalanceListener
            public ZIO<Object, Nothing$, Object> onPartitionsAssigned(Consumer consumer, Set<TopicPartition> set, Object obj) {
                return this.assignments$2.update(set2 -> {
                    return set2.$plus$plus(set);
                }, obj);
            }

            {
                // Same order as the original: store the Ref first, then run the trait initializer.
                this.assignments$2 = ref;
                RebalanceListener.$init$(this);
            }
        };
    }

    /** Publishes the instance under construction as {@link #MODULE$}. */
    private BatchConsumer$() {
        MODULE$ = this;
    }
}
