/*
 * Decompiled with CFR 0.152.
 */
package solutions.nyla.apacheKafka;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import nyla.solutions.core.util.Config;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * A {@link LinkedBlockingQueue} that, when executed as a {@link Runnable},
 * subscribes to a single Kafka topic and appends the value of every record it
 * receives to itself.
 *
 * <p>Record keys and values are read with Kafka's {@code StringDeserializer},
 * so the elements added to the queue are {@link String} instances; the element
 * type {@code E} is therefore expected to be {@code String} or a supertype of it.
 *
 * <p>The polling loop runs until the executing thread is interrupted, at which
 * point the consumer is closed and {@link #run()} returns with the thread's
 * interrupt status set.
 *
 * @param <E> element type of the queue (see note above)
 */
public class KafkaQueue<E> extends LinkedBlockingQueue<E> implements Runnable {
    private static final long serialVersionUID = 3824535058258178886L;

    /*
     * Pause between two polls, in milliseconds. The property key (and field name)
     * keep the historical "kakfa" spelling so existing configuration and the
     * serialized form continue to work.
     */
    private long kakfaQueueSleepMs = Config.getPropertyLong("kakfaQueueSleepMs", 5L);
    private final String topic;
    /* Timeout handed to each consumer poll, in milliseconds. */
    private int timeoutMs = Config.getPropertyInteger("apacheKafkaTimeout", 0);
    private final String bootStrapServersConfig;
    private final String kafkaGroupId;

    /**
     * Creates a queue bound to one topic. No connection is made until
     * {@link #run()} is invoked.
     *
     * @param topic                  the topic to subscribe to
     * @param bootStrapServersConfig value for the consumer's {@code bootstrap.servers} setting
     * @param kafkaGroupId           value for the consumer's {@code group.id} setting
     */
    KafkaQueue(String topic, String bootStrapServersConfig, String kafkaGroupId) {
        this.topic = topic;
        this.bootStrapServersConfig = bootStrapServersConfig;
        this.kafkaGroupId = kafkaGroupId;
    }

    /**
     * Subscribes to the topic and keeps polling, adding each non-null record
     * value to this queue and printing every record to standard output.
     *
     * <p>The loop stops when the executing thread is interrupted. The consumer
     * is always closed on the way out, whether the loop ends because of an
     * interrupt or because an exception propagates.
     */
    @Override
    public void run() {
        Properties props = buildConsumerProperties();
        // try-with-resources guarantees close() on every exit path, including interruption.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(this.topic));
            System.out.println("START looking for messages for topic:" + this.topic);
            while (!Thread.currentThread().isInterrupted()) {
                // NOTE(review): poll(long) is kept for compatibility with the client version
                // this file was built against; newer clients offer poll(Duration) - confirm.
                ConsumerRecords<String, String> records = consumer.poll((long) this.timeoutMs);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    enqueue(record.value());
                }
                Thread.sleep(this.kakfaQueueSleepMs);
            }
        } catch (InterruptedException e) {
            // An interrupt is a request to stop: restore the flag for the caller and
            // return instead of swallowing it and looping forever.
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the consumer configuration: String key/value deserializers, auto-commit every second. */
    private Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootStrapServersConfig);
        props.put("group.id", this.kafkaGroupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Adds one record value to the queue, skipping nulls which the queue cannot hold. */
    @SuppressWarnings("unchecked")
    private void enqueue(String value) {
        if (value == null) {
            // LinkedBlockingQueue rejects null elements with a NullPointerException,
            // which would otherwise terminate the polling thread.
            return;
        }
        // Unchecked: values are Strings, so E must be String or a supertype of it.
        this.add((E) value);
    }
}

