package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.config.StreamConfigBase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.groups.MultiCreate;
import io.smallrye.mutiny.groups.MultiOnItem;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.ext.web.RoutingContext;
import jakarta.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/quarkus/reactivemessaging/http/runtime/ReactiveHandlerBeanBase.class */
/**
 * Base class for beans that dispatch incoming Vert.x requests to per-stream Mutiny processors.
 *
 * <p>On {@link #init()}, every configuration returned by {@link #configs()} is turned into a
 * {@link Bundle} and stored in {@link #processors} under {@link #key(StreamConfigBase)}.
 * {@link #handle(RoutingContext)} later looks the bundle up via {@link #key(RoutingContext)} and
 * either delegates to {@link #handleRequest} or answers with HTTP 404.
 *
 * @param <ConfigType> configuration type describing one incoming stream
 * @param <MessageType> type of the items emitted by the stream's processor
 */
public abstract class ReactiveHandlerBeanBase<ConfigType extends StreamConfigBase, MessageType> {
    /**
     * Registered streams, keyed by the value of {@link #key(StreamConfigBase)}.
     * Populated by {@link #init()}; read by {@link #handle(RoutingContext)}.
     */
    protected final Map<String, Bundle<MessageType>> processors = new HashMap<>();

    /**
     * Everything needed to serve one configured stream: the queue-size guard, the processor
     * {@link Multi}, the emitter feeding that processor, and the configured path.
     *
     * <p>Decompiler note: this class was originally declared {@code protected}.
     *
     * @param <T> type of the items flowing through the processor (named {@code T} so it does not
     *            shadow the enclosing class's {@code MessageType} parameter)
     */
    public class Bundle<T> {
        private final StrictQueueSizeGuard guard;
        private Multi<T> processor;
        // NOTE(review): set through the callback handed to Multi.createFrom().emitter(...);
        // presumably stays null until the processor is subscribed - confirm against Mutiny docs.
        private MultiEmitter<? super T> emitter;
        private String path;

        private Bundle(StrictQueueSizeGuard strictQueueSizeGuard) {
            this.guard = strictQueueSizeGuard;
        }

        /** Stores the processor stream associated with this bundle. */
        public void setProcessor(Multi<T> multi) {
            this.processor = multi;
        }

        /** Stores the emitter used to push items into the processor. */
        public void setEmitter(MultiEmitter<? super T> multiEmitter) {
            this.emitter = multiEmitter;
        }

        /** Returns the processor stream previously set with {@link #setProcessor(Multi)}. */
        public Multi<T> getProcessor() {
            return this.processor;
        }

        /** Stores the path taken from the stream configuration. */
        public void setPath(String str) {
            this.path = str;
        }
    }

    /** Registers a processor for every configuration returned by {@link #configs()}. */
    @PostConstruct
    void init() {
        configs().forEach(this::addProcessor);
    }

    /**
     * Routes a request to the stream registered under {@link #key(RoutingContext)}.
     *
     * <p>If no stream is registered for that key, the response is ended with status 404.
     *
     * <p>Decompiler note: this method was originally package-private.
     *
     * @param routingContext the Vert.x routing context of the incoming request
     */
    public void handle(RoutingContext routingContext) {
        Bundle<MessageType> bundle = this.processors.get(key(routingContext));
        if (bundle == null) {
            routingContext.response().setStatusCode(404).end();
            return;
        }
        handleRequest(routingContext, bundle.emitter, bundle.guard, bundle.path);
    }

    /**
     * Builds the {@link Bundle} for one stream configuration and registers it.
     *
     * @param configtype the stream configuration to register
     * @throws IllegalStateException if another stream was already registered under the same key
     */
    private void addProcessor(ConfigType configtype) {
        StrictQueueSizeGuard guard = new StrictQueueSizeGuard(configtype.bufferSize);
        Bundle<MessageType> bundle = new Bundle<>(guard);

        // The emitter itself uses the BUFFER strategy; the guard is notified (dequeue) for
        // every item that passes through the processor.
        Multi<MessageType> processor = Multi.createFrom()
                .<MessageType>emitter(bundle::setEmitter, BackPressureStrategy.BUFFER)
                .onItem()
                .invoke(guard::dequeue);
        bundle.setProcessor(processor);
        bundle.setPath(configtype.path);

        // Map.put returns the previous mapping, which is non-null only for a duplicate key.
        Bundle<MessageType> previous = this.processors.put(key(configtype), bundle);
        if (previous != null) {
            throw new IllegalStateException("Duplicate incoming streams defined for " + description(configtype));
        }
    }

    /**
     * Processes a request for a registered stream.
     *
     * @param routingContext the Vert.x routing context of the incoming request
     * @param multiEmitter the emitter stored in the stream's bundle
     * @param strictQueueSizeGuard the guard created from the stream's configured buffer size
     * @param str the path stored in the stream's bundle
     */
    protected abstract void handleRequest(RoutingContext routingContext, MultiEmitter<? super MessageType> multiEmitter, StrictQueueSizeGuard strictQueueSizeGuard, String str);

    /** Returns a description of the given configuration, used in the duplicate-stream error message. */
    protected abstract String description(ConfigType configtype);

    /** Returns the registry key under which the given configuration is stored. */
    protected abstract String key(ConfigType configtype);

    /** Returns the registry key used to look up the stream for the given request. */
    protected abstract String key(RoutingContext routingContext);

    /** Returns the stream configurations to register during {@link #init()}. */
    protected abstract Collection<ConfigType> configs();
}
