package com.jaffa.rpc.lib.kafka.receivers;

import com.jaffa.rpc.lib.JaffaService;
import com.jaffa.rpc.lib.common.OptionConstants;
import com.jaffa.rpc.lib.common.RequestInvocationHelper;
import com.jaffa.rpc.lib.entities.Command;
import com.jaffa.rpc.lib.serialization.Serializer;
import com.jaffa.rpc.lib.zookeeper.Utils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jaffa/rpc/lib/kafka/receivers/KafkaSyncRequestReceiver.class */
/**
 * Kafka receiver for synchronous RPC requests.
 *
 * <p>Each worker polls the server-side sync topics, deserializes every record into a
 * {@link Command}, invokes it through {@link RequestInvocationHelper}, publishes the serialized
 * result to the caller's {@code <service>-<module id>-client-sync} topic and then commits the
 * offset of the handled record.
 *
 * <p>Raw Kafka client types are used on purpose: the key/value type parameters expected by
 * {@link RebalancedListener} and by the {@link JaffaService} property holders are not visible
 * from this class, so they are not guessed here.
 */
public class KafkaSyncRequestReceiver extends KafkaReceiver implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaSyncRequestReceiver.class);

    /** Handed to the {@link RebalancedListener} registered on every consumer created here. */
    private final CountDownLatch countDownLatch;

    /**
     * @param countDownLatch latch passed on to the {@link RebalancedListener} of each consumer
     */
    public KafkaSyncRequestReceiver(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * Stores a freshly generated random {@code group.id} in the properties object returned by
     * {@link JaffaService#getConsumerProps()} and hands the request-serving loop to
     * {@code startThreadsAndWait} (not declared in this class; presumably inherited from
     * {@link KafkaReceiver}).
     */
    @Override
    public void run() {
        JaffaService.getConsumerProps().put("group.id", UUID.randomUUID().toString());
        startThreadsAndWait(this::serveRequests);
    }

    /**
     * Creates one consumer/producer pair and serves requests until the current thread is
     * interrupted. Both clients are closed on every exit path, including unexpected runtime
     * failures.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void serveRequests() {
        KafkaConsumer consumer = new KafkaConsumer(JaffaService.getConsumerProps());
        KafkaProducer producer = null;
        try {
            producer = new KafkaProducer(JaffaService.getProducerProps());
            consumer.subscribe(JaffaService.getServerSyncTopics(), new RebalancedListener(consumer, this.countDownLatch));
            consumer.poll(Duration.ofMillis(0L));
            while (!Thread.currentThread().isInterrupted()) {
                for (Object polled : pollRecords(consumer)) {
                    // Stop working through the batch as soon as shutdown was requested;
                    // unhandled records stay uncommitted.
                    if (Thread.currentThread().isInterrupted()) {
                        break;
                    }
                    handleRecord((ConsumerRecord) polled, consumer, producer);
                }
            }
        } finally {
            try {
                closeIgnoringInterrupt(consumer::close);
            } finally {
                // Runs even if closing the consumer failed, so the producer is never leaked.
                if (producer != null) {
                    closeIgnoringInterrupt(producer::close);
                }
            }
        }
    }

    /**
     * Polls for up to 100 ms.
     *
     * @return the polled records, or an empty batch when the poll was interrupted
     */
    @SuppressWarnings("rawtypes")
    private static ConsumerRecords pollRecords(KafkaConsumer consumer) {
        try {
            return consumer.poll(Duration.ofMillis(100L));
        } catch (InterruptException interrupted) {
            // Make sure the interrupt stays visible to the serving loop so it terminates.
            Thread.currentThread().interrupt();
            return ConsumerRecords.empty();
        }
    }

    /**
     * Handles a single request record: deserialize, invoke, send the response and wait for the
     * send to complete, then commit. A failure is logged and leaves the record uncommitted; it
     * does not stop the receiver unless the thread was interrupted.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void handleRecord(ConsumerRecord record, KafkaConsumer consumer, KafkaProducer producer) {
        try {
            Command command = (Command) Serializer.getCurrent().deserialize((byte[]) record.value(), Command.class);
            String replyTopic = Utils.getServiceInterfaceNameFromClient(command.getServiceClass()) + "-" + Utils.getRequiredOption(OptionConstants.MODULE_ID) + "-client-sync";
            producer.send(new ProducerRecord(replyTopic, command.getRqUid(), Serializer.getCurrent().serializeWithClass(RequestInvocationHelper.getResult(RequestInvocationHelper.invoke(command))))).get();

            // Kafka expects the committed offset to be the NEXT offset to consume
            // (last processed + 1); committing the record's own offset would redeliver it.
            HashMap<TopicPartition, OffsetAndMetadata> nextOffset = new HashMap<>();
            nextOffset.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
            consumer.commitSync(nextOffset);
        } catch (InterruptedException interrupted) {
            // Future.get() clears the interrupt flag when it throws; restore it, otherwise the
            // serving loop would never observe the shutdown request.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the sync response to be sent", interrupted);
        } catch (Exception e) {
            log.error("Target method execution exception", e);
        }
    }

    /**
     * Runs a client's close action, tolerating Kafka's {@link InterruptException}: closing
     * normally happens right after the thread was interrupted, which is the expected way for
     * this receiver to stop.
     */
    private static void closeIgnoringInterrupt(Runnable closeAction) {
        try {
            closeAction.run();
        } catch (InterruptException ignored) {
            // Expected during shutdown; nothing left to do for this client.
        }
    }
}
