package com.jayqqaa12.jbase.cache.notify.kafka;

import com.jayqqaa12.jbase.cache.core.CacheConfig;
import com.jayqqaa12.jbase.cache.core.JbaseCache;
import com.jayqqaa12.jbase.cache.notify.Command;
import com.jayqqaa12.jbase.cache.notify.Notify;
import com.jayqqaa12.jbase.cache.serializer.CacheSerializer;
import com.jayqqaa12.jbase.cache.util.UniqueKit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jayqqaa12/jbase/cache/notify/kafka/KafkaNotify.class */
/**
 * {@link Notify} implementation that publishes cache {@link Command}s to a Kafka topic and
 * applies the commands received from that topic to the local {@link JbaseCache}.
 *
 * <p>Threading: {@link #init} starts a single background thread that runs {@link #run()}.
 * The {@link KafkaConsumer} is only ever polled and closed on that thread, because
 * {@code KafkaConsumer} is not safe for multi-threaded access; {@link #stop()} interrupts the
 * poll through {@link KafkaConsumer#wakeup()}, which is the one call allowed from another thread.
 */
public class KafkaNotify implements Notify, Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaNotify.class);

    /** Maximum time a single {@code poll} call blocks, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /** How long {@link #stop()} waits for the consumer thread to finish, in seconds. */
    private static final long STOP_WAIT_SECONDS = 5L;

    private KafkaProducer<String, byte[]> producer;
    private KafkaConsumer<String, byte[]> consumer;
    private String topic;
    private CacheSerializer cacheSerializer;
    // Written by stop() on the caller's thread and read by the consumer thread.
    private volatile boolean running;
    private ExecutorService executorService;
    private JbaseCache cache;

    /**
     * Creates the Kafka consumer and producer from {@code cacheConfig}, subscribes to the notify
     * topic and starts the background consumer thread.
     *
     * @param cacheConfig supplies the notify topic, the serializer class name and the Kafka
     *                    host / group id
     * @param jbaseCache  cache that receives every command read from the topic
     * @throws Exception if the serializer class cannot be loaded or instantiated, or if a Kafka
     *                   client cannot be created
     */
    @Override // com.jayqqaa12.jbase.cache.notify.Notify
    public void init(CacheConfig cacheConfig, JbaseCache jbaseCache) throws Exception {
        this.topic = cacheConfig.getNotifyTopic();
        this.cacheSerializer = (CacheSerializer) Class.forName(cacheConfig.getCacheSerializerClass())
                .getDeclaredConstructor()
                .newInstance();
        this.cache = jbaseCache;

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers", cacheConfig.getKafkaConfig().getHost());
        consumerProps.put("key.deserializer", StringDeserializer.class);
        // Values are consumed as byte[]; a ByteBuffer deserializer would make the byte[] cast fail.
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("enable.auto.commit", "true");
        consumerProps.put("auto.offset.reset", "latest");
        // The group id is suffixed with UniqueKit.JVM_PID; presumably this gives every JVM its own
        // consumer group so that each instance receives every notification -- verify against UniqueKit.
        consumerProps.put("group.id", cacheConfig.getKafkaConfig().getGroupId() + "-" + UniqueKit.JVM_PID);

        // The producer needs its own configuration: serializers are mandatory for KafkaProducer
        // and consumer-only keys such as group.id do not belong here.
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", cacheConfig.getKafkaConfig().getHost());
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);

        KafkaConsumer<String, byte[]> newConsumer = new KafkaConsumer<>(consumerProps);
        try {
            newConsumer.subscribe(Arrays.asList(this.topic));
            this.producer = new KafkaProducer<>(producerProps);
        } catch (RuntimeException e) {
            // Do not leak the consumer when the rest of the setup fails.
            newConsumer.close();
            throw e;
        }
        this.consumer = newConsumer;

        this.running = true;
        this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        this.executorService.execute(this);
    }

    /**
     * Stops the consumer thread and closes the producer. The consumer itself is closed by the
     * consumer thread once it leaves its poll loop. Safe to call when {@link #init} was never
     * called or did not complete.
     */
    @Override // com.jayqqaa12.jbase.cache.notify.Notify
    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            // Abort a blocking poll(); wakeup() is the only consumer method that may be called
            // from a thread other than the polling one.
            this.consumer.wakeup();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(STOP_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    log.warn("kafka notify consumer thread did not stop within {}s", STOP_WAIT_SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.producer != null) {
            this.producer.close();
        }
    }

    /**
     * Serializes {@code command} and sends it asynchronously to the notify topic. A failed send
     * is logged from the producer callback and is not reported to the caller.
     *
     * @param command the command to broadcast
     */
    @Override // com.jayqqaa12.jbase.cache.notify.Notify
    public void send(Command command) {
        byte[] payload = this.cacheSerializer.serialize(command);
        this.producer.send(new ProducerRecord<String, byte[]>(this.topic, payload), (recordMetadata, exc) -> {
            if (exc != null) {
                log.error("kafka send notify  error  key={}@{}", command.getRegion(), command.getKeys(), exc);
            }
        });
        log.debug("kafka send notify key={}@{}", command.getRegion(), command.getKeys());
    }

    /**
     * Consumer loop: polls the topic until {@link #stop()} is called and hands each record to
     * {@link #handleRecord}. The consumer is always closed on this thread when the loop ends.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (this.running) {
                this.consumer.poll(POLL_TIMEOUT_MS).forEach(this::handleRecord);
            }
        } catch (WakeupException e) {
            // Expected when stop() interrupts a blocking poll; anything else is worth reporting.
            if (this.running) {
                log.error("kafka notify consumer woken up unexpectedly", e);
            }
        } catch (RuntimeException e) {
            log.error("kafka notify consumer loop terminated", e);
        } finally {
            closeConsumer();
        }
    }

    /** Deserializes one record and applies it to the cache; a failure only affects that record. */
    private void handleRecord(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Command command = (Command) this.cacheSerializer.deserialize(consumerRecord.value());
            log.debug("kafka receive notify  command {} ", command);
            this.cache.handlerCommand(command);
        } catch (RuntimeException e) {
            // One malformed or failing message must not stop the listener.
            log.error("kafka handle notify error", e);
        }
    }

    /** Closes the consumer, logging instead of propagating so thread shutdown always completes. */
    private void closeConsumer() {
        try {
            this.consumer.close();
        } catch (RuntimeException e) {
            log.warn("kafka notify consumer close failed", e);
        }
    }
}
