package fs2.kafka.internal;

import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.package$;
import cats.effect.std.Semaphore;
import cats.effect.std.Semaphore$;
import cats.implicits$;
import fs2.kafka.ConsumerSettings;
import org.apache.kafka.clients.consumer.Consumer;
import scala.Function2;
import scala.Tuple2;
import scala.runtime.BoxedUnit;

/* compiled from: WithConsumer.scala */
/* loaded from: input_file:fs2/kafka/internal/WithConsumer$.class */
/**
 * Singleton holder (decompiled from {@code WithConsumer.scala}) whose {@link #apply} method
 * builds a {@code Resource} wrapping a {@link WithConsumer} instance.
 *
 * <p>The single instance is published through {@link #MODULE$}: the static initializer
 * constructs the object and the private constructor stores {@code this} into the field.
 */
public final class WithConsumer$ {
    /** The singleton instance; assigned by the private constructor during class initialization. */
    public static WithConsumer$ MODULE$;

    static {
        // The result is intentionally discarded: the constructor itself assigns MODULE$.
        new WithConsumer$();
    }

    /**
     * Builds a {@code Resource} that yields a {@link WithConsumer} and closes the underlying
     * consumer when the resource is released.
     *
     * <p>Acquire step: the effect returned by {@code consumerSettings.createConsumer()} and a
     * semaphore created with {@code 1L} permits are combined with {@code mapN} into an
     * anonymous {@link WithConsumer} implementation.
     *
     * <p>Release step: {@code withConsumer.blocking(...)} is used to call
     * {@link #$anonfun$apply$4}, which closes the consumer using
     * {@code consumerSettings.closeTimeout()}.
     *
     * @param consumerSettings settings used to create the consumer and to obtain the close timeout
     * @param async            the {@code Async} instance passed to the semaphore, {@code mapN},
     *                         {@code Resource.make} and the anonymous {@link WithConsumer}
     * @param <F>              type parameter shared by the settings, the {@code Async} instance
     *                         and the resulting resource
     * @param <K>              first remaining type parameter of {@code ConsumerSettings}
     *                         (presumably the key type; confirm against ConsumerSettings)
     * @param <V>              second remaining type parameter of {@code ConsumerSettings}
     *                         (presumably the value type; confirm against ConsumerSettings)
     * @return a resource producing the {@link WithConsumer} wrapper
     */
    public <F, K, V> Resource<F, WithConsumer<F>> apply(ConsumerSettings<F, K, V> consumerSettings, Async<F> async) {
        // Resource.make(acquire, release, async): the first lambda below is the mapN body of the
        // acquire step, the second lambda (withConsumer -> ...) is the release step.
        return package$.MODULE$.Resource().make(implicits$.MODULE$.catsSyntaxTuple2Semigroupal(new Tuple2(consumerSettings.createConsumer(), Semaphore$.MODULE$.apply(1L, async))).mapN((consumer, semaphore) -> {
            return new WithConsumer<F>(semaphore, consumer, async) { // from class: fs2.kafka.internal.WithConsumer$$anon$1
                // Captured values, copied into fields by the instance initializer below.
                private final Semaphore semaphore$1;
                private final Consumer consumer$1;
                private final Async F$1;

                /**
                 * Runs {@code function2} against the captured consumer while holding a permit
                 * of the captured semaphore.
                 *
                 * <p>NOTE(review): the semaphore is created with 1 permit, so this presumably
                 * allows only one function at a time to use the consumer; confirm against the
                 * Semaphore documentation.
                 *
                 * @param function2 function receiving the consumer and a {@code Blocking}
                 *                  built from the captured {@code Async} instance
                 * @return the result of {@code permit().use(...)}, cast to {@code F}
                 */
                @Override // fs2.kafka.internal.WithConsumer
                public <A> F apply(Function2<Consumer<byte[], byte[]>, Blocking<F>, F> function2) {
                    // The permit value (boxedUnit) is unused; only holding the permit matters.
                    return (F) this.semaphore$1.permit().use(boxedUnit -> {
                        return function2.apply(this.consumer$1, Blocking$.MODULE$.apply(this.F$1));
                    }, this.F$1);
                }

                {
                    // Instance initializer: stores the values captured from the enclosing lambda.
                    this.semaphore$1 = semaphore;
                    this.consumer$1 = consumer;
                    this.F$1 = async;
                }
            };
        }, async, async), withConsumer -> {
            // Release step: close the consumer through WithConsumer.blocking.
            return withConsumer.blocking(consumer2 -> {
                $anonfun$apply$4(consumerSettings, consumer2);
                return BoxedUnit.UNIT;
            });
        }, async);
    }

    /**
     * Synthetic lambda body used by the release step of {@link #apply}: closes the consumer.
     *
     * <p>The timeout is {@code consumerSettings.closeTimeout()}, converted through the
     * {@code FiniteDurationSyntax} {@code asJava} extension before being passed to
     * {@code consumer.close(...)}.
     *
     * @param consumerSettings settings supplying the close timeout
     * @param consumer         the consumer to close
     */
    public static final /* synthetic */ void $anonfun$apply$4(ConsumerSettings consumerSettings, Consumer consumer) {
        consumer.close(syntax$FiniteDurationSyntax$.MODULE$.asJava$extension(syntax$.MODULE$.FiniteDurationSyntax(consumerSettings.closeTimeout())));
    }

    /** Private constructor; publishes the new instance as {@link #MODULE$}. */
    private WithConsumer$() {
        MODULE$ = this;
    }
}
