package com.commercetools.queue.gcp.pubsub;

import cats.effect.kernel.Async;
import cats.effect.kernel.syntax.GenConcurrentOps_$;
import cats.effect.syntax.package$concurrent$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.MonadErrorOps$;
import cats.syntax.package$all$;
import com.commercetools.queue.Deserializer;
import com.commercetools.queue.QueuePuller;
import com.google.api.core.ApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.httpjson.HttpJsonCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import fs2.Chunk$;
import java.time.Instant;
import org.threeten.bp.Duration;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Some$;
import scala.collection.mutable.Map;
import scala.concurrent.duration.FiniteDuration;
import scala.jdk.CollectionConverters$;

/* compiled from: PubSubPuller.scala */
/* loaded from: input_file:com/commercetools/queue/gcp/pubsub/PubSubPuller.class */
/**
 * {@link QueuePuller} implementation that pulls batches of messages from a Google Cloud Pub/Sub
 * subscription through a {@link SubscriberStub}.
 *
 * <p>Messages carrying the {@code com.commercetools.queue.delay} attribute with an instant that is
 * still in the future are not emitted: their ack deadline is pushed back instead and they are
 * dropped from the returned batch.
 *
 * @param <F> the effect type, backed by a cats-effect {@link Async} instance
 * @param <T> the payload type produced by the {@link Deserializer}
 */
public class PubSubPuller<F, T> implements QueuePuller<F, T> {
    /**
     * Largest ack deadline, in seconds, that Pub/Sub accepts in a single ModifyAckDeadline request
     * (10 minutes, per the Pub/Sub API reference).
     */
    private static final int MAX_ACK_DEADLINE_SECONDS = 600;

    private final String queueName;
    /** Selects the gRPC call context when {@code true}, the HTTP/JSON one otherwise. */
    private final boolean useGrpc;
    private final SubscriptionName subscriptionName;
    private final SubscriberStub subscriber;
    /** Passed through unchanged to every {@code PubSubMessageContext} created by this puller. */
    private final int lockTTLSeconds;
    private final Async<F> F;
    private final Deserializer<T> deserializer;

    /**
     * Creates a puller bound to one subscription.
     *
     * @param str the queue name reported by {@link #queueName()}
     * @param z whether calls use the gRPC transport ({@code true}) or HTTP/JSON ({@code false})
     * @param subscriptionName the subscription to pull from
     * @param subscriberStub the Pub/Sub stub used for pull and modify-ack-deadline calls
     * @param i the lock TTL in seconds handed to each created message context
     * @param async the effect type class instance
     * @param deserializer the payload deserializer
     */
    public PubSubPuller(String str, boolean z, SubscriptionName subscriptionName, SubscriberStub subscriberStub, int i, Async<F> async, Deserializer<T> deserializer) {
        this.queueName = str;
        this.useGrpc = z;
        this.subscriptionName = subscriptionName;
        this.subscriber = subscriberStub;
        this.lockTTLSeconds = i;
        this.F = async;
        this.deserializer = deserializer;
    }

    /** Returns the queue name this puller was created with. */
    public String queueName() {
        return this.queueName;
    }

    /**
     * Builds the call context for a pull request, using the given duration as the logical timeout
     * of the retry settings.
     *
     * @param finiteDuration the waiting time to use as logical timeout
     * @return a gRPC or HTTP/JSON call context, depending on {@code useGrpc}
     */
    private ApiCallContext callContext(FiniteDuration finiteDuration) {
        // Both transports use the exact same retry settings, so build them once.
        RetrySettings retrySettings = RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(finiteDuration.toMillis())).build();
        if (this.useGrpc) {
            return GrpcCallContext.createDefault().withRetrySettings(retrySettings);
        }
        return HttpJsonCallContext.createDefault().withRetrySettings(retrySettings);
    }

    /**
     * Pulls at most {@code i} messages from the subscription.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>issue the pull request and turn the received messages into a chunk;</li>
     *   <li>recover through {@code PubSubPuller$$anon$1} (defined outside this file; presumably
     *       maps the "no message within the waiting time" failure to an empty chunk - confirm);</li>
     *   <li>drop messages whose {@code com.commercetools.queue.delay} attribute is still in the
     *       future, after pushing their ack deadline back;</li>
     *   <li>wrap each remaining message in a {@code PubSubMessageContext} holding a memoized
     *       deserialization effect;</li>
     *   <li>adapt errors through {@code PubSubPuller$$anon$2} (defined outside this file).</li>
     * </ol>
     *
     * @param i maximum number of messages requested
     * @param finiteDuration waiting time, used as the logical timeout of the pull call
     * @return an effect yielding the chunk of message contexts
     */
    public F pullBatch(int i, FiniteDuration finiteDuration) {
        return (F) MonadErrorOps$.MODULE$.adaptError$extension(package$all$.MODULE$.catsSyntaxMonadError(package$all$.MODULE$.toFunctorOps(package$all$.MODULE$.toFlatMapOps(package$all$.MODULE$.toFlatMapOps(ApplicativeErrorOps$.MODULE$.recover$extension(package$all$.MODULE$.catsSyntaxApplicativeError(package$all$.MODULE$.toFunctorOps(package$.MODULE$.wrapFuture(this.F.delay(() -> {
            return this.pullBatch$$anonfun$1(finiteDuration, i);
        }), this.F), this.F).map(pullResponse -> {
            return Chunk$.MODULE$.from(CollectionConverters$.MODULE$.ListHasAsScala(pullResponse.getReceivedMessagesList()).asScala());
        }), this.F), new PubSubPuller$$anon$1(), this.F), this.F).flatMap(chunk -> {
            return chunk.traverseFilter(receivedMessage -> {
                Map asScala = CollectionConverters$.MODULE$.MapHasAsScala(receivedMessage.getMessage().getAttributesMap()).asScala();
                return package$all$.MODULE$.toFlatMapOps(this.F.realTimeInstant(), this.F).flatMap(now -> {
                    String str;
                    Some some = asScala.get("com.commercetools.queue.delay");
                    if ((some instanceof Some) && (str = (String) some.value()) != null) {
                        Option<Instant> unapply = package$ToInstant$.MODULE$.unapply(str);
                        if (!unapply.isEmpty()) {
                            Instant until = (Instant) unapply.get();
                            // Delay not elapsed yet: postpone redelivery and filter the message out.
                            if (until.isAfter(now)) {
                                return package$all$.MODULE$.toFunctorOps(package$.MODULE$.wrapFuture(this.F.delay(() -> {
                                    return this.pullBatch$$anonfun$3$$anonfun$1$$anonfun$1$$anonfun$1(receivedMessage, now, until);
                                }), this.F), this.F).as(None$.MODULE$);
                            }
                        }
                    }
                    // No delay attribute, unparsable value, or delay already elapsed: keep the message.
                    return this.F.pure(Some$.MODULE$.apply(receivedMessage));
                });
            }, this.F);
        }), this.F).flatMap(chunk2 -> {
            return chunk2.traverse(receivedMessage -> {
                // The deserialization effect is memoized before being handed to the message context.
                return package$all$.MODULE$.toFunctorOps(GenConcurrentOps_$.MODULE$.memoize$extension(package$concurrent$.MODULE$.genConcurrentOps_(this.deserializer.deserializeF(receivedMessage.getMessage().getData().toStringUtf8(), this.F)), this.F), this.F).map(obj -> {
                    return new PubSubMessageContext(this.subscriber, this.subscriptionName, receivedMessage, this.lockTTLSeconds, obj, queueName(), this.F);
                });
            }, this.F);
        }), this.F).widen(), this.F), new PubSubPuller$$anon$2(this), this.F);
    }

    /**
     * Starts the asynchronous pull call.
     *
     * @param finiteDuration waiting time used to build the call context
     * @param i maximum number of messages to request
     * @return the future of the pull response
     */
    private final ApiFuture pullBatch$$anonfun$1(FiniteDuration finiteDuration, int i) {
        return this.subscriber.pullCallable().withDefaultCallContext(callContext(finiteDuration)).futureCall(PullRequest.newBuilder().setMaxMessages(i).setSubscription(this.subscriptionName.toString()).build());
    }

    /**
     * Starts the asynchronous call that sets the ack deadline of a delayed message to the time
     * remaining between {@code instant} and {@code instant2}.
     *
     * <p>The deadline is clamped to {@code [0, MAX_ACK_DEADLINE_SECONDS]}: a longer delay is applied
     * in successive steps, since the message is redelivered and postponed again while its delay
     * attribute is still in the future.
     *
     * @param receivedMessage the message whose ack deadline is modified
     * @param instant the current time
     * @param instant2 the instant until which the message is delayed
     * @return the future of the modify-ack-deadline call
     */
    private final ApiFuture pullBatch$$anonfun$3$$anonfun$1$$anonfun$1$$anonfun$1(ReceivedMessage receivedMessage, Instant instant, Instant instant2) {
        long remainingSeconds = java.time.Duration.between(instant, instant2).getSeconds();
        // Clamp in long arithmetic first so the narrowing cast can never overflow.
        int ackDeadlineSeconds = (int) Math.max(0L, Math.min(remainingSeconds, (long) MAX_ACK_DEADLINE_SECONDS));
        return this.subscriber.modifyAckDeadlineCallable().futureCall(ModifyAckDeadlineRequest.newBuilder().addAckIds(receivedMessage.getAckId()).setSubscription(this.subscriptionName.toString()).setAckDeadlineSeconds(ackDeadlineSeconds).build());
    }
}
