package com.wixpress.dst.greyhound.core.consumer;

import com.wixpress.dst.greyhound.core.CleanupPolicy;
import com.wixpress.dst.greyhound.core.TopicConfig;
import com.wixpress.dst.greyhound.core.TopicConfig$;
import com.wixpress.dst.greyhound.core.admin.AdminClient$;
import com.wixpress.dst.greyhound.core.admin.AdminClientConfig;
import com.wixpress.dst.greyhound.core.admin.AdminClientConfig$;
import com.wixpress.dst.greyhound.core.consumer.RecordConsumerMetric;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription$TopicPattern$;
import com.wixpress.dst.greyhound.core.consumer.domain.RecordHandler;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingStateResolver;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingStateResolver$;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingTarget;
import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper;
import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper$;
import com.wixpress.dst.greyhound.core.consumer.retry.RetryConfig;
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandler$;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics$;
import com.wixpress.dst.greyhound.core.producer.Producer$;
import com.wixpress.dst.greyhound.core.producer.ProducerConfig;
import com.wixpress.dst.greyhound.core.producer.ProducerConfig$;
import com.wixpress.dst.greyhound.core.producer.ProducerRetryPolicy;
import com.wixpress.dst.greyhound.core.producer.ReportingProducer$;
import java.util.regex.Pattern;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import zio.Chunk;
import zio.Has;
import zio.Ref$;
import zio.ZManaged;
import zio.ZManaged$;
import zio.ZRef;
import zio.blocking.package;
import zio.clock.package;

/* compiled from: RecordConsumer.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/RecordConsumer$.class */
public final class RecordConsumer$ {
    /**
     * The single instance of this class. It is not assigned here; the private
     * constructor stores {@code this} into the field when it runs.
     */
    public static RecordConsumer$ MODULE$;

    // Runs once at class initialization: constructing the instance is what
    // populates MODULE$ (the constructor's only statement is the assignment).
    static {
        new RecordConsumer$();
    }

    /**
     * Builds a managed {@link RecordConsumer} from the given configuration and record handler.
     *
     * <p>The steps, in the order the chained {@code map}/{@code flatMap} calls are composed:
     * <ol>
     *   <li>create a {@code Ref} seeded with {@code recordConsumerConfig.initialSubscription()} and a
     *       {@link NonBlockingRetryHelper} for the configured group and retry config;</li>
     *   <li>create the underlying {@code Consumer} from a {@code ConsumerConfig} assembled out of the
     *       record-consumer configuration;</li>
     *   <li>compute the effective subscription and the set of topic names to create: when no retry config
     *       is present this is the initial subscription with an empty set, otherwise the result of
     *       {@link #maybeAddRetryTopics};</li>
     *   <li>open an {@code AdminClient}, and inside {@code use} call {@code createTopics} with one
     *       {@code TopicConfig} per topic name in that set;</li>
     *   <li>create a {@code Ref} seeded with an empty map and a {@link BlockingStateResolver} over it;</li>
     *   <li>wrap the handler via {@link #addRetriesToHandler};</li>
     *   <li>create an {@code EventLoop} over a {@code ReportingConsumer} and wrap everything in the
     *       anonymous {@code RecordConsumer} implementation {@code RecordConsumer$$anon$1}.</li>
     * </ol>
     *
     * <p>NOTE(review): this is decompiler output. Some lambdas declare a local named {@code tuple2}
     * while their own parameter is also called {@code tuple2}; that is an artifact of decompilation
     * rather than intended shadowing.
     *
     * @param recordConsumerConfig source of bootstrap servers, group, client id, offset reset, extra
     *                             properties, listener, initial subscription, retry config and event-loop config
     * @param recordHandler        the user handler; it is passed through {@code addRetriesToHandler} before
     *                             being given to the event loop
     * @param <R>                  environment type parameter of the supplied handler
     * @param <E>                  error type parameter of the supplied handler
     * @return a {@code ZManaged} that yields the constructed {@code RecordConsumer}
     * @throws MatchError from inside the composed functions when one of the intermediate tuples is {@code null}
     */
    public <R, E> ZManaged<Has<package.Clock.Service>, Throwable, RecordConsumer<Has<package.Clock.Service>>> make(RecordConsumerConfig recordConsumerConfig, RecordHandler<R, E, Chunk<Object>, Chunk<Object>> recordHandler) {
        // Step 1: subscription Ref paired with the non-blocking retry helper.
        return Ref$.MODULE$.make(recordConsumerConfig.initialSubscription()).toManaged_().map(zRef -> {
            return new Tuple2(zRef, NonBlockingRetryHelper$.MODULE$.apply(recordConsumerConfig.group(), recordConsumerConfig.retryConfig()));
        }).flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            ZRef zRef2 = (ZRef) tuple2._1();
            NonBlockingRetryHelper nonBlockingRetryHelper = (NonBlockingRetryHelper) tuple2._2();
            // Step 2: the underlying consumer, configured field-by-field from recordConsumerConfig.
            return Consumer$.MODULE$.make(new ConsumerConfig(recordConsumerConfig.bootstrapServers(), recordConsumerConfig.group(), recordConsumerConfig.clientId(), recordConsumerConfig.offsetReset(), recordConsumerConfig.extraProperties(), recordConsumerConfig.userProvidedListener())).map(consumer -> {
                // Step 3: (subscription, topics to create). Without a retry config the initial
                // subscription is used unchanged and no topics are requested.
                Tuple2 tuple2 = (Tuple2) recordConsumerConfig.retryConfig().fold(() -> {
                    return new Tuple2(recordConsumerConfig.initialSubscription(), Predef$.MODULE$.Set().empty());
                }, retryConfig -> {
                    return MODULE$.maybeAddRetryTopics(recordConsumerConfig, nonBlockingRetryHelper);
                });
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                // The Tuple3 only carries the pair through; just its first element (the pair itself) is read back.
                Tuple3 tuple3 = new Tuple3(tuple2, (ConsumerSubscription) tuple2._1(), (Set) tuple2._2());
                Tuple2 tuple22 = (Tuple2) tuple3._1();
                return new Tuple2(consumer, tuple22);
            }).flatMap(tuple2 -> {
                if (tuple2 != null) {
                    Consumer consumer2 = (Consumer) tuple2._1();
                    Tuple2 tuple2 = (Tuple2) tuple2._2();
                    if (tuple2 != null) {
                        ConsumerSubscription consumerSubscription = (ConsumerSubscription) tuple2._1();
                        Set set = (Set) tuple2._2();
                        // Step 4: create every topic named in `set` through a short-lived admin client.
                        return AdminClient$.MODULE$.make(new AdminClientConfig(recordConsumerConfig.bootstrapServers(), AdminClientConfig$.MODULE$.apply$default$2())).use(adminClient -> {
                            return adminClient.createTopics((Set) set.map(str -> {
                                // NOTE(review): the two 1s are presumably partitions and replication factor, and
                                // 86400000L presumably a 24h retention in milliseconds — confirm against TopicConfig
                                // and CleanupPolicy.Delete.
                                return new TopicConfig(str, 1, 1, new CleanupPolicy.Delete(86400000L), TopicConfig$.MODULE$.apply$default$5());
                            }, Set$.MODULE$.canBuildFrom()), adminClient.createTopics$default$2());
                        }).toManaged_().flatMap(map -> {
                            // Step 5: Ref seeded with an empty map, plus a resolver built over that Ref.
                            // The result of createTopics (`map`) is not used below.
                            return Ref$.MODULE$.make(Predef$.MODULE$.Map().empty()).toManaged_().map(zRef3 -> {
                                return new Tuple2(zRef3, BlockingStateResolver$.MODULE$.apply(zRef3));
                            }).flatMap(tuple22 -> {
                                if (tuple22 == null) {
                                    throw new MatchError(tuple22);
                                }
                                ZRef<Nothing$, Nothing$, Map<BlockingTarget, BlockingState>, Map<BlockingTarget, BlockingState>> zRef4 = (ZRef) tuple22._1();
                                BlockingStateResolver blockingStateResolver = (BlockingStateResolver) tuple22._2();
                                // Step 6: wrap the user handler (retries or error reporting, see addRetriesToHandler).
                                return MODULE$.addRetriesToHandler(recordConsumerConfig, recordHandler, zRef4, nonBlockingRetryHelper).flatMap(recordHandler2 -> {
                                    String group = recordConsumerConfig.group();
                                    ReportingConsumer reportingConsumer = new ReportingConsumer(recordConsumerConfig.clientId(), recordConsumerConfig.group(), consumer2);
                                    EventLoopConfig eventLoopConfig = recordConsumerConfig.eventLoopConfig();
                                    // Step 7: event loop over the reporting consumer, then the RecordConsumer facade.
                                    return EventLoop$.MODULE$.make(group, consumerSubscription, reportingConsumer, recordHandler2, recordConsumerConfig.clientId(), eventLoopConfig).map(eventLoop -> {
                                        return new RecordConsumer$$anon$1(eventLoop, blockingStateResolver, zRef4, recordConsumerConfig, zRef2, consumer2);
                                    });
                                });
                            });
                        });
                    }
                }
                // Reached when either the outer pair or the nested (subscription, topics) pair is null.
                throw new MatchError(tuple2);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extends the configured initial subscription so that it also covers retry topics, and
     * returns it together with the set of retry topic names.
     *
     * <ul>
     *   <li>For a {@code ConsumerSubscription.Topics} subscription: every topic is expanded via
     *       {@code nonBlockingRetryHelper.retryTopicsFor}; the result pairs a {@code Topics}
     *       subscription over the original topics plus the retry topics with the retry topics alone.</li>
     *   <li>For a {@code ConsumerSubscription.TopicPattern} subscription: the new pattern is the original
     *       pattern text, a {@code "|"}, and {@code NonBlockingRetryHelper.retryPattern(group)}; the
     *       accompanying set holds one {@code patternRetryTopic(group, step)} name for every step in
     *       {@code [0, nonBlockingRetryHelper.retrySteps())}.</li>
     * </ul>
     *
     * @param recordConsumerConfig   supplies the initial subscription and the consumer group
     * @param nonBlockingRetryHelper supplies retry topic names and the number of retry steps
     * @return a pair of (widened subscription, retry topic names)
     * @throws MatchError if the initial subscription is neither {@code Topics} nor {@code TopicPattern}
     */
    public <E, R> Tuple2<ConsumerSubscription, Set<String>> maybeAddRetryTopics(RecordConsumerConfig recordConsumerConfig, NonBlockingRetryHelper nonBlockingRetryHelper) {
        ConsumerSubscription subscription = recordConsumerConfig.initialSubscription();

        // Explicit topic list: append the retry topics of each listed topic.
        if (subscription instanceof ConsumerSubscription.Topics) {
            Set<String> originalTopics = ((ConsumerSubscription.Topics) subscription).topics();
            Set retryTopics = (Set) originalTopics.flatMap(topic -> nonBlockingRetryHelper.retryTopicsFor(topic), Set$.MODULE$.canBuildFrom());
            return new Tuple2<>(new ConsumerSubscription.Topics(originalTopics.$plus$plus(retryTopics)), retryTopics);
        }

        // Pattern subscription: OR the original regex with the group's retry regex.
        if (subscription instanceof ConsumerSubscription.TopicPattern) {
            String combinedRegex = new StringBuilder(1)
                    .append(((ConsumerSubscription.TopicPattern) subscription).p().pattern())
                    .append("|")
                    .append(NonBlockingRetryHelper$.MODULE$.retryPattern(recordConsumerConfig.group()))
                    .toString();
            ConsumerSubscription.TopicPattern widenedPattern = new ConsumerSubscription.TopicPattern(Pattern.compile(combinedRegex), ConsumerSubscription$TopicPattern$.MODULE$.apply$default$2());
            // One retry topic name per retry step index.
            Set retryTopicNames = ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), nonBlockingRetryHelper.retrySteps()).map(step -> $anonfun$maybeAddRetryTopics$2(recordConsumerConfig, BoxesRunTime.unboxToInt(step)), IndexedSeq$.MODULE$.canBuildFrom())).toSet();
            return new Tuple2<>(widenedPattern, retryTopicNames);
        }

        throw new MatchError(subscription);
    }

    /**
     * Wraps the user-supplied handler according to whether a retry configuration is present.
     *
     * <ul>
     *   <li>Retry config present: a producer is created with {@code Producer.makeR} (bootstrap servers from
     *       the config, a {@code ProducerRetryPolicy} of {@code Integer.MAX_VALUE} and a 3-second duration),
     *       wrapped by {@code ReportingProducer}, and the handler is wrapped with
     *       {@code RetryRecordHandler.withRetries}.</li>
     *   <li>No retry config: the handler gets an error handler that reports a
     *       {@code RecordConsumerMetric.UncaughtHandlerError} carrying the error, the record's topic,
     *       partition and offset, and the configured group and client id.</li>
     * </ul>
     *
     * @param recordConsumerConfig   source of the retry config, bootstrap servers, initial subscription,
     *                               group and client id
     * @param recordHandler          the handler to wrap
     * @param zRef                   reference to the blocking-state map, handed to {@code withRetries}
     * @param nonBlockingRetryHelper retry helper, handed to {@code withRetries}
     * @return a {@code ZManaged} yielding the wrapped handler
     * @throws MatchError if the retry config is neither a {@code Some} nor {@code None}
     */
    private <R, E> ZManaged<Has<package.Blocking.Service>, Throwable, RecordHandler<Has<GreyhoundMetrics.Service>, Nothing$, Chunk<Object>, Chunk<Object>>> addRetriesToHandler(RecordConsumerConfig recordConsumerConfig, RecordHandler<R, E, Chunk<Object>, Chunk<Object>> recordHandler, ZRef<Nothing$, Nothing$, Map<BlockingTarget, BlockingState>, Map<BlockingTarget, BlockingState>> zRef, NonBlockingRetryHelper nonBlockingRetryHelper) {
        Some configuredRetry = recordConsumerConfig.retryConfig();

        // Retries configured: route failures through a dedicated reporting producer.
        if (configuredRetry instanceof Some) {
            RetryConfig retrySettings = (RetryConfig) configuredRetry.value();
            ProducerConfig retryProducerConfig = new ProducerConfig(recordConsumerConfig.bootstrapServers(), new ProducerRetryPolicy(Integer.MAX_VALUE, zio.duration.package$.MODULE$.durationInt(3).seconds()), ProducerConfig$.MODULE$.apply$default$3());
            return Producer$.MODULE$.makeR(retryProducerConfig)
                    .map(rawProducer -> ReportingProducer$.MODULE$.apply(rawProducer))
                    .map(producer -> RetryRecordHandler$.MODULE$.withRetries(recordHandler, retrySettings, producer, recordConsumerConfig.initialSubscription(), zRef, nonBlockingRetryHelper, Predef$.MODULE$.$conforms(), Predef$.MODULE$.$conforms()));
        }

        // No retries: only report handler errors as metrics.
        if (None$.MODULE$.equals(configuredRetry)) {
            return ZManaged$.MODULE$.succeed(() -> recordHandler.withErrorHandler((error, record) -> GreyhoundMetrics$.MODULE$.report(new RecordConsumerMetric.UncaughtHandlerError(error, record.topic(), record.partition(), record.offset(), recordConsumerConfig.group(), recordConsumerConfig.clientId()))));
        }

        throw new MatchError(configuredRetry);
    }

    /**
     * Synthetic lambda body used by {@code maybeAddRetryTopics}: resolves the pattern retry topic
     * name for the configured consumer group at retry step {@code i}.
     *
     * @param recordConsumerConfig supplies the consumer group
     * @param i                    the retry step index
     * @return the value of {@code NonBlockingRetryHelper.patternRetryTopic(group, i)}
     */
    public static final /* synthetic */ String $anonfun$maybeAddRetryTopics$2(RecordConsumerConfig recordConsumerConfig, int i) {
        NonBlockingRetryHelper$ retryHelperCompanion = NonBlockingRetryHelper$.MODULE$;
        return retryHelperCompanion.patternRetryTopic(recordConsumerConfig.group(), i);
    }

    /**
     * Private constructor; it is invoked from this class's static initializer.
     * Its only effect is to publish the new instance through {@code MODULE$}.
     */
    private RecordConsumer$() {
        MODULE$ = this;
    }
}
