package rhttpc.transport.amqpjdbc;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.agent.Agent;
import akka.agent.Agent$;
import rhttpc.transport.Deserializer;
import rhttpc.transport.InboundQueueData;
import rhttpc.transport.OutboundQueueData;
import rhttpc.transport.PubSubTransport;
import rhttpc.transport.Publisher;
import rhttpc.transport.Serializer;
import rhttpc.transport.SerializingPublisher;
import rhttpc.transport.Subscriber;
import rhttpc.transport.amqp.AmqpQueueStats;
import rhttpc.transport.amqp.AmqpQueueStats$;
import rhttpc.transport.amqp.AmqpTransport;
import scala.$less$colon$less$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.IterableOnceOps;
import scala.collection.concurrent.TrieMap;
import scala.collection.concurrent.TrieMap$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: AmqpJdbcTransport.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0005g!B\t\u0013\u0001IA\u0002\u0002C\u0012\u0001\u0005\u0003\u0005\u000b\u0011B\u0013\t\u0011-\u0002!\u0011!Q\u0001\n1B\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\tq\u0001\u0011\t\u0011)A\u0005s!AA\b\u0001B\u0001B\u0003-Q\bC\u0003F\u0001\u0011\u0005a\tC\u0004O\u0001\t\u0007I\u0011B(\t\r\u001d\u0004\u0001\u0015!\u0003Q\u0011\u001dy\u0007A1A\u0005\nADq!!\u0003\u0001A\u0003%\u0011\u000fC\u0004\u0002\f\u0001!\t%!\u0004\t\u000f\u0005U\u0002\u0001\"\u0003\u00028!9\u0011Q\u000b\u0001\u0005B\u0005]\u0003bBAC\u0001\u0011\u0005\u0013q\u0011\u0005\b\u0003;\u0003A\u0011IAP\u0011\u001d\t)\f\u0001C!\u0003o\u0013Q#Q7ra*#'m\u0019+sC:\u001c\bo\u001c:u\u00136\u0004HN\u0003\u0002\u0014)\u0005A\u0011-\\9qU\u0012\u00147M\u0003\u0002\u0016-\u0005IAO]1ogB|'\u000f\u001e\u0006\u0002/\u00051!\u000f\u001b;ua\u000e\u001c2\u0001A\r !\tQR$D\u0001\u001c\u0015\u0005a\u0012!B:dC2\f\u0017B\u0001\u0010\u001c\u0005\u0019\te.\u001f*fMB\u0011\u0001%I\u0007\u0002%%\u0011!E\u0005\u0002\u0012\u00036\f\bO\u00133cGR\u0013\u0018M\\:q_J$\u0018AC;oI\u0016\u0014H._5oO\u000e\u0001\u0001C\u0001\u0014*\u001b\u00059#B\u0001\u0015\u0015\u0003\u0011\tW.\u001d9\n\u0005):#!D!ncB$&/\u00198ta>\u0014H/\u0001\u0003sKB|\u0007C\u0001\u0011.\u0013\tq#CA\u000eTG\",G-\u001e7fI6+7o]1hKN\u0014V\r]8tSR|'/_\u0001\u0017g\u000eDW\rZ;mKJ\u001c\u0005.Z2l\u0013:$XM\u001d<bYB\u0011\u0011GN\u0007\u0002e)\u00111\u0007N\u0001\tIV\u0014\u0018\r^5p]*\u0011QgG\u0001\u000bG>t7-\u001e:sK:$\u0018BA\u001c3\u000591\u0015N\\5uK\u0012+(/\u0019;j_:\fqd]2iK\u0012,H.\u001a:NKN\u001c\u0018mZ3t\r\u0016$8\r\u001b\"bi\u000eD7+\u001b>f!\tQ\"(\u0003\u0002<7\t\u0019\u0011J\u001c;\u0002\u0017\u0005\u001cGo\u001c:TsN$X-\u001c\t\u0003}\rk\u0011a\u0010\u0006\u0003\u0001\u0006\u000bQ!Y2u_JT\u0011AQ\u0001\u0005C.\\\u0017-\u0003\u0002E\u007f\tY\u0011i\u0019;peNK8\u000f^3n\u0003\u0019a\u0014N\\5u}Q)qIS&M\u001bR\u0011\u0001*\u0013\t\u0003A\u0001AQ\u0001\u0010\u0004A\u0004uBQa\t\u0004A\u0002\u0015BQa\u000b\u0004A\u0
0021BQa\f\u0004A\u0002ABQ\u0001\u000f\u0004A\u0002e\nqb]2iK\u0012,H.\u001a:t\u0007\u0006\u001c\u0007.Z\u000b\u0002!B!\u0011+V,`\u001b\u0005\u0011&BA\u001bT\u0015\t!6$\u0001\u0006d_2dWm\u0019;j_:L!A\u0016*\u0003\u000fQ\u0013\u0018.Z'baB\u0011\u0001,X\u0007\u00023*\u0011!lW\u0001\u0005Y\u0006twMC\u0001]\u0003\u0011Q\u0017M^1\n\u0005yK&AB*ue&tw\r\r\u0002aKB\u0019\u0001%Y2\n\u0005\t\u0014\"!E!ncBTEMY2TG\",G-\u001e7feB\u0011A-\u001a\u0007\u0001\t%1\u0007\"!A\u0001\u0002\u000b\u0005\u0001NA\u0002`IE\n\u0001c]2iK\u0012,H.\u001a:t\u0007\u0006\u001c\u0007.\u001a\u0011\u0012\u0005%d\u0007C\u0001\u000ek\u0013\tY7DA\u0004O_RD\u0017N\\4\u0011\u0005ii\u0017B\u00018\u001c\u0005\r\te._\u0001\u0019aV\u0014G.[:iKJ\fV/Z;f\u001d\u0006lWm]!hK:$X#A9\u0011\u0007I,x/D\u0001t\u0015\t!\u0018)A\u0003bO\u0016tG/\u0003\u0002wg\n)\u0011iZ3oiB!\u0001p`A\u0003\u001d\tIX\u0010\u0005\u0002{75\t1P\u0003\u0002}I\u00051AH]8pizJ!A`\u000e\u0002\rA\u0013X\rZ3g\u0013\u0011\t\t!a\u0001\u0003\u0007M+GO\u0003\u0002\u007f7A\u0019\u00010a\u0002\n\u0007y\u000b\u0019!A\rqk\nd\u0017n\u001d5feF+X-^3OC6,7/Q4f]R\u0004\u0013!\u00039vE2L7\u000f[3s+\u0011\ty!!\b\u0015\t\u0005E\u00111\u0006\u000b\u0005\u0003'\t\t\u0003\u0005\u0004\u0002\u0016\u0005]\u00111D\u0007\u0002)%\u0019\u0011\u0011\u0004\u000b\u0003\u0013A+(\r\\5tQ\u0016\u0014\bc\u00013\u0002\u001e\u00111\u0011qD\u0006C\u0002!\u0014a\u0001U;c\u001bN<\u0007\"CA\u0012\u0017\u0005\u0005\t9AA\u0013\u0003))g/\u001b3f]\u000e,G%\r\t\u0007\u0003+\t9#a\u0007\n\u0007\u0005%BC\u0001\u0006TKJL\u0017\r\\5{KJDq!!\f\f\u0001\u0004\ty#A\u0005rk\u0016,X\rR1uCB!\u0011QCA\u0019\u0013\r\t\u0019\u0004\u0006\u0002\u0012\u001fV$(m\\;oIF+X-^3ECR\f\u0017\u0001H:dQ\u0016$W\u000f\\3s\u0005f\fV/Z;f\u0003:$\u0007+\u001e2mSNDWM]\u000b\u0005\u0003s\t\t\u0005\u0006\u0004\u0002<\u0005%\u0013Q\n\u000b\u0005\u0003{\t\u0019\u0005\u0005\u0003!C\u0006}\u0002c\u00013\u0002B\u00111\u0011q\u0004\u0007C\u0002!D\u0011\"!\u0012\r\u0003\u0003\u0005\u001d!a\u0012\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$#\u000
7\u0005\u0004\u0002\u0016\u0005\u001d\u0012q\b\u0005\b\u0003\u0017b\u0001\u0019AA\u0003\u0003%\tX/Z;f\u001d\u0006lW\rC\u0004\u0002\f1\u0001\r!a\u0014\u0011\r\u0005U\u0011\u0011KA \u0013\r\t\u0019\u0006\u0006\u0002\u0015'\u0016\u0014\u0018.\u00197ju&tw\rU;cY&\u001c\b.\u001a:\u0002\u0015M,(m]2sS\n,'/\u0006\u0003\u0002Z\u0005\u0015DCBA.\u0003g\nY\b\u0006\u0003\u0002^\u0005%\u0004CBA\u000b\u0003?\n\u0019'C\u0002\u0002bQ\u0011!bU;cg\u000e\u0014\u0018NY3s!\r!\u0017Q\r\u0003\u0007\u0003Oj!\u0019\u00015\u0003\rM+(-T:h\u0011%\tY'DA\u0001\u0002\b\ti'\u0001\u0006fm&$WM\\2fIM\u0002b!!\u0006\u0002p\u0005\r\u0014bAA9)\taA)Z:fe&\fG.\u001b>fe\"9\u0011QF\u0007A\u0002\u0005U\u0004\u0003BA\u000b\u0003oJ1!!\u001f\u0015\u0005AIeNY8v]\u0012\fV/Z;f\t\u0006$\u0018\rC\u0004\u0002~5\u0001\r!a \u0002\u0011\r|gn];nKJ\u00042APAA\u0013\r\t\u0019i\u0010\u0002\t\u0003\u000e$xN\u001d*fM\u0006)b-\u001e7m\u001b\u0016\u001c8/Y4f'V\u00147o\u0019:jE\u0016\u0014X\u0003BAE\u0003##b!a#\u0002\u001a\u0006mE\u0003BAG\u0003'\u0003b!!\u0006\u0002`\u0005=\u0005c\u00013\u0002\u0012\u00121\u0011q\r\bC\u0002!D\u0011\"!&\u000f\u0003\u0003\u0005\u001d!a&\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$C\u0007\u0005\u0004\u0002\u0016\u0005=\u0014q\u0012\u0005\b\u0003[q\u0001\u0019AA;\u0011\u001d\tiH\u0004a\u0001\u0003\u007f\n1\"];fk\u0016\u001c8\u000b^1ugV\u0011\u0011\u0011\u0015\t\u0007\u0003G\u000b)+!+\u000e\u0003QJ1!a*5\u0005\u00191U\u000f^;sKB9\u00010a+\u0002\u0006\u0005=\u0016\u0002BAW\u0003\u0007\u00111!T1q!\r\u0001\u0013\u0011W\u0005\u0004\u0003g\u0013\"AE!ncBTEMY2Rk\u0016,Xm\u0015;biN\fAa\u001d;paR\u0011\u0011\u0011\u0018\t\u0007\u0003G\u000b)+a/\u0011\u0007i\ti,C\u0002\u0002@n\u0011A!\u00168ji\u0002")
/* loaded from: input_file:rhttpc/transport/amqpjdbc/AmqpJdbcTransportImpl.class */
/**
 * {@link AmqpJdbcTransport} implementation that delegates to an underlying {@link AmqpTransport}
 * and, for outbound queues flagged as delayed, wraps the publisher in an {@link AmqpJdbcPublisher}
 * driven by a per-queue {@link AmqpJdbcScheduler}.
 *
 * <p>Schedulers are cached by queue name, and the names of all queues for which a publisher was
 * requested are tracked in an agent so that {@link #queuesStats()} can ask the repository about them.
 *
 * <p>This file is decompiler output of Scala code; methods with a {@code $1} suffix are lambda
 * bodies lifted by the compiler.
 */
public class AmqpJdbcTransportImpl implements AmqpJdbcTransport {
    /** Transport that performs the actual publish/subscribe operations. */
    private final AmqpTransport underlying;
    /** Repository passed to every scheduler and queried for per-queue statistics. */
    private final ScheduledMessagesRepository repo;
    /** Check interval handed to every created scheduler. */
    private final FiniteDuration schedulerCheckInterval;
    /** Fetch batch size handed to every created scheduler. */
    private final int schedulerMessagesFetchBatchSize;
    /** Supplies the scheduler and the dispatcher used for all asynchronous work in this class. */
    private final ActorSystem actorSystem;
    /** Schedulers keyed by queue name; entries are removed through the publisher's callback. */
    private final TrieMap<String, AmqpJdbcScheduler<?>> schedulersCache;
    /** Names of every queue for which {@code publisher(OutboundQueueData, ...)} was called. */
    private final Agent<Set<String>> publisherQueueNamesAgent;

    /** Forwarder to the {@link PubSubTransport} trait implementation taking a plain queue name. */
    public <PubMsg> Publisher<PubMsg> publisher(String str, Serializer<PubMsg> serializer) {
        return PubSubTransport.publisher$(this, str, serializer);
    }

    /** Forwarder to the {@link PubSubTransport} trait implementation taking a plain queue name. */
    public <SubMsg> Subscriber<SubMsg> subscriber(String str, ActorRef actorRef, Deserializer<SubMsg> deserializer) {
        return PubSubTransport.subscriber$(this, str, actorRef, deserializer);
    }

    private TrieMap<String, AmqpJdbcScheduler<?>> schedulersCache() {
        return this.schedulersCache;
    }

    private Agent<Set<String>> publisherQueueNamesAgent() {
        return this.publisherQueueNamesAgent;
    }

    /**
     * Creates a publisher for the given outbound queue.
     *
     * <p>The queue name is always recorded in the publisher-queue-names agent. When the queue is
     * not delayed the underlying publisher is returned as is; otherwise it is wrapped in an
     * {@link AmqpJdbcPublisher} together with the (cached) scheduler for that queue and a callback
     * that evicts the scheduler from the cache.
     *
     * @param outboundQueueData description of the target queue ({@code name()}, {@code delayed()})
     * @param serializer        serializer for published messages
     * @return the underlying publisher, or a scheduling wrapper around it for delayed queues
     */
    public <PubMsg> Publisher<PubMsg> publisher(OutboundQueueData outboundQueueData, Serializer<PubMsg> serializer) {
        // The cast mirrors the original code: the underlying transport is expected to hand back a SerializingPublisher.
        SerializingPublisher<PubMsg> serializingPublisher = (SerializingPublisher) this.underlying.publisher(outboundQueueData, serializer);
        publisherQueueNamesAgent().send(queueNames -> {
            return queueNames.$plus(outboundQueueData.name());
        });
        if (!outboundQueueData.delayed()) {
            return serializingPublisher;
        }
        return new AmqpJdbcPublisher(serializingPublisher, outboundQueueData.name(), schedulerByQueueAndPublisher(outboundQueueData.name(), serializingPublisher, serializer), () -> {
            return this.removeFromCache$1(outboundQueueData);
        }, this.actorSystem.dispatcher());
    }

    /**
     * Returns the scheduler cached under {@code str}, creating and caching one when absent.
     *
     * @param str                  queue name used as the cache key
     * @param serializingPublisher publisher passed to a newly created scheduler
     * @param serializer           serializer passed to a newly created scheduler
     */
    private <PubMsg> AmqpJdbcScheduler<PubMsg> schedulerByQueueAndPublisher(String str, SerializingPublisher<PubMsg> serializingPublisher, Serializer<PubMsg> serializer) {
        return (AmqpJdbcScheduler) schedulersCache().getOrElseUpdate(str, () -> {
            return this.createScheduler$1(str, serializingPublisher, serializer);
        });
    }

    /** Delegates directly to the underlying transport. */
    public <SubMsg> Subscriber<SubMsg> subscriber(InboundQueueData inboundQueueData, ActorRef actorRef, Deserializer<SubMsg> deserializer) {
        return this.underlying.subscriber(inboundQueueData, actorRef, deserializer);
    }

    /** Delegates directly to the underlying transport. */
    public <SubMsg> Subscriber<SubMsg> fullMessageSubscriber(InboundQueueData inboundQueueData, ActorRef actorRef, Deserializer<SubMsg> deserializer) {
        return this.underlying.fullMessageSubscriber(inboundQueueData, actorRef, deserializer);
    }

    /**
     * Combines the underlying transport's queue statistics with the repository's per-queue counts.
     *
     * <p>The result has one entry for every queue name present in either source. A queue missing
     * from the underlying transport's statistics gets {@code AmqpQueueStats.zero()}; a queue
     * missing from the repository's statistics gets a count of {@code 0}.
     *
     * @return a future map from queue name to the merged {@link AmqpJdbcQueueStats}
     */
    @Override // rhttpc.transport.amqpjdbc.AmqpJdbcTransport
    public Future<Map<String, AmqpJdbcQueueStats>> queuesStats() {
        // Each lambda parameter has a distinct name: nested lambdas may not redeclare an enclosing
        // parameter, and the two stat maps must stay distinguishable in the merge below.
        return this.underlying.queuesStats().flatMap(amqpStats -> {
            return this.publisherQueueNamesAgent().future().flatMap(publisherQueueNames -> {
                return this.repo.queuesStats(publisherQueueNames).map(scheduledStats -> {
                    // Union of the key sets, so queues known to only one source are still reported.
                    return ((IterableOnceOps) amqpStats.keySet().$plus$plus(scheduledStats.keySet()).map(queueName -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(queueName), new AmqpJdbcQueueStats((AmqpQueueStats) amqpStats.getOrElse(queueName, () -> {
                            return AmqpQueueStats$.MODULE$.zero();
                        }), BoxesRunTime.unboxToInt(scheduledStats.getOrElse(queueName, () -> {
                            return 0;
                        }))));
                    })).toMap($less$colon$less$.MODULE$.refl());
                }, this.actorSystem.dispatcher());
            }, this.actorSystem.dispatcher());
        }, this.actorSystem.dispatcher());
    }

    /** Stops the underlying transport; the scheduler cache is not touched here. */
    public Future<BoxedUnit> stop() {
        return this.underlying.stop();
    }

    /**
     * Lifted lambda body: evicts the scheduler cached for the queue and returns a completed future.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final Future removeFromCache$1(OutboundQueueData outboundQueueData) {
        schedulersCache().remove(outboundQueueData.name());
        return Future$.MODULE$.unit();
    }

    /**
     * Lifted lambda body: builds a new scheduler for the queue from this transport's configuration.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final AmqpJdbcSchedulerImpl createScheduler$1(String str, SerializingPublisher serializingPublisher, Serializer serializer) {
        return new AmqpJdbcSchedulerImpl(this.actorSystem.scheduler(), this.schedulerCheckInterval, this.repo, str, this.schedulerMessagesFetchBatchSize, serializingPublisher, this.actorSystem.dispatcher(), serializer);
    }

    /**
     * Stores the collaborators and initialises an empty scheduler cache and an empty
     * publisher-queue-names agent running on the actor system's dispatcher.
     *
     * @param amqpTransport               transport to delegate to
     * @param scheduledMessagesRepository repository used by schedulers and by {@link #queuesStats()}
     * @param finiteDuration              scheduler check interval
     * @param i                           scheduler messages fetch batch size
     * @param actorSystem                 source of the scheduler and dispatcher
     */
    public AmqpJdbcTransportImpl(AmqpTransport amqpTransport, ScheduledMessagesRepository scheduledMessagesRepository, FiniteDuration finiteDuration, int i, ActorSystem actorSystem) {
        this.underlying = amqpTransport;
        this.repo = scheduledMessagesRepository;
        this.schedulerCheckInterval = finiteDuration;
        this.schedulerMessagesFetchBatchSize = i;
        this.actorSystem = actorSystem;
        PubSubTransport.$init$(this);
        this.schedulersCache = (TrieMap) TrieMap$.MODULE$.apply(Nil$.MODULE$);
        this.publisherQueueNamesAgent = Agent$.MODULE$.apply(Predef$.MODULE$.Set().empty(), actorSystem.dispatcher());
    }
}
