package fs2.kafka;

import cats.Apply;
import cats.effect.kernel.Async;
import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Resource;
import cats.syntax.package$all$;
import fs2.Chunk$;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import fs2.kafka.KafkaProducer;
import fs2.kafka.internal.Blocking;
import fs2.kafka.internal.WithProducer;
import fs2.kafka.producer.MkProducer;
import org.apache.kafka.clients.producer.Producer;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.concurrent.Promise$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaProducer.scala */
/* loaded from: input_file:fs2/kafka/KafkaProducer$.class */
/**
 * Singleton holder for the factory and helper methods associated with {@link KafkaProducer}.
 *
 * <p>This is a Java rendering of a Scala object; the single instance is exposed through
 * {@link #MODULE$}. Effect values are represented by the erased type parameter {@code F}.
 */
public final class KafkaProducer$ {
    /** The only instance of this class. Final so that it cannot be reassigned by other code. */
    public static final KafkaProducer$ MODULE$ = new KafkaProducer$();

    /**
     * Returns the given producer unchanged.
     *
     * @param kafkaProducer the producer to return
     * @return {@code kafkaProducer} itself
     */
    public <F, K, V> KafkaProducer<F, K, V> ProducerOps(KafkaProducer<F, K, V> kafkaProducer) {
        return kafkaProducer;
    }

    /**
     * Builds a resource that yields a producer for the given settings.
     *
     * <p>The resource is obtained from {@code KafkaProducerConnection$.resource} and each
     * connection is then converted with {@code withSerializersFrom(producerSettings)}.
     *
     * @param producerSettings settings passed to the connection resource and used for the serializers
     * @param async            effect type class instance forwarded to the connection resource
     * @param mkProducer       producer factory forwarded to the connection resource
     * @return the resulting resource
     */
    public <F, K, V> Resource<F, KafkaProducer.PartitionsFor<F, K, V>> resource(ProducerSettings<F, K, V> producerSettings, Async<F> async, MkProducer<F> mkProducer) {
        return KafkaProducerConnection$.MODULE$.resource(producerSettings, async, mkProducer)
                .evalMap(connection -> connection.withSerializersFrom(producerSettings));
    }

    /**
     * Wraps a connection and a pair of serializers in a producer that delegates every call
     * to the connection.
     *
     * @param kafkaProducerConnection connection that performs the actual work
     * @param serializer              key serializer handed to {@code connection.produce}
     * @param serializer2             value serializer handed to {@code connection.produce}
     * @return a producer backed by {@code kafkaProducerConnection}
     */
    public <F, K, V> KafkaProducer.PartitionsFor<F, K, V> from(final KafkaProducerConnection<F> kafkaProducerConnection, final Serializer<F, K> serializer, final Serializer<F, V> serializer2) {
        // NOTE(review): the constructor arguments below are kept exactly as the decompiler
        // rendered them for the synthetic anonymous class; confirm against PartitionsFor's
        // real constructor.
        return new KafkaProducer.PartitionsFor<F, K, V>(kafkaProducerConnection, serializer, serializer2) { // from class: fs2.kafka.KafkaProducer$$anon$1
            private final KafkaProducerConnection connection;
            private final Serializer keySerializer;
            private final Serializer valueSerializer;

            {
                this.connection = kafkaProducerConnection;
                this.keySerializer = serializer;
                this.valueSerializer = serializer2;
            }

            /** Delegates to the connection, supplying the captured key and value serializers. */
            @Override // fs2.kafka.KafkaProducer
            public <P> F produce(ProducerRecords<P, K, V> producerRecords) {
                return (F) this.connection.produce(producerRecords, this.keySerializer, this.valueSerializer);
            }

            /** Delegates to the connection's {@code metrics()}. */
            @Override // fs2.kafka.KafkaProducer.Metrics
            public F metrics() {
                return (F) this.connection.metrics();
            }

            /** Delegates to the connection's {@code partitionsFor(str)}. */
            @Override // fs2.kafka.KafkaProducer.PartitionsFor
            public F partitionsFor(String str) {
                return (F) this.connection.partitionsFor(str);
            }

            /** Returns {@code "KafkaProducer$"} followed by this object's identity hash code. */
            public String toString() {
                return "KafkaProducer$" + System.identityHashCode(this);
            }
        };
    }

    /**
     * Same as {@link #resource}, but lifted into a stream via {@code Stream$.resource}.
     *
     * @param producerSettings settings forwarded to {@link #resource}
     * @param async            effect type class instance
     * @param mkProducer       producer factory forwarded to {@link #resource}
     * @return a stream built from the producer resource
     */
    public <F, K, V> Stream<F, KafkaProducer.PartitionsFor<F, K, V>> stream(ProducerSettings<F, K, V> producerSettings, Async<F> async, MkProducer<F> mkProducer) {
        return Stream$.MODULE$.resource(resource(producerSettings, async, mkProducer), async);
    }

    /**
     * Produces every record in {@code producerRecords} through {@code withProducer}.
     *
     * <p>Each record is passed through {@link #produceRecord} using {@code traverse}; the
     * resulting chunk of inner effects is then sequenced and combined with
     * {@code producerRecords.passthrough()} into a {@code ProducerResult}.
     *
     * @param withProducer    supplies the underlying producer and the blocking context
     * @param serializer      key serializer
     * @param serializer2     value serializer
     * @param producerRecords records to produce, together with a passthrough value
     * @param async           effect type class instance
     * @return the value returned by {@code withProducer.apply}
     */
    public <F, P, K, V> F produce(WithProducer<F> withProducer, Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecords<P, K, V> producerRecords, Async<F> async) {
        return withProducer.apply((producer, blocking) -> {
            // Distinct names for the two nested lambda parameters: Java does not allow an
            // inner lambda parameter to redeclare one from an enclosing lambda.
            return package$all$.MODULE$.toFunctorOps(producerRecords.records().traverse(MODULE$.produceRecord(serializer, serializer2, producer, blocking, async), async), async).map(pending -> {
                return package$all$.MODULE$.toFunctorOps(package$all$.MODULE$.toTraverseOps(pending, Chunk$.MODULE$.instance()).sequence(Predef$.MODULE$.$conforms(), async), async).map(completed -> {
                    return ProducerResult$.MODULE$.apply(completed, producerRecords.passthrough());
                });
            });
        });
    }

    /**
     * Returns a function that sends a single record with the given producer.
     *
     * <p>For each record the function serializes it with {@code asJavaRecord}, creates a Scala
     * promise, and calls {@code producer.send} inside {@code blocking.apply}. The send callback
     * completes the promise with a tuple of the original record and the record metadata when no
     * exception is reported, and fails the promise with the exception otherwise. The value
     * produced after the send call is an effect built with {@code async.fromFuture} over the
     * promise's future.
     *
     * @param serializer  key serializer
     * @param serializer2 value serializer
     * @param producer    producer of serialized key/value bytes
     * @param blocking    context in which {@code producer.send} is invoked
     * @param async       effect type class instance
     * @return the per-record send function
     */
    public <F, K, V> Function1<ProducerRecord<K, V>, F> produceRecord(Serializer<F, K> serializer, Serializer<F, V> serializer2, Producer<byte[], byte[]> producer, Blocking<F> blocking, Async<F> async) {
        return sourceRecord -> {
            // sourceRecord is the caller's record; javaRecord is its serialized form. They must
            // carry different names so that the serialized record is sent while the original
            // record is the one reported back alongside the metadata.
            return package$all$.MODULE$.toFlatMapOps(MODULE$.asJavaRecord(serializer, serializer2, sourceRecord, async), async).flatMap(javaRecord -> {
                return package$all$.MODULE$.toFlatMapOps(async.delay(() -> {
                    return Promise$.MODULE$.apply();
                }), async).flatMap(promise -> {
                    return package$all$.MODULE$.toFunctorOps(blocking.apply(() -> {
                        return producer.send(javaRecord, (recordMetadata, exc) -> {
                            if (exc == null) {
                                promise.success(new Tuple2(sourceRecord, recordMetadata));
                            } else {
                                promise.failure(exc);
                            }
                        });
                    }), async).as(async.fromFuture(async.delay(() -> {
                        return promise.future();
                    })));
                });
            });
        };
    }

    /**
     * Returns a pipe that creates a producer from the settings and produces every element of
     * the input stream with it.
     *
     * <p>The producer comes from {@link #stream}; the input is then run through the
     * producer-based {@code pipe} overload.
     *
     * @param producerSettings settings used to create the producer and by the delegated pipe
     * @param async            effect type class instance
     * @param mkProducer       producer factory
     * @return a function from a stream of records to a stream of results
     */
    public <F, K, V, P> Function1<Stream<F, ProducerRecords<P, K, V>>, Stream<F, ProducerResult<P, K, V>>> pipe(ProducerSettings<F, K, V> producerSettings, Async<F> async, MkProducer<F> mkProducer) {
        return records -> {
            // NOTE(review): `default` is a reserved word in Java, so the call below is the
            // decompiler's rendering of a Scala method with that name and is kept verbatim.
            return MODULE$.stream(producerSettings, async, mkProducer).flatMap(partitionsFor -> {
                return (Stream) MODULE$.pipe(producerSettings, (KafkaProducer) partitionsFor, (GenConcurrent) async).apply(records);
            }, NotGiven$.MODULE$.default());
        };
    }

    /**
     * Returns a pipe that produces every element of the input stream with an existing producer.
     *
     * <p>Each element is passed to {@code kafkaProducer.produce} via {@code evalMap}; the
     * results are then run through {@code mapAsync} with {@code producerSettings.parallelism()}
     * and the identity function.
     *
     * @param producerSettings supplies the parallelism passed to {@code mapAsync}
     * @param kafkaProducer    producer used for every element
     * @param genConcurrent    concurrency type class instance passed to {@code mapAsync}
     * @return a function from a stream of records to a stream of results
     */
    public <F, K, V, P> Function1<Stream<F, ProducerRecords<P, K, V>>, Stream<F, ProducerResult<P, K, V>>> pipe(ProducerSettings<F, K, V> producerSettings, KafkaProducer<F, K, V> kafkaProducer, GenConcurrent<F, Throwable> genConcurrent) {
        return records -> {
            return records.evalMap(batch -> {
                return kafkaProducer.produce(batch);
            }).mapAsync(producerSettings.parallelism(), pending -> {
                return Predef$.MODULE$.identity(pending);
            }, genConcurrent);
        };
    }

    /**
     * Serializes the key and the value of a record and pairs the two results.
     *
     * <p>Both serializers receive the record's topic and headers; the key result is combined
     * with the value result using {@code product}.
     */
    private <F, K, V> F serializeToBytes(Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecord<K, V> producerRecord, Apply<F> apply) {
        F serializedKey = serializer.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.key());
        return (F) package$all$.MODULE$.catsSyntaxSemigroupal(serializedKey, apply).product(serializer2.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.value()));
    }

    /**
     * Converts a record into an {@code org.apache.kafka.clients.producer.ProducerRecord}
     * holding the serialized key and value.
     *
     * <p>The partition and timestamp are passed as {@code null} when the corresponding option
     * on the source record is empty.
     */
    private <F, K, V> F asJavaRecord(Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecord<K, V> producerRecord, Apply<F> apply) {
        return (F) package$all$.MODULE$.toFunctorOps(serializeToBytes(serializer, serializer2, producerRecord, apply), apply).map(keyAndValue -> {
            if (keyAndValue == null) {
                throw new MatchError(keyAndValue);
            }
            return new org.apache.kafka.clients.producer.ProducerRecord(producerRecord.topic(), (Integer) producerRecord.partition().fold(() -> {
                return null;
            }, partition -> {
                return $anonfun$asJavaRecord$3(BoxesRunTime.unboxToInt(partition));
            }), (Long) producerRecord.timestamp().fold(() -> {
                return null;
            }, timestamp -> {
                return $anonfun$asJavaRecord$5(BoxesRunTime.unboxToLong(timestamp));
            }), (byte[]) keyAndValue._1(), (byte[]) keyAndValue._2(), producerRecord.headers().asJava());
        });
    }

    /**
     * Returns the value of
     * {@code KafkaProducer$ProducerPartiallyApplied$.MODULE$.$lessinit$greater$default$1()}.
     */
    public <F> boolean apply() {
        return KafkaProducer$ProducerPartiallyApplied$.MODULE$.$lessinit$greater$default$1();
    }

    /** Always throws {@link AssertionError}; not meant to be called. */
    private <F> MkProducer<F> mkAmbig1() {
        throw new AssertionError("should not be used");
    }

    /** Always throws {@link AssertionError}; not meant to be called. */
    private <F> MkProducer<F> mkAmbig2() {
        throw new AssertionError("should not be used");
    }

    /** Converts a primitive {@code int} to an {@link Integer} via {@code Predef.int2Integer}. */
    public static final /* synthetic */ Integer $anonfun$asJavaRecord$3(int i) {
        // The previous box -> identity -> unbox round trip returned the same int and is dropped.
        return Predef$.MODULE$.int2Integer(i);
    }

    /** Converts a primitive {@code long} to a {@link Long} via {@code Predef.long2Long}. */
    public static final /* synthetic */ Long $anonfun$asJavaRecord$5(long j) {
        // The previous box -> identity -> unbox round trip returned the same long and is dropped.
        return Predef$.MODULE$.long2Long(j);
    }

    /** Private: the only instance is created by the {@link #MODULE$} initializer. */
    private KafkaProducer$() {
    }
}
