package fs2.kafka;

import cats.Apply;
import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.IO$;
import cats.effect.Resource;
import cats.effect.Resource$;
import cats.effect.Sync;
import cats.effect.concurrent.Deferred;
import cats.effect.concurrent.Deferred$;
import cats.implicits$;
import cats.syntax.MonadErrorRethrowOps$;
import fs2.Chunk$;
import fs2.kafka.internal.WithProducer;
import fs2.kafka.internal.WithProducer$;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaProducer.scala */
/* loaded from: input_file:fs2/kafka/KafkaProducer$.class */
/**
 * Singleton holder (Scala companion-object shape) for the factory and helper
 * methods that build {@link KafkaProducer} instances and publish records.
 *
 * <p>The single instance is published through {@link #MODULE$}, which is
 * assigned by the private constructor when the static initializer runs.
 */
public final class KafkaProducer$ {
    /** The singleton instance; assigned by the private constructor. */
    public static KafkaProducer$ MODULE$;

    static {
        // Instantiating once is enough: the constructor stores itself in MODULE$.
        new KafkaProducer$();
    }

    /**
     * Builds a {@link Resource} that yields a {@link KafkaProducer}.
     *
     * <p>The key serializer and value serializer effects from
     * {@code producerSettings} are lifted into the resource in that order, then a
     * {@link WithProducer} is obtained from {@code WithProducer$.apply} and wrapped
     * in an anonymous {@code KafkaProducer} implementation.
     *
     * @param producerSettings settings supplying the key/value serializers and
     *                         passed on to {@code WithProducer$.apply}
     * @param concurrentEffect effect type-class instance used for every effectful step
     * @param contextShift     passed on to {@code WithProducer$.apply}
     * @return a resource whose value is the constructed producer
     */
    public <F, K, V> Resource<F, KafkaProducer<F, K, V>> resource(ProducerSettings<F, K, V> producerSettings, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        // The two serializer lambdas need distinct parameter names: a shared name is
        // illegal in Java and would hide the key serializer behind the value serializer.
        return Resource$.MODULE$.liftF(producerSettings.keySerializer(), concurrentEffect).flatMap(keySerializer -> {
            return Resource$.MODULE$.liftF(producerSettings.valueSerializer(), concurrentEffect).flatMap(valueSerializer -> {
                return WithProducer$.MODULE$.apply(producerSettings, (Sync) concurrentEffect, contextShift).map(withProducer -> {
                    // NOTE(review): the constructor arguments mirror the decompiled synthetic
                    // class fs2.kafka.KafkaProducer$$anon$1 - confirm against KafkaProducer's
                    // actual constructor, which is not visible here.
                    return new KafkaProducer<F, K, V>(withProducer, keySerializer, valueSerializer, concurrentEffect) { // from class: fs2.kafka.KafkaProducer$$anon$1
                        private final WithProducer withProducer$1;
                        private final Serializer keySerializer$1;
                        private final Serializer valueSerializer$1;
                        private final ConcurrentEffect F$1;

                        /**
                         * Sends every record in {@code producerRecords} through the wrapped producer.
                         *
                         * <p>Each record is mapped with
                         * {@link KafkaProducer$#produceRecord}; the resulting chunk of
                         * per-record values is then sequenced and paired with the
                         * passthrough value in a {@code ProducerResult}. The value
                         * produced by the outer {@code map} is itself the effect
                         * built by {@code sequence(...).map(...)}.
                         *
                         * @param producerRecords the records to send plus a passthrough value
                         * @return the effect returned by {@code withProducer.apply}
                         */
                        @Override // fs2.kafka.KafkaProducer
                        public <P> F produce(ProducerRecords<K, V, P> producerRecords) {
                            return (F) this.withProducer$1.apply(producer -> {
                                return implicits$.MODULE$.toFunctorOps(implicits$.MODULE$.toTraverseOps(producerRecords.records(), Chunk$.MODULE$.instance()).traverse(KafkaProducer$.MODULE$.produceRecord(this.keySerializer$1, this.valueSerializer$1, producer, this.F$1), this.F$1), this.F$1).map(pending -> {
                                    // `pending` holds one inner effect per record; sequence them
                                    // into a single effect over the chunk of results.
                                    return implicits$.MODULE$.toFunctorOps(implicits$.MODULE$.toTraverseOps(pending, Chunk$.MODULE$.instance()).sequence(Predef$.MODULE$.$conforms(), this.F$1), this.F$1).map(results -> {
                                        return ProducerResult$.MODULE$.apply(results, producerRecords.passthrough());
                                    });
                                });
                            });
                        }

                        /** Returns {@code "KafkaProducer$"} followed by this instance's identity hash code. */
                        @Override
                        public String toString() {
                            return "KafkaProducer$" + System.identityHashCode(this);
                        }

                        {
                            this.withProducer$1 = withProducer;
                            this.keySerializer$1 = keySerializer;
                            this.valueSerializer$1 = valueSerializer;
                            this.F$1 = concurrentEffect;
                        }
                    };
                }, concurrentEffect);
            });
        });
    }

    /**
     * Returns a function that sends one record through {@code producer}.
     *
     * <p>For each input record the function serializes it into a Java client
     * record via {@link #asJavaRecord}, creates a {@link Deferred}, and - inside
     * {@code concurrentEffect.delay} - calls {@code producer.send} with a callback
     * that completes the deferred. The value of the returned effect is replaced
     * (via {@code as}) by {@code deferred.get()} with {@code rethrow} applied.
     *
     * @param serializer       serializer for record keys
     * @param serializer2      serializer for record values
     * @param producer         the byte-array producer used to send
     * @param concurrentEffect effect type-class instance
     * @return a function from an fs2 {@code ProducerRecord} to an effect
     */
    public <F, K, V> Function1<ProducerRecord<K, V>, F> produceRecord(Serializer<F, K> serializer, Serializer<F, V> serializer2, Producer<byte[], byte[]> producer, ConcurrentEffect<F> concurrentEffect) {
        return record -> {
            // `record` is the fs2 record; `javaRecord` is its serialized Java-client form.
            // They must keep distinct names: send() needs the Java record, while the
            // completion helper is typed to receive the original fs2 record.
            return implicits$.MODULE$.toFlatMapOps(MODULE$.asJavaRecord(serializer, serializer2, record, concurrentEffect), concurrentEffect).flatMap(javaRecord -> {
                return implicits$.MODULE$.toFlatMapOps(Deferred$.MODULE$.apply(concurrentEffect), concurrentEffect).flatMap(deferred -> {
                    return implicits$.MODULE$.toFunctorOps(concurrentEffect.delay(() -> {
                        return producer.send(javaRecord, MODULE$.callback((recordMetadata, exc) -> {
                            $anonfun$produceRecord$5(deferred, record, concurrentEffect, recordMetadata, exc);
                            return BoxedUnit.UNIT;
                        }));
                    }), concurrentEffect).as(MonadErrorRethrowOps$.MODULE$.rethrow$extension(implicits$.MODULE$.catsSyntaxMonadErrorRethrow(deferred.get(), concurrentEffect), concurrentEffect));
                });
            });
        };
    }

    /**
     * Serializes the key and the value of {@code producerRecord} and combines
     * the two effects with {@code product}.
     *
     * <p>Both serializers are called with the record's topic and headers.
     *
     * @param serializer     serializer applied to the record key
     * @param serializer2    serializer applied to the record value
     * @param producerRecord the record whose key and value are serialized
     * @param apply          type-class instance used to combine the two effects
     * @return the product of the key effect and the value effect
     */
    private <F, K, V> F serializeToBytes(Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecord<K, V> producerRecord, Apply<F> apply) {
        F keyBytes = serializer.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.key());
        F valueBytes = serializer2.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.value());
        return (F) implicits$.MODULE$.catsSyntaxSemigroupal(keyBytes, apply).product(valueBytes);
    }

    /**
     * Converts an fs2 {@code ProducerRecord} into an
     * {@code org.apache.kafka.clients.producer.ProducerRecord}.
     *
     * <p>The serialized key/value pair from {@link #serializeToBytes} becomes the
     * record's key and value. Partition and timestamp are passed as boxed values
     * when defined and as {@code null} otherwise.
     *
     * @param serializer     serializer applied to the record key
     * @param serializer2    serializer applied to the record value
     * @param producerRecord the record to convert
     * @param apply          type-class instance used to map over the serialization effect
     * @return an effect yielding the Java client record
     */
    private <F, K, V> F asJavaRecord(Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecord<K, V> producerRecord, Apply<F> apply) {
        return (F) implicits$.MODULE$.toFunctorOps(serializeToBytes(serializer, serializer2, producerRecord, apply), apply).map(tuple2 -> {
            if (tuple2 == null) {
                // Remnant of the Scala tuple pattern match: a null pair does not match.
                throw new MatchError(tuple2);
            }
            return new org.apache.kafka.clients.producer.ProducerRecord(producerRecord.topic(), producerRecord.partition().isDefined() ? Predef$.MODULE$.int2Integer(BoxesRunTime.unboxToInt(producerRecord.partition().get())) : null, producerRecord.timestamp().isDefined() ? Predef$.MODULE$.long2Long(BoxesRunTime.unboxToLong(producerRecord.timestamp().get())) : null, (byte[]) tuple2._1(), (byte[]) tuple2._2(), producerRecord.headers().asJava());
        });
    }

    /**
     * Adapts a Scala two-argument function to a Kafka {@link Callback}.
     *
     * @param function2 function invoked with the metadata and exception passed
     *                  to {@code onCompletion}
     * @return a callback that forwards both arguments to {@code function2}
     */
    private Callback callback(final Function2<RecordMetadata, Exception, BoxedUnit> function2) {
        // Callback is an interface, so the anonymous class takes no constructor
        // arguments; the function is captured directly.
        return new Callback() { // from class: fs2.kafka.KafkaProducer$$anon$2
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                function2.apply(recordMetadata, exc);
            }
        };
    }

    /**
     * Completion handler used by the send callback in {@link #produceRecord}.
     *
     * <p>Completes {@code deferred} with {@code Right((producerRecord, recordMetadata))}
     * when {@code exc} is {@code null}, otherwise with {@code Left(exc)}. The
     * completion effect is started with {@code runAsync} (its outcome is ignored
     * by returning {@code IO.unit}) and executed with {@code unsafeRunSync}.
     *
     * @param deferred         the deferred to complete
     * @param producerRecord   the fs2 record that was sent
     * @param concurrentEffect effect type-class instance used to run the completion
     * @param recordMetadata   metadata passed to the callback
     * @param exc              exception passed to the callback, or {@code null}
     */
    public static final /* synthetic */ void $anonfun$produceRecord$5(Deferred deferred, ProducerRecord producerRecord, ConcurrentEffect concurrentEffect, RecordMetadata recordMetadata, Exception exc) {
        concurrentEffect.runAsync(deferred.complete(exc == null ? scala.package$.MODULE$.Right().apply(new Tuple2(producerRecord, recordMetadata)) : scala.package$.MODULE$.Left().apply(exc)), either -> {
            return IO$.MODULE$.unit();
        }).unsafeRunSync();
    }

    /** Private so the only instance is the one created by the static initializer. */
    private KafkaProducer$() {
        MODULE$ = this;
    }
}
