package com.commercetools.queue;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Outcome;
import cats.effect.kernel.Outcome$Succeeded$;
import cats.effect.kernel.Resource;
import cats.effect.kernel.syntax.MonadCancelOps$;
import cats.effect.package$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.EitherIdOps$;
import cats.syntax.OptionIdOps$;
import cats.syntax.package$all$;
import com.commercetools.queue.Decision;
import fs2.Chunk;
import fs2.Chunk$;
import fs2.Pull;
import fs2.Pull$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$InvariantOps$;
import fs2.Stream$ToPull$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.immutable.Map;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
import scala.util.Either;
import scala.util.Left;
import scala.util.NotGiven$;
import scala.util.Right;

/* compiled from: QueueSubscriber.scala */
/* loaded from: input_file:com/commercetools/queue/QueueSubscriber.class */
public abstract class QueueSubscriber<F, T> {
    private final GenConcurrent<F, Throwable> F;

    public QueueSubscriber(GenConcurrent<F, Throwable> genConcurrent) {
        this.F = genConcurrent;
    }

    public abstract String queueName();

    public abstract Resource<F, QueuePuller<F, T>> puller();

    public final Stream<F, MessageContext<F, T>> messages(int i, FiniteDuration finiteDuration) {
        return Stream$.MODULE$.resource(puller(), this.F).flatMap(queuePuller -> {
            return Stream$.MODULE$.repeatEval(queuePuller.pullBatch(i, finiteDuration)).unchunks($less$colon$less$.MODULE$.refl());
        }, NotGiven$.MODULE$.value());
    }

    public final <Res> Stream<F, Res> processWithAutoAck(int i, FiniteDuration finiteDuration, Function1<Message<F, T>, F> function1) {
        return Stream$InvariantOps$.MODULE$.repeatPull$extension(Stream$.MODULE$.InvariantOps(messages(i, finiteDuration)), obj -> {
            return processWithAutoAck$$anonfun$1(function1, obj == null ? null : ((Stream.ToPull) obj).fs2$Stream$ToPull$$self());
        });
    }

    public final <Res> Stream<F, Either<Throwable, Res>> attemptProcessWithAutoAck(int i, FiniteDuration finiteDuration, Function1<Message<F, T>, F> function1) {
        return messages(i, finiteDuration).parEvalMap(i, messageContext -> {
            return package$all$.MODULE$.toFlatMapOps(ApplicativeErrorOps$.MODULE$.attempt$extension(package$all$.MODULE$.catsSyntaxApplicativeError(function1.apply(messageContext), this.F), this.F), this.F).flatTap(either -> {
                if (either instanceof Right) {
                    return messageContext.ack();
                }
                if (either instanceof Left) {
                    return messageContext.nack();
                }
                throw new MatchError(either);
            });
        }, this.F);
    }

    public final <Res> Stream<F, Either<Throwable, Res>> process(int i, FiniteDuration finiteDuration, QueuePublisher<F, T> queuePublisher, MessageHandler<F, T, Res, Decision> messageHandler) {
        return Stream$.MODULE$.resource(queuePublisher.pusher(), this.F).flatMap(queuePusher -> {
            return (Stream) package$all$.MODULE$.toFunctorFilterOps(messages(i, finiteDuration).parEvalMap(i, messageContext -> {
                return package$all$.MODULE$.toFlatMapOps(messageHandler.handle(messageContext), this.F).flatMap(decision -> {
                    if (decision instanceof Decision.Ok) {
                        Object _1 = Decision$Ok$.MODULE$.unapply((Decision.Ok) decision)._1();
                        return package$all$.MODULE$.toFunctorOps(messageContext.ack(), this.F).as(OptionIdOps$.MODULE$.some$extension((Either) package$all$.MODULE$.catsSyntaxOptionId(EitherIdOps$.MODULE$.asRight$extension(package$all$.MODULE$.catsSyntaxEitherId(_1)))));
                    }
                    if (Decision$Drop$.MODULE$.equals(decision)) {
                        return package$all$.MODULE$.toFunctorOps(messageContext.ack(), this.F).as(package$all$.MODULE$.none());
                    }
                    if (decision instanceof Decision.Fail) {
                        Decision.Fail unapply = Decision$Fail$.MODULE$.unapply((Decision.Fail) decision);
                        Throwable _12 = unapply._1();
                        boolean _2 = unapply._2();
                        if (true == _2) {
                            return package$all$.MODULE$.toFunctorOps(messageContext.ack(), this.F).as(OptionIdOps$.MODULE$.some$extension((Either) package$all$.MODULE$.catsSyntaxOptionId(EitherIdOps$.MODULE$.asLeft$extension((Throwable) package$all$.MODULE$.catsSyntaxEitherId(_12)))));
                        }
                        if (false == _2) {
                            return package$all$.MODULE$.toFunctorOps(messageContext.nack(), this.F).as(OptionIdOps$.MODULE$.some$extension((Either) package$all$.MODULE$.catsSyntaxOptionId(EitherIdOps$.MODULE$.asLeft$extension((Throwable) package$all$.MODULE$.catsSyntaxEitherId(_12)))));
                        }
                    }
                    if (!(decision instanceof Decision.Reenqueue)) {
                        throw new MatchError(decision);
                    }
                    Decision.Reenqueue unapply2 = Decision$Reenqueue$.MODULE$.unapply((Decision.Reenqueue) decision);
                    Option<Map<String, String>> _13 = unapply2._1();
                    Option<FiniteDuration> _22 = unapply2._2();
                    return package$all$.MODULE$.toFunctorOps(package$all$.MODULE$.toFlatMapOps(messageContext.payload(), this.F).flatMap(obj -> {
                        return queuePusher.push(obj, (Map) messageContext.metadata().$plus$plus((IterableOnce) _13.getOrElse(QueueSubscriber::process$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1)), _22);
                    }), this.F).as(package$all$.MODULE$.none());
                });
            }, this.F), Stream$.MODULE$.functorFilterInstance()).flattenOption($less$colon$less$.MODULE$.refl());
        }, NotGiven$.MODULE$.value());
    }

    public final <Res> Stream<F, Either<Throwable, Res>> processWithImmediateDecision(int i, FiniteDuration finiteDuration, MessageHandler<F, T, Res, ImmediateDecision> messageHandler) {
        return process(i, finiteDuration, QueuePublisher$.MODULE$.noop(this.F), message -> {
            return package$all$.MODULE$.toFunctorOps(messageHandler.handle(message), this.F).widen();
        });
    }

    private final Pull doChunk$1$$anonfun$2$$anonfun$1(Chunk chunk, int i, Function1 function1) {
        return doChunk$1(function1, chunk, i + 1);
    }

    private final Pull doChunk$1(Function1 function1, Chunk chunk, int i) {
        if (i >= chunk.size()) {
            return Pull$.MODULE$.done();
        }
        MessageContext messageContext = (MessageContext) chunk.apply(i);
        return Pull$.MODULE$.eval(MonadCancelOps$.MODULE$.guaranteeCase$extension(cats.effect.syntax.package$all$.MODULE$.monadCancelOps(function1.apply(messageContext), this.F), outcome -> {
            if (!(outcome instanceof Outcome.Succeeded)) {
                return package$all$.MODULE$.toFoldableOps(chunk.toArraySlice(ClassTag$.MODULE$.apply(MessageContext.class)).drop(i), Chunk$.MODULE$.instance()).traverse_(messageContext2 -> {
                    return ApplicativeErrorOps$.MODULE$.attempt$extension(package$all$.MODULE$.catsSyntaxApplicativeError(messageContext2.nack(), this.F), this.F);
                }, this.F);
            }
            package$.MODULE$.Outcome();
            Outcome$Succeeded$.MODULE$.unapply((Outcome.Succeeded) outcome)._1();
            return messageContext.ack();
        }, this.F)).attempt().flatMap(either -> {
            if (either instanceof Right) {
                return Pull$.MODULE$.output1(((Right) either).value()).$greater$greater(() -> {
                    return r1.doChunk$1$$anonfun$2$$anonfun$1(r2, r3, r4);
                });
            }
            if (!(either instanceof Left)) {
                throw new MatchError(either);
            }
            return Pull$.MODULE$.raiseError((Throwable) ((Left) either).value(), RaiseThrowable$.MODULE$.fromApplicativeError(this.F));
        });
    }

    private final /* synthetic */ Pull processWithAutoAck$$anonfun$1(Function1 function1, Stream stream) {
        return Stream$ToPull$.MODULE$.uncons$extension(stream).flatMap(option -> {
            Tuple2 tuple2;
            if ((option instanceof Some) && (tuple2 = (Tuple2) ((Some) option).value()) != null) {
                return doChunk$1(function1, (Chunk) tuple2._1(), 0).as(Some$.MODULE$.apply((Stream) tuple2._2()));
            }
            if (None$.MODULE$.equals(option)) {
                return Pull$.MODULE$.pure(None$.MODULE$);
            }
            throw new MatchError(option);
        });
    }

    private static final Map process$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1() {
        return Predef$.MODULE$.Map().empty();
    }
}
