package io.openk9.ingestion.queue.logic;

import io.openk9.cbor.api.CBORFactory;
import io.openk9.ingestion.api.BundleSender;
import io.openk9.ingestion.logic.api.IngestionLogic;
import java.util.Objects;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@Component(immediate = true, service = {SendToQueue.class})
/* loaded from: input_file:io/openk9/ingestion/queue/logic/SendToQueue.class */
public class SendToQueue {
    private Disposable _disposable;

    @Reference(target = "(queue=data-ingestion)")
    private BundleSender _bundleSender;

    @Reference
    private CBORFactory _cborFactory;

    @Reference
    private IngestionLogic _ingestionLogicReceiver;

    @Activate
    public void activate() {
        Flux flux = this._ingestionLogicReceiver.flux();
        CBORFactory cBORFactory = this._cborFactory;
        Objects.requireNonNull(cBORFactory);
        Flux map = flux.map((v1) -> {
            return r2.toCBOR(v1);
        });
        BundleSender bundleSender = this._bundleSender;
        Objects.requireNonNull(bundleSender);
        this._disposable = map.transform(bundleSender::send).subscribe();
    }

    @Deactivate
    public void deactivate() {
        this._disposable.dispose();
    }
}
