package com.wixpress.dst.greyhound.core.consumer;

import com.wixpress.dst.greyhound.core.CleanupPolicy;
import com.wixpress.dst.greyhound.core.TopicConfig;
import com.wixpress.dst.greyhound.core.TopicConfig$;
import com.wixpress.dst.greyhound.core.TopicPartition;
import com.wixpress.dst.greyhound.core.admin.AdminClient$;
import com.wixpress.dst.greyhound.core.admin.AdminClientConfig;
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric;
import com.wixpress.dst.greyhound.core.consumer.RecordConsumerMetric;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription;
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription$TopicPattern$;
import com.wixpress.dst.greyhound.core.consumer.domain.RecordHandler;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingStateResolver;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingStateResolver$;
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingTarget;
import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper;
import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper$;
import com.wixpress.dst.greyhound.core.consumer.retry.RetryConfig;
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandler$;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics;
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics$;
import com.wixpress.dst.greyhound.core.producer.Producer$;
import com.wixpress.dst.greyhound.core.producer.ProducerConfig;
import com.wixpress.dst.greyhound.core.producer.ProducerConfig$;
import com.wixpress.dst.greyhound.core.producer.ProducerRetryPolicy;
import com.wixpress.dst.greyhound.core.producer.ReportingProducer;
import com.wixpress.dst.greyhound.core.zioutils.AwaitShutdown;
import com.wixpress.dst.greyhound.core.zioutils.AwaitShutdown$;
import java.util.regex.Pattern;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import zio.Chunk;
import zio.DurationSyntax$;
import zio.IsSubtypeOfError$;
import zio.Ref;
import zio.Ref$;
import zio.Scope;
import zio.ZIO;
import zio.ZIO$;
import zio.package;

/* compiled from: RecordConsumer.scala */
/* loaded from: input_file:com/wixpress/dst/greyhound/core/consumer/RecordConsumer$.class */
public final class RecordConsumer$ {
    public static RecordConsumer$ MODULE$;

    static {
        new RecordConsumer$();
    }

    public <R, E> ZIO<R, Throwable, RecordConsumer<R>> make(RecordConsumerConfig recordConsumerConfig, RecordHandler<R, E, Chunk<Object>, Chunk<Object>> recordHandler, Object obj, package.Tag<GreyhoundMetrics.Service> tag) {
        return ZIO$.MODULE$.acquireRelease(() -> {
            return AwaitShutdown$.MODULE$.make(obj).flatMap(shutdownPromise -> {
                return GreyhoundMetrics$.MODULE$.report(new ConsumerMetric.CreatingConsumer(recordConsumerConfig.clientId(), recordConsumerConfig.group(), recordConsumerConfig.bootstrapServers(), recordConsumerConfig.consumerAttributes())).flatMap(boxedUnit -> {
                    return MODULE$.validateRetryPolicy(recordConsumerConfig).flatMap(obj2 -> {
                        return Ref$.MODULE$.make(() -> {
                            return recordConsumerConfig.initialSubscription();
                        }, obj).map(ref -> {
                            return new Tuple2(ref, NonBlockingRetryHelper$.MODULE$.apply(recordConsumerConfig.group(), recordConsumerConfig.retryConfig()));
                        }, obj).flatMap(tuple2 -> {
                            if (tuple2 == null) {
                                throw new MatchError(tuple2);
                            }
                            Ref ref2 = (Ref) tuple2._1();
                            NonBlockingRetryHelper nonBlockingRetryHelper = (NonBlockingRetryHelper) tuple2._2();
                            return Consumer$.MODULE$.make(MODULE$.consumerConfig(recordConsumerConfig), obj).map(consumer -> {
                                Tuple2 tuple2 = (Tuple2) recordConsumerConfig.retryConfig().fold(() -> {
                                    return new Tuple2(recordConsumerConfig.initialSubscription(), Predef$.MODULE$.Set().empty());
                                }, retryConfig -> {
                                    return MODULE$.maybeAddRetryTopics(retryConfig, recordConsumerConfig, nonBlockingRetryHelper, obj);
                                });
                                if (tuple2 == null) {
                                    throw new MatchError(tuple2);
                                }
                                Tuple3 tuple3 = new Tuple3(tuple2, (ConsumerSubscription) tuple2._1(), (Set) tuple2._2());
                                Tuple2 tuple22 = (Tuple2) tuple3._1();
                                return new Tuple2(consumer, tuple22);
                            }, obj).flatMap(tuple2 -> {
                                if (tuple2 != null) {
                                    Consumer consumer2 = (Consumer) tuple2._1();
                                    Tuple2 tuple2 = (Tuple2) tuple2._2();
                                    if (tuple2 != null) {
                                        ConsumerSubscription consumerSubscription = (ConsumerSubscription) tuple2._1();
                                        Set set = (Set) tuple2._2();
                                        return AdminClient$.MODULE$.make(new AdminClientConfig(recordConsumerConfig.bootstrapServers(), recordConsumerConfig.kafkaAuthProperties()), recordConsumerConfig.consumerAttributes()).tap(adminClient -> {
                                            return adminClient.createTopics((Set) set.map(str -> {
                                                return new TopicConfig(str, 1, 1, new CleanupPolicy.Delete(86400000L), TopicConfig$.MODULE$.apply$default$5());
                                            }, Set$.MODULE$.canBuildFrom()), adminClient.createTopics$default$2(), obj);
                                        }, obj).flatMap(adminClient2 -> {
                                            return Ref$.MODULE$.make(() -> {
                                                return Predef$.MODULE$.Map().empty();
                                            }, obj).map(ref3 -> {
                                                return new Tuple2(ref3, BlockingStateResolver$.MODULE$.apply(ref3));
                                            }, obj).flatMap(tuple22 -> {
                                                if (tuple22 == null) {
                                                    throw new MatchError(tuple22);
                                                }
                                                Ref ref4 = (Ref) tuple22._1();
                                                BlockingStateResolver blockingStateResolver = (BlockingStateResolver) tuple22._2();
                                                return Ref$.MODULE$.make(() -> {
                                                    return Predef$.MODULE$.Map().empty();
                                                }, obj).map(ref5 -> {
                                                    return new Tuple2(ref5, MODULE$.combineAwaitShutdowns(shutdownPromise, ref5, obj));
                                                }, obj).flatMap(tuple22 -> {
                                                    if (tuple22 == null) {
                                                        throw new MatchError(tuple22);
                                                    }
                                                    Ref ref6 = (Ref) tuple22._1();
                                                    return MODULE$.addRetriesToHandler(recordConsumerConfig, recordHandler, ref4, nonBlockingRetryHelper, (Function1) tuple22._2(), obj).flatMap(recordHandler2 -> {
                                                        String group = recordConsumerConfig.group();
                                                        ReportingConsumer reportingConsumer = new ReportingConsumer(recordConsumerConfig.clientId(), recordConsumerConfig.group(), consumer2);
                                                        EventLoopConfig eventLoopConfig = recordConsumerConfig.eventLoopConfig();
                                                        return EventLoop$.MODULE$.make(group, consumerSubscription, reportingConsumer, recordHandler2, recordConsumerConfig.clientId(), eventLoopConfig, recordConsumerConfig.consumerAttributes(), ref6, obj).map(eventLoop -> {
                                                            return new RecordConsumer$$anon$1(shutdownPromise, eventLoop, obj, consumer2, blockingStateResolver, ref4, recordConsumerConfig, ref2);
                                                        }, obj);
                                                    }, obj);
                                                }, obj);
                                            }, obj);
                                        }, obj);
                                    }
                                }
                                throw new MatchError(tuple2);
                            }, obj);
                        }, obj);
                    }, obj);
                }, obj);
            }, obj);
        }, recordConsumer -> {
            return recordConsumer.shutdown().catchAllCause(cause -> {
                return ZIO$.MODULE$.succeed(() -> {
                    cause.squashTrace(IsSubtypeOfError$.MODULE$.impl(Predef$.MODULE$.$conforms())).printStackTrace();
                }, obj);
            }, obj);
        }, obj);
    }

    private Function1<TopicPartition, ZIO<Object, Nothing$, AwaitShutdown>> combineAwaitShutdowns(AwaitShutdown.ShutdownPromise shutdownPromise, Ref<Map<TopicPartition, AwaitShutdown.ShutdownPromise>> ref, Object obj) {
        return topicPartition -> {
            return ref.get(obj).map(map -> {
                return (AwaitShutdown) map.get(topicPartition).fold(() -> {
                    return shutdownPromise.awaitShutdown();
                }, shutdownPromise2 -> {
                    return shutdownPromise2.awaitShutdown().or(shutdownPromise.awaitShutdown());
                });
            }, obj);
        };
    }

    public <E, R> ConsumerConfig consumerConfig(RecordConsumerConfig recordConsumerConfig) {
        return new ConsumerConfig(recordConsumerConfig.bootstrapServers(), recordConsumerConfig.group(), recordConsumerConfig.clientId(), recordConsumerConfig.offsetReset(), recordConsumerConfig.extraProperties(), recordConsumerConfig.userProvidedListener(), recordConsumerConfig.initialOffsetsSeek(), recordConsumerConfig.consumerAttributes(), recordConsumerConfig.decryptor(), recordConsumerConfig.commitMetadataString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <E, R> Tuple2<ConsumerSubscription, Set<String>> maybeAddRetryTopics(RetryConfig retryConfig, RecordConsumerConfig recordConsumerConfig, NonBlockingRetryHelper nonBlockingRetryHelper, Object obj) {
        Tuple2<ConsumerSubscription, Set<String>> tuple2;
        ConsumerSubscription initialSubscription = recordConsumerConfig.initialSubscription();
        if (initialSubscription instanceof ConsumerSubscription.Topics) {
            Set<String> set = ((ConsumerSubscription.Topics) initialSubscription).topics();
            Set set2 = (Set) set.flatMap(str -> {
                return nonBlockingRetryHelper.retryTopicsFor(str);
            }, Set$.MODULE$.canBuildFrom());
            tuple2 = new Tuple2<>(new ConsumerSubscription.Topics(set.$plus$plus(set2)), set2);
        } else {
            if (!(initialSubscription instanceof ConsumerSubscription.TopicPattern)) {
                throw new MatchError(initialSubscription);
            }
            tuple2 = new Tuple2<>(new ConsumerSubscription.TopicPattern(Pattern.compile(new StringBuilder(1).append(((ConsumerSubscription.TopicPattern) initialSubscription).p().pattern()).append("|").append(NonBlockingRetryHelper$.MODULE$.retryPattern(recordConsumerConfig.group())).toString()), ConsumerSubscription$TopicPattern$.MODULE$.apply$default$2()), ((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), retryConfig.nonBlockingBackoffs("").length()).map(obj2 -> {
                return $anonfun$maybeAddRetryTopics$2(recordConsumerConfig, BoxesRunTime.unboxToInt(obj2));
            }, IndexedSeq$.MODULE$.canBuildFrom())).toSet());
        }
        return tuple2;
    }

    private <R, E> ZIO<Scope, Throwable, RecordHandler<GreyhoundMetrics.Service, Nothing$, Chunk<Object>, Chunk<Object>>> addRetriesToHandler(RecordConsumerConfig recordConsumerConfig, RecordHandler<R, E, Chunk<Object>, Chunk<Object>> recordHandler, Ref<Map<BlockingTarget, BlockingState>> ref, NonBlockingRetryHelper nonBlockingRetryHelper, Function1<TopicPartition, ZIO<Object, Nothing$, AwaitShutdown>> function1, Object obj) {
        ZIO<Scope, Throwable, RecordHandler<GreyhoundMetrics.Service, Nothing$, Chunk<Object>, Chunk<Object>>> succeed;
        Some retryConfig = recordConsumerConfig.retryConfig();
        if (retryConfig instanceof Some) {
            RetryConfig retryConfig2 = (RetryConfig) retryConfig.value();
            succeed = Producer$.MODULE$.makeR(new ProducerConfig(recordConsumerConfig.bootstrapServers(), new ProducerRetryPolicy(Integer.MAX_VALUE, DurationSyntax$.MODULE$.seconds$extension(zio.package$.MODULE$.durationInt(3))), recordConsumerConfig.kafkaAuthProperties().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("max.request.size"), "8000000")), ProducerConfig$.MODULE$.apply$default$4()), Producer$.MODULE$.makeR$default$2(), obj).map(producerR -> {
                return new ReportingProducer(producerR, recordConsumerConfig.retryProducerAttributes());
            }, obj).map(reportingProducer -> {
                return RetryRecordHandler$.MODULE$.withRetries(recordConsumerConfig.group(), recordHandler, retryConfig2, reportingProducer, recordConsumerConfig.initialSubscription(), ref, nonBlockingRetryHelper, function1, Predef$.MODULE$.$conforms(), Predef$.MODULE$.$conforms());
            }, obj);
        } else {
            if (!None$.MODULE$.equals(retryConfig)) {
                throw new MatchError(retryConfig);
            }
            succeed = ZIO$.MODULE$.succeed(() -> {
                return recordHandler.withErrorHandler((obj2, consumerRecord) -> {
                    return GreyhoundMetrics$.MODULE$.report(new RecordConsumerMetric.UncaughtHandlerError(obj2, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), recordConsumerConfig.group(), recordConsumerConfig.clientId()));
                });
            }, obj);
        }
        return succeed;
    }

    private ZIO<Object, ConsumerConfigFailedValidation$InvalidRetryConfigForPatternSubscription$, Object> validateRetryPolicy(RecordConsumerConfig recordConsumerConfig) {
        ZIO<Object, ConsumerConfigFailedValidation$InvalidRetryConfigForPatternSubscription$, Object> when;
        ConsumerSubscription initialSubscription = recordConsumerConfig.initialSubscription();
        if (initialSubscription instanceof ConsumerSubscription.TopicPattern) {
            when = ZIO$.MODULE$.unit();
        } else {
            if (!(initialSubscription instanceof ConsumerSubscription.Topics)) {
                throw new MatchError(initialSubscription);
            }
            when = ZIO$.MODULE$.when(() -> {
                return recordConsumerConfig.retryConfig().exists(retryConfig -> {
                    return BoxesRunTime.boxToBoolean($anonfun$validateRetryPolicy$2(retryConfig));
                });
            }, () -> {
                return ZIO$.MODULE$.fail(() -> {
                    return ConsumerConfigFailedValidation$InvalidRetryConfigForPatternSubscription$.MODULE$;
                }, "com.wixpress.dst.greyhound.core.consumer.RecordConsumer.validateRetryPolicy(RecordConsumer.scala:274)");
            }, "com.wixpress.dst.greyhound.core.consumer.RecordConsumer.validateRetryPolicy(RecordConsumer.scala:274)");
        }
        return when;
    }

    public static final /* synthetic */ String $anonfun$maybeAddRetryTopics$2(RecordConsumerConfig recordConsumerConfig, int i) {
        return NonBlockingRetryHelper$.MODULE$.patternRetryTopic(recordConsumerConfig.group(), i);
    }

    public static final /* synthetic */ boolean $anonfun$validateRetryPolicy$2(RetryConfig retryConfig) {
        return retryConfig.forPatternSubscription().exists(retryConfigForTopic -> {
            return BoxesRunTime.boxToBoolean(retryConfigForTopic.nonEmpty());
        });
    }

    private RecordConsumer$() {
        MODULE$ = this;
    }
}
