package tamer.kafka;

import eu.timepit.refined.api.RefType$;
import eu.timepit.refined.api.Refined;
import eu.timepit.refined.auto$;
import java.time.Duration;
import log.effect.LogWriter;
import log.effect.LogWriter$;
import log.effect.LogWriterOps$;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.$less$colon$less$;
import scala.Function2;
import scala.MatchError;
import scala.PartialFunction;
import scala.Tuple2;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import tamer.SourceConfiguration;
import tamer.TamerError;
import tamer.config.Config;
import tamer.kafka.Kafka;
import zio.CanFail$;
import zio.Chunk;
import zio.Has;
import zio.Schedule;
import zio.Schedule$;
import zio.ZIO;
import zio.ZLayer;
import zio.ZQueue;
import zio.blocking.package;
import zio.kafka.producer.package;
import zio.stream.ZStream;

/* compiled from: Kafka.scala */
/* loaded from: input_file:tamer/kafka/Kafka$.class */
/**
 * Decompiled singleton holder ({@link #MODULE$}) for the Scala object {@code tamer.kafka.Kafka}.
 *
 * <p>It exposes:
 * <ul>
 *   <li>a shared retry {@link Schedule} ({@code tenTimes}),</li>
 *   <li>a {@code PartialFunction<Throwable, TamerError>} ({@code tamerErrors}),</li>
 *   <li>{@link #sink}, which drains a stream of key/value pairs into a Kafka producer,</li>
 *   <li>{@link #live}, which delegates layer construction to {@code Kafka$Live$}.</li>
 * </ul>
 *
 * <p>The mangled member names ({@code tamer$kafka$Kafka$$...}) come from the Scala compiler;
 * they are part of the binary interface and must not be renamed.
 */
public final class Kafka$ {
    /** The single instance of this class; the constructor is private. */
    public static final Kafka$ MODULE$ = new Kafka$();

    /**
     * Retry schedule used by {@link #sink}: {@code recurs(10)} combined via {@code &&} with an
     * exponential schedule whose base delay is 100 milliseconds. The exponential factor is the
     * library default ({@code exponential$default$2()}).
     */
    private static final Schedule<Object, Object, Tuple2<Object, Duration>> tamer$kafka$Kafka$$tenTimes = Schedule$.MODULE$.recurs(10).$amp$amp(Schedule$.MODULE$.exponential(zio.duration.package$.MODULE$.durationInt(100).milliseconds(), Schedule$.MODULE$.exponential$default$2()));

    /**
     * Partial function from {@link Throwable} to {@link TamerError}. Its cases live in
     * {@code Kafka$$anonfun$1}, which is not visible in this file.
     * NOTE(review): presumably used to translate low-level failures into domain errors; confirm
     * against the original Kafka.scala.
     */
    public static final PartialFunction<Throwable, TamerError> tamer$kafka$Kafka$$tamerErrors = new Kafka$$anonfun$1();

    /**
     * Accessor for the retry schedule held in the private static field of the same name.
     *
     * @return the shared {@code tenTimes} schedule instance
     */
    public Schedule<Object, Object, Tuple2<Object, Duration>> tamer$kafka$Kafka$$tenTimes() {
        return tamer$kafka$Kafka$$tenTimes;
    }

    /**
     * Builds an effect that drains {@code zStream} into the given producer.
     *
     * <p>Each {@code (key, value)} tuple becomes a {@link ProducerRecord} addressed to the topic
     * {@code str}. Every chunk of records is handed to {@code service.produceChunkAsync}, which
     * is retried with the {@code tenTimes} schedule. An info line is logged after a chunk has
     * been pushed, and a warning is logged if the drained stream ends in an error.
     *
     * @param zStream   stream of key/value tuples to publish
     * @param service   producer service used to publish each chunk
     * @param str       topic name; wrapped in {@link Refined} and unwrapped again where a plain
     *                  {@code String} is needed
     * @param logWriter logger used for the info and warn messages
     * @return an effect that runs the stream to completion, discarding its output
     * @throws MatchError raised from inside the mapping function if a stream element is
     *                    {@code null}
     */
    public final <R, K, V> ZIO<Has<package.Blocking.Service>, Throwable, BoxedUnit> sink(ZStream<Object, Nothing$, Tuple2<K, V>> zStream, package.Producer.Service<R, K, V> service, String str, LogWriter<ZIO> logWriter) {
        return zStream.map(tuple2 -> {
            // Residue of a Scala tuple pattern match: a null element matches no case.
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            // Topic = str, key = first tuple element, value = second tuple element.
            return new ProducerRecord((String) auto$.MODULE$.autoUnwrap(new Refined(str), RefType$.MODULE$.refinedRefType()), tuple2._1(), tuple2._2());
        }).mapChunksM(chunk -> {
            // tapError is attached before retry, so presumably this message is logged for every
            // failed attempt rather than only the last one; confirm against ZIO semantics.
            return service.produceChunkAsync(chunk).tapError(th -> {
                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return new StringBuilder(33).append("Still cannot produce next chunk, ").append(chunk).toString();
                });
            // NOTE(review): flatten suggests produceChunkAsync yields a nested effect (enqueue,
            // then await acknowledgement); only the outer effect is inside retry. Confirm.
            }, CanFail$.MODULE$.canFail()).retry(MODULE$.tamer$kafka$Kafka$$tenTimes(), CanFail$.MODULE$.canFail()).flatten($less$colon$less$.MODULE$.refl()).$less$times(() -> {
                // `<*` sequences the log effect after the produce effect and keeps the produce
                // effect's result.
                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return new StringBuilder(20).append("pushed ").append(chunk.size()).append(" messages to ").append(new Refined(str)).toString();
                });
            });
        }).runDrain().onError(cause -> {
            // The warn effect is passed through orDie before being used as the onError handler.
            return ((ZIO) LogWriterOps$.MODULE$.warn$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                return new StringBuilder(33).append("Could not push data to topic '").append(new Refined(str)).append("': ").append(cause.prettyPrint()).toString();
            })).orDie($less$colon$less$.MODULE$.refl(), CanFail$.MODULE$.canFail());
        });
    }

    /**
     * Creates the {@code Kafka.Service} layer by delegating to {@code Kafka$Live$.getLayer}.
     *
     * @param sourceConfiguration source configuration forwarded unchanged to {@code getLayer}
     * @param function2           function taking a state value and a queue of key/value chunks
     *                            and returning an effect that yields a state value; forwarded
     *                            unchanged to {@code getLayer}
     * @return a layer that requires {@code Config.Kafka}, may fail with {@link TamerError}, and
     *         provides {@code Kafka.Service}
     */
    public <R, K, V, S> ZLayer<Has<Config.Kafka>, TamerError, Has<Kafka.Service>> live(SourceConfiguration<K, V, S> sourceConfiguration, Function2<S, ZQueue<Object, Object, Nothing$, Nothing$, Chunk<Tuple2<K, V>>, Chunk<Tuple2<K, V>>>, ZIO<R, TamerError, S>> function2) {
        return Kafka$Live$.MODULE$.getLayer(sourceConfiguration, function2);
    }

    /** Private so that {@link #MODULE$} is the only instance. */
    private Kafka$() {
    }
}
