package io.continual.services.processor.library.kafka.sources;

import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.library.sources.BasicSource;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.json.CommentedJsonTokener;
import io.continual.util.data.json.JsonVisitor;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

/* loaded from: input_file:io/continual/services/processor/library/kafka/sources/KafkaSource.class */
public class KafkaSource extends BasicSource {
    private final Properties fProps;
    private final KafkaConsumer<String, String> fConsumer;
    private final LinkedList<MessageAndRouting> fPendingMsgs;

    public KafkaSource(ConfigLoadContext configLoadContext, JSONObject jSONObject) {
        super(jSONObject);
        ExpressionEvaluator exprEval = configLoadContext.getServiceContainer().getExprEval();
        JSONObject evaluateJsonObject = exprEval.evaluateJsonObject(jSONObject);
        String string = evaluateJsonObject.getString("topic");
        String string2 = evaluateJsonObject.getString("group");
        this.fProps = new Properties();
        readConfigInto(jSONObject.optJSONObject("kafka"), this.fProps, exprEval);
        this.fProps.put("group.id", string + "::" + string2);
        this.fProps.put("client.id", UUID.randomUUID().toString());
        this.fProps.put("enable.auto.commit", Boolean.valueOf(evaluateJsonObject.optBoolean("autoCommit", true)));
        this.fProps.put("key.deserializer", StringDeserializer.class);
        this.fProps.put("value.deserializer", StringDeserializer.class);
        this.fConsumer = new KafkaConsumer<>(this.fProps);
        this.fConsumer.subscribe(Arrays.asList(string));
        this.fPendingMsgs = new LinkedList<>();
    }

    public synchronized void close() throws IOException {
        noteEndOfStream();
        this.fConsumer.close();
    }

    protected MessageAndRouting internalGetNextMessage(StreamProcessingContext streamProcessingContext) throws IOException, InterruptedException {
        if (this.fPendingMsgs.size() > 0) {
            return this.fPendingMsgs.remove();
        }
        Iterator it = this.fConsumer.poll(100L).iterator();
        while (it.hasNext()) {
            String str = (String) ((ConsumerRecord) it.next()).value();
            try {
                this.fPendingMsgs.add(makeDefRoutingMessage(Message.adoptJsonAsMessage(new JSONObject((JSONTokener) new CommentedJsonTokener(str)))));
            } catch (JSONException e) {
                streamProcessingContext.warn("Couldn't parse inbound text as JSON: " + str);
            }
        }
        if (this.fPendingMsgs.size() > 0) {
            return this.fPendingMsgs.remove();
        }
        return null;
    }

    private void readConfigInto(JSONObject jSONObject, final Properties properties, final ExpressionEvaluator expressionEvaluator) {
        if (jSONObject == null) {
            return;
        }
        JsonVisitor.forEachElement(jSONObject, new JsonVisitor.ObjectVisitor<Object, JSONException>() { // from class: io.continual.services.processor.library.kafka.sources.KafkaSource.1
            public boolean visit(String str, Object obj) throws JSONException {
                if (obj == null || (obj instanceof JSONObject) || (obj instanceof JSONArray)) {
                    return true;
                }
                properties.put(str, expressionEvaluator.evaluateText(obj.toString()));
                return true;
            }
        });
    }
}
