package org.kinotic.continuum.internal.core.api.event;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.kinotic.continuum.api.config.ContinuumProperties;
import org.kinotic.continuum.core.api.event.CRI;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.event.EventStreamService;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;

@Component
/* loaded from: input_file:org/kinotic/continuum/internal/core/api/event/DefaultEventStreamService.class */
public class DefaultEventStreamService implements EventStreamService {
    private static final Logger log = LoggerFactory.getLogger(DefaultEventStreamService.class);

    @Autowired
    private Vertx vertx;

    @Autowired
    private KafkaTemplate<String, byte[]> kafkaTemplate;

    @Autowired
    private KafkaSender<String, byte[]> kafkaSender;

    @Autowired
    private ContinuumProperties continuumProperties;
    private Scheduler scheduler;

    @PostConstruct
    public void init() {
        this.scheduler = Schedulers.fromExecutor(runnable -> {
            this.vertx.executeBlocking(promise -> {
                runnable.run();
            }, (Handler) null);
        });
    }

    public Mono<Void> send(Event<byte[]> event) {
        Validate.notNull(event, "Event must not be null", new Object[0]);
        return Mono.create(monoSink -> {
            this.kafkaTemplate.send(new ProducerRecord(event.cri().resourceName(), (Integer) null, event.cri().scope(), (byte[]) event.data(), new MetadataHeadersAdapter(event.metadata()))).addCallback(new ListenableFutureCallback<SendResult<String, byte[]>>() { // from class: org.kinotic.continuum.internal.core.api.event.DefaultEventStreamService.1
                public void onFailure(Throwable th) {
                    monoSink.error(th);
                }

                public void onSuccess(SendResult<String, byte[]> sendResult) {
                    monoSink.success();
                }
            });
        }).subscribeOn(this.scheduler);
    }

    public Mono<Void> sendStream(Publisher<Event<byte[]>> publisher) {
        Validate.notNull(publisher, "Publisher must not be null", new Object[0]);
        return this.kafkaSender.createOutbound().send(Flux.from(publisher).map(event -> {
            return new ProducerRecord(event.cri().resourceName(), (Integer) null, event.cri().scope(), (byte[]) event.data(), new MetadataHeadersAdapter(event.metadata()));
        })).then();
    }

    public Flux<Event<byte[]>> listen(CRI cri) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.continuumProperties.getKafkaBootstrapServers());
        hashMap.put("group.id", UUID.randomUUID().toString());
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("key.deserializer", StringDeserializer.class);
        hashMap.put("value.deserializer", ByteArrayDeserializer.class);
        return KafkaReceiver.create(ReceiverOptions.create(hashMap).subscription(Collections.singleton(cri.resourceName()))).receive().map(receiverRecord -> {
            receiverRecord.receiverOffset().acknowledge();
            return new ReceiverRecordEventAdapter(cri, receiverRecord);
        }).subscribeOn(this.scheduler);
    }
}
