package org.streampipes.connect.adapter.generic.protocol.stream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.connect.SendToPipeline;
import org.streampipes.connect.adapter.generic.format.Format;
import org.streampipes.connect.adapter.generic.format.Parser;
import org.streampipes.connect.adapter.generic.format.json.object.JsonObjectFormat;
import org.streampipes.connect.adapter.generic.format.json.object.JsonObjectParser;
import org.streampipes.connect.adapter.generic.pipeline.AdapterPipeline;
import org.streampipes.connect.adapter.generic.protocol.Protocol;
import org.streampipes.connect.adapter.generic.sdk.ParameterExtractor;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.kafka.SpKafkaConsumer;
import org.streampipes.model.connect.grounding.ProtocolDescription;
import org.streampipes.model.staticproperty.FreeTextStaticProperty;

/* loaded from: input_file:org/streampipes/connect/adapter/generic/protocol/stream/KafkaProtocol.class */
/**
 * Stream protocol that reads events from an Apache Kafka topic.
 *
 * <p>{@link #run(AdapterPipeline)} starts an {@link SpKafkaConsumer} on its own thread and
 * forwards every received message through the configured parser into the adapter pipeline.
 * {@link #getNByteElements(int)} opens a separate, short-lived Kafka consumer to collect a
 * bounded number of sample events.
 */
public class KafkaProtocol extends BrokerProtocol {
    /** Identifier of this protocol; also returned by {@link #getId()}. */
    public static final String ID = "https://streampipes.org/vocabulary/v1/protocol/stream/kafka";

    /** Configuration key of the broker URL static property. */
    private static final String BROKER_URL_KEY = "broker_url";

    /**
     * Configuration key of the topic static property. Kept as a literal so the adapter
     * configuration does not depend on a constant from Kafka's internal packages.
     */
    private static final String TOPIC_KEY = "topic";

    /** Encoding used when turning message payloads into input streams for the parser. */
    private static final String ENCODING = "UTF-8";

    Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);

    /** Thread running {@link #kafkaConsumer}; {@code null} until {@link #run(AdapterPipeline)} is called. */
    private Thread thread;

    /** Consumer created by {@link #run(AdapterPipeline)}; {@code null} until then. */
    private SpKafkaConsumer kafkaConsumer;

    /**
     * Receives raw Kafka message payloads and hands them to the enclosing protocol's parser.
     */
    private class EventProcessor implements InternalEventProcessor<byte[]> {
        private final SendToPipeline stk;

        public EventProcessor(SendToPipeline sendToPipeline) {
            this.stk = sendToPipeline;
        }

        /**
         * Parses one message payload and emits the result to the pipeline.
         *
         * @param bArr raw message bytes; decoded as UTF-8
         */
        @Override // org.streampipes.messaging.InternalEventProcessor
        public void onEvent(byte[] bArr) {
            try {
                // Decode explicitly as UTF-8: the stream handed to the parser is UTF-8 encoded, so
                // decoding with the platform default charset would corrupt non-ASCII payloads.
                String payload = new String(bArr, ENCODING);
                KafkaProtocol.this.parser.parse(IOUtils.toInputStream(payload, ENCODING), this.stk);
            } catch (IOException e) {
                KafkaProtocol.this.logger.error("Adapter " + ID + " could not read value!", e);
            }
        }
    }

    /** Creates an unconfigured instance. */
    public KafkaProtocol() {
    }

    /**
     * Creates a protocol instance; all arguments are passed unchanged to the superclass constructor.
     *
     * @param parser parser passed to the superclass
     * @param format format passed to the superclass
     * @param str    third superclass argument; {@link #getInstance} supplies the broker URL here
     * @param str2   fourth superclass argument; {@link #getInstance} supplies the topic here
     */
    public KafkaProtocol(Parser parser, Format format, String str, String str2) {
        super(parser, format, str, str2);
    }

    /**
     * Builds a configured protocol from a description, reading the broker URL and topic
     * from the description's config.
     *
     * @param protocolDescription description whose config holds the {@code broker_url} and {@code topic} values
     * @param parser              parser for the new instance
     * @param format              format for the new instance
     * @return a new {@code KafkaProtocol}
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
        ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
        String brokerUrl = extractor.singleValue(BROKER_URL_KEY);
        String topic = extractor.singleValue(TOPIC_KEY);
        return new KafkaProtocol(parser, format, brokerUrl, topic);
    }

    /**
     * Declares the description of this protocol, including its two free-text
     * configuration properties (broker URL, then topic).
     *
     * @return the protocol description
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public ProtocolDescription declareModel() {
        ProtocolDescription description = new ProtocolDescription(ID, "Apache Kafka",
                "This is the description for the Apache Kafka protocol");
        description.setSourceType("STREAM");
        description.setIconUrl("kafka.jpg");

        description.addConfig(new FreeTextStaticProperty(BROKER_URL_KEY, "Broker URL",
                "This property defines the URL of the Kafka broker."));
        description.addConfig(new FreeTextStaticProperty(TOPIC_KEY, "Topic", "Topic in the broker"));
        return description;
    }

    /**
     * Reads events from the configured topic, starting at the beginning of every assigned
     * partition, until at least {@code i} events have been parsed.
     *
     * <p>The method keeps polling until enough events are available, so it does not return
     * while the topic holds fewer than {@code i} parseable events.
     *
     * @param i number of events to collect; values {@code <= 0} yield an empty list
     * @return a list with exactly {@code i} events (empty for {@code i <= 0})
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.stream.BrokerProtocol
    protected List<byte[]> getNByteElements(int i) {
        if (i <= 0) {
            // Nothing requested: avoid connecting to the broker (and subList with a negative bound).
            return new ArrayList<>();
        }

        final Consumer<Long, String> consumer = createConsumer(this.brokerUrl, this.topic);
        try {
            // Subscribing again replaces the subscription made in createConsumer and attaches the
            // listener that rewinds every newly assigned partition.
            consumer.subscribe(Arrays.asList(this.topic), new ConsumerRebalanceListener() {
                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    // Nothing to clean up for this short-lived consumer.
                }

                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    consumer.seekToBeginning(collection);
                }
            });

            final List<byte[]> events = new ArrayList<>();
            while (true) {
                consumer.poll(1000L).forEach(record -> {
                    try {
                        events.addAll(this.parser.parseNEvents(IOUtils.toInputStream(record.value(), ENCODING), i));
                    } catch (IOException e) {
                        this.logger.error("Adapter " + ID + " could not parse a sampled record", e);
                    }
                });
                if (events.size() >= i) {
                    break;
                }
                consumer.commitAsync();
            }

            if (events.size() > i) {
                // Copy instead of returning a view that keeps the surplus events alive.
                return new ArrayList<>(events.subList(0, i));
            }
            return events;
        } finally {
            // Always release the consumer, including when subscribe/poll throws.
            consumer.close();
        }
    }

    /**
     * Manual smoke test: prints the guessed schema of a hard-coded topic on a local broker.
     *
     * @param strArr ignored
     */
    public static void main(String... strArr) {
        KafkaProtocol protocol = new KafkaProtocol(new JsonObjectParser(), new JsonObjectFormat(),
                "localhost:9092", "org.streampipes.examples.flowrate-1");
        System.out.println(protocol.getGuessSchema());
    }

    /**
     * Creates a Kafka consumer with {@code Long} keys and {@code String} values that is
     * subscribed to the given topic. The group id is suffixed with the current time in
     * milliseconds.
     *
     * @param str  bootstrap servers of the Kafka broker
     * @param str2 topic to subscribe to
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    private static Consumer<Long, String> createConsumer(String str, String str2) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, str);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer" + System.currentTimeMillis());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(str2));
        return consumer;
    }

    /**
     * Starts consuming the configured topic on a new thread; every message is parsed and
     * sent to the given pipeline.
     *
     * @param adapterPipeline pipeline that receives the parsed events
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void run(AdapterPipeline adapterPipeline) {
        SendToPipeline sendToPipeline = new SendToPipeline(this.format, adapterPipeline);
        this.kafkaConsumer = new SpKafkaConsumer(this.brokerUrl, this.topic, new EventProcessor(sendToPipeline));
        this.thread = new Thread(this.kafkaConsumer);
        this.thread.start();
    }

    /**
     * Disconnects the consumer started by {@link #run(AdapterPipeline)}, waits five seconds,
     * and interrupts the consumer thread. Does nothing if the protocol was never started.
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void stop() {
        if (this.kafkaConsumer == null) {
            // run() was never called, so there is nothing to disconnect or interrupt.
            return;
        }

        try {
            this.kafkaConsumer.disconnect();
        } catch (SpRuntimeException e) {
            this.logger.error("Adapter " + ID + " could not disconnect its Kafka consumer", e);
        }

        try {
            // Grace period before the worker thread is interrupted.
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and continue shutting down right away.
            Thread.currentThread().interrupt();
        }

        if (this.thread != null) {
            this.thread.interrupt();
        }
        this.logger.info("Kafka Adapter was sucessfully stopped");
    }

    /**
     * @return the protocol identifier {@link #ID}
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public String getId() {
        return ID;
    }
}
