package com.jaffa.rpc.lib.kafka;

import com.jaffa.rpc.lib.JaffaService;
import com.jaffa.rpc.lib.exception.JaffaRpcExecutionException;
import com.jaffa.rpc.lib.request.RequestUtils;
import com.jaffa.rpc.lib.request.Sender;
import com.jaffa.rpc.lib.zookeeper.Utils;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jaffa/rpc/lib/kafka/KafkaRequestSender.class */
public class KafkaRequestSender extends Sender {
    private static final Logger log = LoggerFactory.getLogger(KafkaRequestSender.class);
    private static final ConcurrentLinkedQueue<KafkaConsumer<String, byte[]>> consumers = new ConcurrentLinkedQueue<>();
    private final KafkaProducer<String, byte[]> producer = new KafkaProducer<>(JaffaService.getProducerProps());

    public static void initSyncKafkaConsumers(int i, CountDownLatch countDownLatch) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.bootstrap.servers"));
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("enable.auto.commit", String.valueOf(false));
        properties.put("auto.offset.reset", "earliest");
        if (Boolean.parseBoolean(System.getProperty("jaffa.rpc.protocol.kafka.use.ssl", String.valueOf(false)))) {
            HashMap hashMap = new HashMap();
            hashMap.put("security.protocol", "SSL");
            hashMap.put("ssl.truststore.location", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.truststore.location"));
            hashMap.put("ssl.truststore.password", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.truststore.password"));
            hashMap.put("ssl.keystore.location", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.keystore.location"));
            hashMap.put("ssl.keystore.password", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.keystore.password"));
            hashMap.put("ssl.key.password", Utils.getRequiredOption("jaffa.rpc.protocol.kafka.ssl.key.password"));
            properties.putAll(hashMap);
        }
        for (int i2 = 0; i2 < i; i2++) {
            properties.put("group.id", UUID.randomUUID().toString());
            consumers.add(new KafkaConsumer<>(properties));
            countDownLatch.countDown();
        }
    }

    public static void shutDownConsumers() {
        consumers.forEach((v0) -> {
            v0.close();
        });
    }

    private byte[] waitForSyncAnswer(String str, long j) {
        final KafkaConsumer<String, byte[]> poll;
        do {
            poll = consumers.poll();
        } while (poll == null);
        String replace = str.replace("-server", "-client");
        final long epochMilli = Instant.ofEpochMilli(j).minus(3L, (TemporalUnit) ChronoUnit.MINUTES).toEpochMilli();
        final long nanoTime = System.nanoTime();
        poll.subscribe(Collections.singletonList(replace), new ConsumerRebalanceListener() { // from class: com.jaffa.rpc.lib.kafka.KafkaRequestSender.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                HashMap hashMap = new HashMap();
                long j2 = epochMilli;
                collection.forEach(topicPartition -> {
                });
                for (Map.Entry entry : poll.offsetsForTimes(hashMap).entrySet()) {
                    if (entry.getValue() != null) {
                        poll.seek((TopicPartition) entry.getKey(), ((OffsetAndTimestamp) entry.getValue()).offset());
                    }
                }
                KafkaRequestSender.log.info(">>>>>> Partitions assigned took {} ns", Long.valueOf(System.nanoTime() - nanoTime));
            }
        });
        poll.poll(Duration.ofMillis(0L));
        log.info(">>>>>> Consumer rebalance took {} ns", Long.valueOf(System.nanoTime() - nanoTime));
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            if ((this.timeout == -1 || System.currentTimeMillis() - currentTimeMillis <= this.timeout) && System.currentTimeMillis() - currentTimeMillis <= 3600000) {
                Iterator it = poll.poll(Duration.ofMillis(10L)).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    if (((String) consumerRecord.key()).equals(this.command.getRqUid())) {
                        try {
                            HashMap hashMap = new HashMap();
                            hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset()));
                            poll.commitSync(hashMap);
                        } catch (CommitFailedException e) {
                            log.error("Error during commit received answer", e);
                        }
                        consumers.add(poll);
                        return (byte[]) consumerRecord.value();
                    }
                }
            }
        }
        consumers.add(poll);
        return null;
    }

    @Override // com.jaffa.rpc.lib.request.Sender
    public byte[] executeSync(byte[] bArr) {
        long currentTimeMillis = System.currentTimeMillis();
        String topicForService = RequestUtils.getTopicForService(this.command.getServiceClass(), this.moduleId, true);
        try {
            this.producer.send(new ProducerRecord(topicForService, UUID.randomUUID().toString(), bArr)).get();
            this.producer.close();
            byte[] waitForSyncAnswer = waitForSyncAnswer(topicForService, System.currentTimeMillis());
            log.info(">>>>>> Executed sync request {} in {} ms", this.command.getRqUid(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            return waitForSyncAnswer;
        } catch (InterruptedException | ExecutionException e) {
            log.error("Error in sending sync request", e);
            throw new JaffaRpcExecutionException(e);
        }
    }

    @Override // com.jaffa.rpc.lib.request.Sender
    public void executeAsync(byte[] bArr) {
        try {
            this.producer.send(new ProducerRecord(RequestUtils.getTopicForService(this.command.getServiceClass(), this.moduleId, false), UUID.randomUUID().toString(), bArr)).get();
            this.producer.close();
        } catch (InterruptedException | ExecutionException e) {
            log.error("Error while sending async Kafka request", e);
            throw new JaffaRpcExecutionException(e);
        }
    }
}
