package org.smallmind.bayeux.oumuamua.server.spi.backbone.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.smallmind.bayeux.oumuamua.server.api.OumuamuaException;
import org.smallmind.bayeux.oumuamua.server.api.Packet;
import org.smallmind.bayeux.oumuamua.server.api.Server;
import org.smallmind.bayeux.oumuamua.server.api.Session;
import org.smallmind.bayeux.oumuamua.server.api.backbone.Backbone;
import org.smallmind.bayeux.oumuamua.server.api.json.Value;
import org.smallmind.bayeux.oumuamua.server.spi.backbone.DebonedPacket;
import org.smallmind.bayeux.oumuamua.server.spi.backbone.RecordUtility;
import org.smallmind.nutsnbolts.util.ComponentStatus;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.scribe.pen.LoggerManager;

/* loaded from: input_file:org/smallmind/bayeux/oumuamua/server/spi/backbone/kafka/KafkaBackbone.class */
public class KafkaBackbone<V extends Value<V>> implements Backbone<V> {
    private final KafkaConnector connector;
    private final Producer<Long, byte[]> producer;
    private final String nodeName;
    private final String topicName;
    private final int concurrencyLimit;
    private ConsumerWorker<V>[] workers;
    private final ExecutorService executorService = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy());
    private final AtomicReference<ComponentStatus> statusRef = new AtomicReference<>(ComponentStatus.STOPPED);
    private final String groupId = SnowflakeId.newInstance().generateHexEncoding();

    /* loaded from: input_file:org/smallmind/bayeux/oumuamua/server/spi/backbone/kafka/KafkaBackbone$ConsumerWorker.class */
    private static class ConsumerWorker<V extends Value<V>> implements Runnable {
        private final AtomicBoolean finished = new AtomicBoolean(false);
        private final Server<V> server;
        private final Consumer<Long, byte[]> consumer;
        private final String nodeName;

        public ConsumerWorker(Server<V> server, String str, Consumer<Long, byte[]> consumer) {
            this.server = server;
            this.nodeName = str;
            this.consumer = consumer;
        }

        private void stop() {
            if (this.finished.compareAndSet(false, true)) {
                this.consumer.wakeup();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.finished.get()) {
                try {
                    try {
                        ConsumerRecords poll = this.consumer.poll(Duration.ofSeconds(3L));
                        if (poll != null && !poll.isEmpty()) {
                            for (TopicPartition topicPartition : poll.partitions()) {
                                long j = 0;
                                List<ConsumerRecord> records = poll.records(topicPartition);
                                for (ConsumerRecord consumerRecord : records) {
                                    try {
                                        DebonedPacket deserialize = RecordUtility.deserialize(this.server.getCodec(), (byte[]) consumerRecord.value());
                                        if (!this.nodeName.equals(deserialize.getNodeName())) {
                                            this.server.deliver((Session) null, deserialize.getPacket(), false);
                                        }
                                    } catch (Exception e) {
                                        LoggerManager.getLogger(KafkaBackbone.class).error(e);
                                    }
                                    j = consumerRecord.offset();
                                }
                                if (!records.isEmpty()) {
                                    this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(j + 1)));
                                }
                            }
                        }
                    } catch (Throwable th) {
                        this.consumer.close();
                        throw th;
                    }
                } catch (WakeupException e2) {
                    if (!this.finished.get()) {
                        LoggerManager.getLogger(KafkaBackbone.class).error(e2);
                    }
                    this.consumer.close();
                    return;
                }
            }
            this.consumer.close();
        }
    }

    public KafkaBackbone(String str, int i, String str2, KafkaServer... kafkaServerArr) throws OumuamuaException {
        this.nodeName = str;
        this.concurrencyLimit = i;
        this.topicName = str2;
        this.connector = new KafkaConnector(kafkaServerArr);
        this.producer = this.connector.createProducer(str);
        if (!((Boolean) this.connector.invokeAdminClient(adminClient -> {
            try {
                Collection collection = (Collection) adminClient.describeCluster().nodes().get();
                return Boolean.valueOf((collection == null || collection.isEmpty()) ? false : true);
            } catch (InterruptedException | ExecutionException e) {
                LoggerManager.getLogger(KafkaBackbone.class).error(e);
                return false;
            }
        })).booleanValue()) {
            throw new OumuamuaException("Unable to start the kafka backbone service", new Object[0]);
        }
    }

    public void startUp(Server<V> server) throws Exception {
        if (!this.statusRef.compareAndSet(ComponentStatus.STOPPED, ComponentStatus.STARTING)) {
            while (ComponentStatus.STARTING.equals(this.statusRef.get())) {
                Thread.sleep(100L);
            }
            return;
        }
        this.workers = new ConsumerWorker[this.concurrencyLimit];
        for (int i = 0; i < this.concurrencyLimit; i++) {
            ConsumerWorker<V> consumerWorker = new ConsumerWorker<>(server, this.nodeName, this.connector.createConsumer(this.nodeName + "-" + i, this.groupId, this.topicName));
            this.workers[i] = consumerWorker;
            new Thread(consumerWorker).start();
        }
        this.statusRef.set(ComponentStatus.STARTED);
    }

    public void shutDown() throws InterruptedException {
        if (!this.statusRef.compareAndSet(ComponentStatus.STARTED, ComponentStatus.STOPPING)) {
            while (ComponentStatus.STOPPING.equals(this.statusRef.get())) {
                Thread.sleep(100L);
            }
            return;
        }
        for (ConsumerWorker<V> consumerWorker : this.workers) {
            consumerWorker.stop();
        }
        this.statusRef.set(ComponentStatus.STOPPED);
    }

    public void publish(Packet<V> packet) {
        this.executorService.submit(() -> {
            try {
                this.producer.send(new ProducerRecord(this.topicName, RecordUtility.serialize(this.nodeName, packet)));
            } catch (IOException e) {
                LoggerManager.getLogger(KafkaBackbone.class).error(e);
            }
        });
    }
}
