package rhttpc.transport.amqp;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.AskTimeoutException;
import akka.pattern.AskableActorRef$;
import akka.util.Timeout;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rhttpc.transport.Deserializer;
import rhttpc.transport.RejectingMessage;
import rhttpc.transport.Subscriber;
import rhttpc.utils.Agent;
import rhttpc.utils.Agent$;
import rhttpc.utils.Recovered$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: AmqpSubscriber.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005fAB\u000b\u0017\u0003\u00031B\u0004\u0003\u00055\u0001\t\u0005\t\u0015!\u00036\u0011!y\u0004A!A!\u0002\u0013\u0001\u0005\u0002C&\u0001\u0005\u0003\u0005\u000b\u0011\u0002'\t\u0011Q\u0003!\u0011!Q\u0001\nUC\u0001\u0002\u0017\u0001\u0003\u0002\u0003\u0006I!\u0017\u0005\tC\u0002\u0011\t\u0011)A\u00053\"A!\r\u0001B\u0001B\u0003-1\rC\u0003g\u0001\u0011\u0005q\r\u0003\u0005s\u0001!\u0015\r\u0011\"\u0003t\u0011\u001da\bA1A\u0005\nuDq!!\b\u0001A\u0003%a\u0010C\u0005\u0002 \u0001\u0001\r\u0011\"\u0003\u0002\"!I\u0011\u0011\u0006\u0001A\u0002\u0013%\u00111\u0006\u0005\t\u0003c\u0001\u0001\u0015)\u0003\u0002$!9\u00111\b\u0001\u0005B\u0005u\u0002bBA \u0001\u0019E\u0011\u0011\t\u0005\b\u00033\u0002A\u0011BA.\u0011\u001d\tY\u0007\u0001C\u0005\u0003[Bq!a%\u0001\t\u0003\n)\nC\u0004\u0002\u001e\u0002!I!a(\u0003\u001d\u0005k\u0017\u000f]*vEN\u001c'/\u001b2fe*\u0011q\u0003G\u0001\u0005C6\f\bO\u0003\u0002\u001a5\u0005IAO]1ogB|'\u000f\u001e\u0006\u00027\u00051!\u000f\u001b;ua\u000e,\"!\b\u0016\u0014\u0007\u0001qB\u0005\u0005\u0002 E5\t\u0001EC\u0001\"\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0003E\u0001\u0004B]f\u0014VM\u001a\t\u0004K\u0019BS\"\u0001\r\n\u0005\u001dB\"AC*vEN\u001c'/\u001b2feB\u0011\u0011F\u000b\u0007\u0001\t\u0015Y\u0003A1\u0001.\u0005\r\u0019VOY\u0002\u0001#\tq\u0013\u0007\u0005\u0002 
_%\u0011\u0001\u0007\t\u0002\b\u001d>$\b.\u001b8h!\ty\"'\u0003\u00024A\t\u0019\u0011I\\=\u0002\u000f\rD\u0017M\u001c8fYB\u0011a'P\u0007\u0002o)\u0011\u0001(O\u0001\u0007G2LWM\u001c;\u000b\u0005iZ\u0014\u0001\u0003:bE\nLG/\\9\u000b\u0003q\n1aY8n\u0013\tqtGA\u0004DQ\u0006tg.\u001a7\u0002\u0013E,X-^3OC6,\u0007CA!I\u001d\t\u0011e\t\u0005\u0002DA5\tAI\u0003\u0002FY\u00051AH]8pizJ!a\u0012\u0011\u0002\rA\u0013X\rZ3g\u0013\tI%J\u0001\u0004TiJLgn\u001a\u0006\u0003\u000f\u0002\n\u0001bY8ogVlWM\u001d\t\u0003\u001bJk\u0011A\u0014\u0006\u0003\u001fB\u000bQ!Y2u_JT\u0011!U\u0001\u0005C.\\\u0017-\u0003\u0002T\u001d\nA\u0011i\u0019;peJ+g-\u0001\u0007eKN,'/[1mSj,'\u000fE\u0002&-\"J!a\u0016\r\u0003\u0019\u0011+7/\u001a:jC2L'0\u001a:\u0002\u001d\r|gn];nKRKW.Z8viB\u0011!lX\u0007\u00027*\u0011A,X\u0001\tIV\u0014\u0018\r^5p]*\u0011a\fI\u0001\u000bG>t7-\u001e:sK:$\u0018B\u00011\\\u000591\u0015N\\5uK\u0012+(/\u0019;j_:\f\u0011B\\1dW\u0012+G.Y=\u0002\rML8\u000f^3n!\tiE-\u0003\u0002f\u001d\nY\u0011i\u0019;peNK8\u000f^3n\u0003\u0019a\u0014N\\5u}Q9\u0001\u000e\\7o_B\fHCA5l!\rQ\u0007\u0001K\u0007\u0002-!)!\r\u0003a\u0002G\")A\u0007\u0003a\u0001k!)q\b\u0003a\u0001\u0001\")1\n\u0003a\u0001\u0019\")A\u000b\u0003a\u0001+\")\u0001\f\u0003a\u00013\")\u0011\r\u0003a\u00013\u00061An\\4hKJ,\u0012\u0001\u001e\t\u0003kjl\u0011A\u001e\u0006\u0003ob\fQa\u001d7gi)T\u0011!_\u0001\u0004_J<\u0017BA>w\u0005\u0019aunZ4fe\u00061\u0002/\u001a8eS:<7i\u001c8tk6,\u0007K]8nSN,7/F\u0001\u007f!\u0015y\u0018QAA\u0005\u001b\t\t\tAC\u0002\u0002\u0004i\tQ!\u001e;jYNLA!a\u0002\u0002\u0002\t)\u0011iZ3oiB)\u0011)a\u0003\u0002\u0010%\u0019\u0011Q\u0002&\u0003\u0007M+G\u000f\u0005\u0004\u0002\u0012\u0005M\u0011qC\u0007\u0002;&\u0019\u0011QC/\u0003\u000fA\u0013x.\\5tKB\u0019q$!\u0007\n\u0007\u0005m\u0001E\u0001\u0003V]&$\u0018a\u00069f]\u0012LgnZ\"p]N,X.\u001a)s_6L7/Z:!\u0003-\u0019wN\\:v[\u0016\u0014H+Y4\u0016\u0005\u0005\r\u0002\u0003B\u0010\u0002&\u0001K1!a\n!\u0005\u0019y\u0005\u000f^5p]\u0006y1m\u001c8tk6,'\u000fV1h?\u0012*\u0
017\u000f\u0006\u0003\u0002\u0018\u00055\u0002\"CA\u0018\u001b\u0005\u0005\t\u0019AA\u0012\u0003\rAH%M\u0001\rG>t7/^7feR\u000bw\r\t\u0015\u0004\u001d\u0005U\u0002cA\u0010\u00028%\u0019\u0011\u0011\b\u0011\u0003\u0011Y|G.\u0019;jY\u0016\fQa\u001d;beR$\"!a\u0006\u0002\u001dA\u0014X\r]1sK6+7o]1hKR)\u0011'a\u0011\u0002H!1\u0011Q\t\tA\u0002!\n1\u0003Z3tKJL\u0017\r\\5{K\u0012lUm]:bO\u0016Dq!!\u0013\u0011\u0001\u0004\tY%\u0001\u0006qe>\u0004XM\u001d;jKN\u0004B!!\u0014\u0002T9\u0019a'a\u0014\n\u0007\u0005Es'\u0001\u0003B\u001bF\u0003\u0016\u0002BA+\u0003/\u0012qBQ1tS\u000e\u0004&o\u001c9feRLWm\u001d\u0006\u0004\u0003#:\u0014!\u00075b]\u0012dW\rR3tKJL\u0017\r\\5{K\u0012lUm]:bO\u0016$b!a\u0006\u0002^\u0005\u0005\u0004BBA0#\u0001\u0007\u0011'\u0001\u0004ng\u001e|%M\u001b\u0005\b\u0003G\n\u0002\u0019AA3\u0003-!W\r\\5wKJLH+Y4\u0011\u0007}\t9'C\u0002\u0002j\u0001\u0012A\u0001T8oO\u00061\u0002.\u00198eY\u0016\u001cuN\\:v[\u0016\u0014(+Z:q_:\u001cX-\u0006\u0003\u0002p\u0005=ECBA9\u0003\u0007\u000b)\tE\u0004 \u0003g\n9(a\u0006\n\u0007\u0005U\u0004EA\u0005Gk:\u001cG/[8ocA)\u0011\u0011PA@c5\u0011\u00111\u0010\u0006\u0004\u0003{\u0002\u0013\u0001B;uS2LA!!!\u0002|\t\u0019AK]=\t\u000f\u0005\r$\u00031\u0001\u0002f!9\u0011q\u0011\nA\u0002\u0005%\u0015\u0001C2p[BdW\r^3\u0011\u000b}\tY)a\u0006\n\u0007\u00055\u0005EA\u0005Gk:\u001cG/[8oa\u00111\u0011\u0011\u0013\nC\u00025\u0012\u0011!V\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002\u0018B1\u0011\u0011CAM\u0003/I1!a'^\u0005\u00191U\u000f^;sK\u0006y2-\u001e:sK:$8i\u001c8tk6Lgn\u001a$viV\u0014Xm]\"p[BdW\r^3\u0016\u0005\u0005]\u0005")
/* loaded from: input_file:rhttpc/transport/amqp/AmqpSubscriber.class */
/*
 * Base class for subscribers that consume messages from an AMQP queue and hand them to an actor.
 *
 * Flow visible in this class:
 *   1. start() registers a consumer on the channel for `queueName`.
 *   2. Each delivery body is decoded as UTF-8 and passed to the deserializer.
 *   3. On successful deserialization the subclass hook prepareMessage(...) builds the object that
 *      is sent to the `consumer` actor with an ask bounded by `consumeTimeout`.
 *   4. The outcome of the ask decides whether the delivery is ACKed, REJECTed, or NACKed
 *      (after `nackDelay`) on the channel.
 *   5. stop() cancels the consumer, waits for the in-flight deliveries tracked in
 *      `pendingConsumePromises`, then closes the channel.
 *
 * This source is decompiler output of a Scala class; the `$lzycompute`, `$anonfun$...` and
 * `rhttpc$transport$amqp$AmqpSubscriber$$...` members are compiler-generated names.
 */
public abstract class AmqpSubscriber<Sub> implements Subscriber<Sub> {
    // Lazily created logger; only valid once bitmap$0 is true (see logger$lzycompute()).
    private Logger rhttpc$transport$amqp$AmqpSubscriber$$logger;
    // Channel used for basicConsume / basicAck / basicReject / basicNack / basicCancel / close.
    public final Channel rhttpc$transport$amqp$AmqpSubscriber$$channel;
    // Name of the queue passed to basicConsume in start().
    private final String queueName;
    // Actor that receives every prepared message through an ask.
    private final ActorRef consumer;
    // Converts the UTF-8 decoded delivery body into a Success/Failure result.
    public final Deserializer<Sub> rhttpc$transport$amqp$AmqpSubscriber$$deserializer;
    // Timeout applied to the ask sent to `consumer`.
    private final FiniteDuration consumeTimeout;
    // Delay before a failed delivery is NACKed.
    private final FiniteDuration nackDelay;
    // Supplies the dispatcher for all future callbacks and the scheduler for the delayed NACK.
    private final ActorSystem system;
    // One promise per delivery currently being processed; added in handleDeserializedMessage,
    // completed and removed in complete$1.
    private final Agent<Set<Promise<BoxedUnit>>> pendingConsumePromises;
    // Tag returned by basicConsume; None until start() has run. Read by stop() to cancel the consumer.
    private volatile Option<String> consumerTag = None$.MODULE$;
    // Set to true once the lazy logger has been initialised.
    private volatile boolean bitmap$0;

    /**
     * Initialises the lazy logger at most once while holding the monitor of this instance,
     * then returns it.
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [rhttpc.transport.amqp.AmqpSubscriber] */
    private Logger logger$lzycompute() {
        // NOTE(review): `?? r0` is a decompiler placeholder for a type it could not infer; not valid Java.
        ?? r0 = this;
        synchronized (r0) {
            // Re-check under the lock so the logger is created only once.
            if (!this.bitmap$0) {
                this.rhttpc$transport$amqp$AmqpSubscriber$$logger = LoggerFactory.getLogger(getClass());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.rhttpc$transport$amqp$AmqpSubscriber$$logger;
    }

    /**
     * Returns the logger, creating it on first use.
     *
     * @return the logger obtained from LoggerFactory for the runtime class of this instance
     */
    public Logger rhttpc$transport$amqp$AmqpSubscriber$$logger() {
        return !this.bitmap$0 ? logger$lzycompute() : this.rhttpc$transport$amqp$AmqpSubscriber$$logger;
    }

    /** Accessor for the agent holding the promises of in-flight deliveries. */
    private Agent<Set<Promise<BoxedUnit>>> pendingConsumePromises() {
        return this.pendingConsumePromises;
    }

    /** Accessor for the current consumer tag (None before start()). */
    private Option<String> consumerTag() {
        return this.consumerTag;
    }

    /** Setter for the consumer tag. */
    private void consumerTag_$eq(Option<String> option) {
        this.consumerTag = option;
    }

    /**
     * Starts consuming from `queueName` and remembers the returned consumer tag.
     *
     * The second argument of basicConsume is `false` (the autoAck flag in the RabbitMQ client API),
     * so deliveries are acknowledged explicitly by this class.
     */
    public void start() {
        consumerTag_$eq(new Some(this.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicConsume(this.queueName, false, new DefaultConsumer(this) { // from class: rhttpc.transport.amqp.AmqpSubscriber$$anon$1
            // Reference to the enclosing AmqpSubscriber.
            private final /* synthetic */ AmqpSubscriber $outer;

            /**
             * Handles one delivery: decodes the body as UTF-8, deserializes it, and either forwards
             * the prepared message for processing or rejects the delivery when parsing fails.
             */
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                long deliveryTag = envelope.getDeliveryTag();
                String str2 = new String(bArr, "UTF-8");
                Success deserialize = this.$outer.rhttpc$transport$amqp$AmqpSubscriber$$deserializer.deserialize(str2);
                if (deserialize instanceof Success) {
                    // Parsed successfully: let the subclass build the message and hand it to the consumer actor.
                    this.$outer.rhttpc$transport$amqp$AmqpSubscriber$$handleDeserializedMessage(this.$outer.prepareMessage(deserialize.value(), basicProperties), deliveryTag);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
                if (!(deserialize instanceof Failure)) {
                    throw new MatchError(deserialize);
                }
                // Parse failure: log and reject; the second argument `false` is the requeue flag of basicReject.
                // NOTE(review): the full message body is written to the error log here — confirm this is acceptable for payloads that may be sensitive.
                this.$outer.rhttpc$transport$amqp$AmqpSubscriber$$logger().error(new StringBuilder(38).append("REJECT: ").append(deliveryTag).append(" because of parse failure of: ").append(str2).toString(), ((Failure) deserialize).exception());
                this.$outer.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicReject(deliveryTag, false);
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }

            // NOTE(review): decompiler rendering of the anonymous class constructor. The `this` passed to
            // super(...) and assigned to $outer presumably denotes the enclosing AmqpSubscriber; as written
            // this initializer is not valid Java.
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(this.rhttpc$transport$amqp$AmqpSubscriber$$channel);
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        })));
    }

    /**
     * Subclass hook that builds the object sent to the consumer actor.
     *
     * @param sub             the value produced by the deserializer for one delivery
     * @param basicProperties the AMQP properties that accompanied the delivery
     * @return the message that will be sent to the consumer actor
     */
    public abstract Object prepareMessage(Sub sub, AMQP.BasicProperties basicProperties);

    /**
     * Sends a prepared message to the consumer actor and wires up acknowledgement handling.
     *
     * A fresh promise is added to `pendingConsumePromises`; once that update has been applied the
     * consumer is asked with `consumeTimeout`. The reply value itself is discarded. When the ask
     * completes, the function returned by handleConsumerResponse decides how the delivery is settled
     * and is given a callback that completes and removes the promise.
     *
     * @param obj the message returned by prepareMessage
     * @param j   the delivery tag of the AMQP delivery being processed
     */
    public void rhttpc$transport$amqp$AmqpSubscriber$$handleDeserializedMessage(Object obj, long j) {
        Timeout timeout = new Timeout(this.consumeTimeout);
        Promise apply = Promise$.MODULE$.apply();
        pendingConsumePromises().alter(set -> {
            return set.$plus(apply);
        }).flatMap(set2 -> {
            ActorRef ask = akka.pattern.package$.MODULE$.ask(this.consumer);
            return AskableActorRef$.MODULE$.$qmark$extension1(ask, obj, timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, obj)).map(obj2 -> {
                $anonfun$handleDeserializedMessage$4(obj2);
                return BoxedUnit.UNIT;
            }, this.system.dispatcher());
        }, this.system.dispatcher()).onComplete(handleConsumerResponse(j, () -> {
            this.complete$1(apply);
        }), this.system.dispatcher());
    }

    /**
     * Builds the completion callback for one delivery.
     *
     * @param j         the delivery tag to ACK / REJECT / NACK
     * @param function0 callback invoked after the delivery has been settled on the channel
     * @return a function that applies the settlement logic of $anonfun$handleConsumerResponse$1
     */
    private <U> Function1<Try<Object>, BoxedUnit> handleConsumerResponse(long j, Function0<BoxedUnit> function0) {
        return r10 -> {
            $anonfun$handleConsumerResponse$1(this, j, function0, r10);
            return BoxedUnit.UNIT;
        };
    }

    /**
     * Stops the subscriber.
     *
     * Cancels the consumer if a tag was recorded by start(), then returns a future that waits for
     * the deliveries tracked in `pendingConsumePromises` and afterwards closes the channel.
     * Each step is wrapped in a Recovered helper labelled with a description of the step.
     * NOTE(review): Recovered presumably logs and suppresses failures of the wrapped step — confirm
     * against rhttpc.utils.Recovered.
     *
     * @return a future completed after the pending deliveries are done and the channel close was attempted
     */
    public Future<BoxedUnit> stop() {
        Recovered$.MODULE$.recovered("canceling consumer", () -> {
            this.consumerTag().foreach(str -> {
                $anonfun$stop$2(this, str);
                return BoxedUnit.UNIT;
            });
        });
        return Recovered$.MODULE$.recoveredFuture("completing consuming", () -> {
            return this.currentConsumingFuturesComplete();
        }, this.system.dispatcher()).map(boxedUnit -> {
            $anonfun$stop$4(this, boxedUnit);
            return BoxedUnit.UNIT;
        }, this.system.dispatcher());
    }

    /**
     * Returns a future that completes when every promise in the set obtained from the agent's
     * future() has completed. The sequenced results are discarded.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Future<BoxedUnit> currentConsumingFuturesComplete() {
        return pendingConsumePromises().future().flatMap(set -> {
            return Future$.MODULE$.sequence((TraversableOnce) set.map(promise -> {
                return promise.future();
            }, Set$.MODULE$.canBuildFrom()), Set$.MODULE$.canBuildFrom(), this.system.dispatcher());
        }, this.system.dispatcher()).map(set2 -> {
            $anonfun$currentConsumingFuturesComplete$3(set2);
            return BoxedUnit.UNIT;
        }, this.system.dispatcher());
    }

    /**
     * Marks one delivery as finished: inside the agent update the promise is completed
     * successfully and removed from the pending set.
     *
     * @param promise the promise created for the delivery in handleDeserializedMessage
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final void complete$1(Promise promise) {
        pendingConsumePromises().send(set -> {
            promise.success(BoxedUnit.UNIT);
            return set.$minus(promise);
        });
    }

    /** Intentionally empty: the reply of the consumer actor is ignored. */
    public static final /* synthetic */ void $anonfun$handleDeserializedMessage$4(Object obj) {
    }

    /**
     * Settles a delivery according to the outcome of the ask sent to the consumer actor.
     *
     * <ul>
     *   <li>Success: basicAck(tag, false), then the completion callback.</li>
     *   <li>Failure with AskTimeoutException: basicReject(tag, false), then the completion callback.</li>
     *   <li>Failure with an Exception that is also a RejectingMessage: basicReject(tag, false),
     *       then the completion callback.</li>
     *   <li>Any other Failure: after `nackDelay`, basicNack(tag, false, true), then the completion
     *       callback.</li>
     * </ul>
     *
     * NOTE(review): in every branch the completion callback runs only after the channel call returns.
     * If basicAck / basicReject / basicNack throws, the callback is skipped and the promise for this
     * delivery appears to stay incomplete, which would keep the future returned by stop() from
     * completing — confirm and consider invoking the callback in a finally block.
     *
     * @param amqpSubscriber the subscriber whose channel, logger and scheduler are used
     * @param j              the delivery tag
     * @param function0      completion callback for the delivery
     * @param r11            the result of the ask
     */
    public static final /* synthetic */ void $anonfun$handleConsumerResponse$1(AmqpSubscriber amqpSubscriber, long j, Function0 function0, Try r11) {
        // `z` records that r11 is a Failure so later branches can reuse the cast in `failure`.
        boolean z = false;
        Failure failure = null;
        if (r11 instanceof Success) {
            amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$logger().debug(new StringBuilder(5).append("ACK: ").append(j).toString());
            amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicAck(j, false);
            function0.apply$mcV$sp();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (r11 instanceof Failure) {
            z = true;
            failure = (Failure) r11;
            AskTimeoutException exception = failure.exception();
            if (exception instanceof AskTimeoutException) {
                // The consumer did not answer within consumeTimeout: reject with the requeue flag set to false.
                amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$logger().debug(new StringBuilder(31).append("REJECT: ").append(j).append(" because of ask timeout").toString(), exception);
                amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicReject(j, false);
                function0.apply$mcV$sp();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (z) {
            Throwable exception2 = failure.exception();
            // The failure is an Exception marked with RejectingMessage: reject with the requeue flag set to false.
            if ((exception2 instanceof Exception) && (exception2 instanceof RejectingMessage)) {
                amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$logger().debug(new StringBuilder(37).append("REJECT: ").append(j).append(" because of rejecting failure").toString(), (Exception) exception2);
                amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicReject(j, false);
                function0.apply$mcV$sp();
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                return;
            }
        }
        if (!z) {
            // Neither Success nor Failure.
            throw new MatchError(r11);
        }
        // Any other failure: NACK after nackDelay; the third argument `true` is the requeue flag of basicNack.
        amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$logger().debug(new StringBuilder(37).append("Will NACK: ").append(j).append(" after ").append(amqpSubscriber.nackDelay).append(" because of failure").toString(), failure.exception());
        // NOTE(review): the value returned by scheduleOnce is discarded, so the scheduled NACK cannot be cancelled from here.
        amqpSubscriber.system.scheduler().scheduleOnce(amqpSubscriber.nackDelay, () -> {
            amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$logger().debug(new StringBuilder(34).append("NACK: ").append(j).append(" because of previous failure").toString());
            amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicNack(j, false, true);
            function0.apply$mcV$sp();
        }, amqpSubscriber.system.dispatcher());
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    /** Cancels the consumer registered under the given tag; used by stop(). */
    public static final /* synthetic */ void $anonfun$stop$2(AmqpSubscriber amqpSubscriber, String str) {
        amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$channel.basicCancel(str);
    }

    /** Closes the channel inside a Recovered wrapper labelled "closing channel"; used by stop(). */
    public static final /* synthetic */ void $anonfun$stop$4(AmqpSubscriber amqpSubscriber, BoxedUnit boxedUnit) {
        Recovered$.MODULE$.recovered("closing channel", () -> {
            amqpSubscriber.rhttpc$transport$amqp$AmqpSubscriber$$channel.close();
        });
    }

    /** Intentionally empty: the sequenced results of the pending promises are ignored. */
    public static final /* synthetic */ void $anonfun$currentConsumingFuturesComplete$3(Set set) {
    }

    /**
     * Stores the collaborators and creates the agent that tracks in-flight deliveries,
     * starting from an empty set and running on the actor system's dispatcher.
     *
     * @param channel        AMQP channel used for consuming and settling deliveries
     * @param str            name of the queue to consume from
     * @param actorRef       consumer actor that is asked with each prepared message
     * @param deserializer   deserializer applied to the UTF-8 decoded delivery body
     * @param finiteDuration timeout for the ask sent to the consumer actor
     * @param finiteDuration2 delay before a failed delivery is NACKed
     * @param actorSystem    actor system providing the dispatcher and scheduler
     */
    public AmqpSubscriber(Channel channel, String str, ActorRef actorRef, Deserializer<Sub> deserializer, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, ActorSystem actorSystem) {
        this.rhttpc$transport$amqp$AmqpSubscriber$$channel = channel;
        this.queueName = str;
        this.consumer = actorRef;
        this.rhttpc$transport$amqp$AmqpSubscriber$$deserializer = deserializer;
        this.consumeTimeout = finiteDuration;
        this.nackDelay = finiteDuration2;
        this.system = actorSystem;
        this.pendingConsumePromises = Agent$.MODULE$.apply(Predef$.MODULE$.Set().empty(), actorSystem.dispatcher());
    }
}
