package tech.guyi.component.message.stream.kafka;

import java.time.Duration;
import java.util.Properties;
import java.util.function.Consumer;
import javax.annotation.Resource;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.guyi.component.message.stream.api.stream.MessageStream;
import tech.guyi.component.message.stream.api.stream.entry.Message;
import tech.guyi.component.message.stream.api.worker.MessageStreamWorker;
import tech.guyi.component.message.stream.kafka.configuration.ConfigurationType;
import tech.guyi.component.message.stream.kafka.configuration.KafkaConfiguration;

/* loaded from: input_file:tech/guyi/component/message/stream/kafka/KafkaMessageStream.class */
/**
 * {@link MessageStream} implementation backed by Apache Kafka.
 *
 * <p>{@link #open(Consumer)} always creates a producer and, when the consumer
 * section of the configuration is enabled, also creates a consumer whose poll
 * loop is submitted to the injected {@link MessageStreamWorker}. Received
 * records are handed to the receiver passed to {@code open}; outgoing messages
 * are sent to the producer topic taken from the configuration.
 *
 * <p>The poll loop runs on whatever thread the worker provides, while
 * {@link #close()} is expected to be called from a different thread; the stop
 * flag and the receiver are therefore {@code volatile}.
 */
public class KafkaMessageStream implements MessageStream {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageStream.class);

    @Resource
    private MessageStreamWorker worker;

    @Resource
    private KafkaConfiguration configuration;
    private KafkaProducer<String, byte[]> producer;
    private KafkaConsumer<String, byte[]> consumer;

    /** Stop flag for the poll loop; volatile because it is written by close() and read by the worker. */
    private volatile boolean run;

    /** Callback for received messages; volatile because it is set by open() and read by the worker. */
    private volatile Consumer<Message> receiver;

    /**
     * Returns the identifier of this stream implementation.
     *
     * @return the constant {@code "kafka"}
     */
    @NonNull
    public String getName() {
        return "kafka";
    }

    /** Builds the producer from the producer section of the configuration. */
    private void openProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.configuration.getBootstrapServers(ConfigurationType.PRODUCER));
        properties.put("acks", this.configuration.getProducer().getAcks());
        properties.put("retries", Integer.valueOf(this.configuration.getProducer().getRetries()));
        properties.put("batch.size", Integer.valueOf(this.configuration.getProducer().getBatchSize()));
        properties.put("key.serializer", this.configuration.getProducer().getKeySerializer());
        properties.put("value.serializer", this.configuration.getProducer().getValueSerializer());
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Builds the consumer from the consumer section of the configuration,
     * subscribes it to the configured topics and starts the poll loop on the
     * worker.
     */
    private void openConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.configuration.getBootstrapServers(ConfigurationType.CONSUMER));
        properties.put("group.id", this.configuration.getConsumer().getGroupId());
        properties.put("enable.auto.commit", Boolean.valueOf(this.configuration.getConsumer().isAutoCommit()));
        properties.put("auto.commit.interval.ms", Integer.valueOf(this.configuration.getConsumer().getInterval()));
        properties.put("session.timeout.ms", Integer.valueOf(this.configuration.getConsumer().getTimeout()));
        properties.put("heartbeat.interval.ms", Integer.valueOf(this.configuration.getConsumer().getHeartbeat()));
        properties.put("max.poll.records", Integer.valueOf(this.configuration.getConsumer().getMaxRecords()));
        properties.put("auto.offset.reset", this.configuration.getConsumer().getReset());
        properties.put("key.deserializer", this.configuration.getConsumer().getKeyDeserializer());
        properties.put("value.deserializer", this.configuration.getConsumer().getValueDeserializer());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(this.configuration.getConsumer().getTopic());
        // The flag must be set before the loop is submitted, otherwise the loop could exit immediately.
        this.run = true;
        this.worker.submit(this::pullMessage);
    }

    /**
     * Poll loop executed by the worker. Polls until {@link #close()} clears the
     * stop flag, forwarding every record to the receiver, and closes the
     * consumer on the way out.
     */
    private void pullMessage() {
        try {
            while (this.run) {
                try {
                    this.consumer.poll(Duration.ofMillis(this.configuration.getConsumer().getInterval()))
                            .forEach(consumerRecord ->
                                    this.receiver.accept(new Message(consumerRecord.key(), consumerRecord.value())));
                } catch (WakeupException ignored) {
                    // Raised when close() calls consumer.wakeup() to abort a blocking poll;
                    // the loop condition re-checks the stop flag. Must precede the broader
                    // catch below, otherwise this clause is unreachable and does not compile.
                } catch (Exception e) {
                    // Keep polling after a failure (including one thrown by the receiver).
                    log.error("Kafka消息拉取异常", e);
                }
            }
        } finally {
            // Close on the polling thread, and do so even if the loop exits abnormally.
            this.consumer.close();
        }
    }

    /**
     * Stops the poll loop (if a consumer was opened) and closes the producer.
     * Safe to call when {@link #open(Consumer)} was never invoked.
     */
    public void close() {
        this.run = false;
        KafkaConsumer<String, byte[]> activeConsumer = this.consumer;
        if (activeConsumer != null) {
            // wakeup() aborts a poll in progress so the loop notices the stop flag promptly;
            // the consumer itself is closed by the polling thread.
            activeConsumer.wakeup();
        }
        KafkaProducer<String, byte[]> activeProducer = this.producer;
        if (activeProducer != null) {
            activeProducer.close();
        }
    }

    /**
     * No-op: the subscribed topics come from the configuration, not from
     * individual registrations.
     *
     * @param str ignored
     */
    public void register(String str) {
    }

    /**
     * No-op counterpart of {@link #register(String)}.
     *
     * @param str ignored
     */
    public void unregister(String str) {
    }

    /**
     * Opens the producer and, when enabled in the configuration, the consumer.
     *
     * @param consumer callback that receives every message pulled from Kafka
     */
    public void open(Consumer<Message> consumer) {
        // Store the receiver first: openConsumer() starts the poll loop, which may
        // deliver a record before this method returns.
        this.receiver = consumer;
        openProducer();
        if (this.configuration.getConsumer().isEnable()) {
            openConsumer();
        }
    }

    /**
     * Sends a message to the configured producer topic, using the message's own
     * topic as the record key and its bytes as the record value.
     *
     * @param message the message to send
     */
    public void publish(Message message) {
        this.producer.send(new ProducerRecord<>(
                this.configuration.getProducer().getTopic(), message.getTopic(), message.getBytes()));
    }
}
