package org.apache.plc4x.kafka;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.apache.plc4x.kafka.config.Constants;
import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/plc4x/kafka/Plc4xSourceTask.class */
public class Plc4xSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(Plc4xSourceTask.class);
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(Constants.CONNECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, Constants.CONNECTION_NAME_STRING_DOC).define("connectionString", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "PLC4X Connection String").define("pollReturnInterval", ConfigDef.Type.INT, Constants.KAFKA_POLL_RETURN_DEFAULT, ConfigDef.Importance.HIGH, "Default poll return interval to be used, if not otherwise configured.").define("bufferSize", ConfigDef.Type.INT, Constants.BUFFER_SIZE_DEFAULT, ConfigDef.Importance.HIGH, "Default buffer size to be used, if not otherwise configured.").define(Constants.QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, Constants.QUERIES_DOC);
    private static final Schema KEY_SCHEMA = new SchemaBuilder(Schema.Type.STRUCT).field(Constants.SOURCE_NAME_FIELD, Schema.STRING_SCHEMA).field(Constants.JOB_NAME_FIELD, Schema.STRING_SCHEMA).build();
    private ArrayBlockingQueue<SourceRecord> buffer;
    private Integer pollReturnInterval;
    private TriggeredScraperImpl scraper;

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        AbstractConfig abstractConfig = new AbstractConfig(CONFIG_DEF, map);
        String string = abstractConfig.getString(Constants.CONNECTION_NAME_CONFIG);
        String string2 = abstractConfig.getString("connectionString");
        this.pollReturnInterval = abstractConfig.getInt("pollReturnInterval");
        Integer num = abstractConfig.getInt("bufferSize");
        HashMap hashMap = new HashMap();
        this.buffer = new ArrayBlockingQueue<>(num.intValue(), true);
        ScraperConfigurationTriggeredImplBuilder scraperConfigurationTriggeredImplBuilder = new ScraperConfigurationTriggeredImplBuilder();
        scraperConfigurationTriggeredImplBuilder.addSource(string, string2);
        for (String str : abstractConfig.getList(Constants.QUERIES_CONFIG)) {
            String[] split = str.split("\\|");
            if (split.length < 4) {
                log.warn(String.format("Error in job configuration '%s'. The configuration expects at least 4 segments: {job-name}|{topic}|{rate}(|{field-alias}#{field-address})+", str));
            } else {
                String str2 = split[0];
                String str3 = split[1];
                JobConfigurationTriggeredImplBuilder source = scraperConfigurationTriggeredImplBuilder.job(str2, String.format("(SCHEDULED,%s)", Integer.valueOf(split[2]))).source(string);
                for (int i = 3; i < split.length; i++) {
                    String[] split2 = split[i].split("#");
                    if (split2.length != 2) {
                        log.warn(String.format("Error in job configuration '%s'. The field segment expects a format {field-alias}#{field-address}, but got '%s'", str2, split[i]));
                    } else {
                        source.field(split2[0], split2[1]);
                        hashMap.put(str2, str3);
                    }
                }
                source.build();
            }
        }
        ScraperConfigurationTriggeredImpl build = scraperConfigurationTriggeredImplBuilder.build();
        try {
            TriggerCollectorImpl triggerCollectorImpl = new TriggerCollectorImpl(new PooledPlcDriverManager());
            this.scraper = new TriggeredScraperImpl(build, (str4, str5, map2) -> {
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                HashMap hashMap2 = new HashMap();
                hashMap2.put(Constants.SOURCE_NAME_FIELD, str5);
                hashMap2.put(Constants.JOB_NAME_FIELD, str4);
                Map singletonMap = Collections.singletonMap("offset", valueOf);
                String str4 = (String) hashMap.get(str4);
                Struct put = new Struct(KEY_SCHEMA).put(Constants.SOURCE_NAME_FIELD, str5).put(Constants.JOB_NAME_FIELD, str4);
                SchemaBuilder name = SchemaBuilder.struct().name("org.apache.plc4x.kafka.schema.Field");
                for (Map.Entry entry : map2.entrySet()) {
                    name.field((String) entry.getKey(), getSchema(entry.getValue()));
                }
                Schema build2 = name.build();
                Schema build3 = SchemaBuilder.struct().name("org.apache.plc4x.kafka.schema.JobResult").doc("PLC Job result. This contains all of the received PLCValues as well as a recieved timestamp").field(Constants.FIELDS_CONFIG, build2).field(Constants.TIMESTAMP_CONFIG, Schema.INT64_SCHEMA).field(Constants.EXPIRES_CONFIG, Schema.OPTIONAL_INT64_SCHEMA).build();
                Struct struct = new Struct(build2);
                for (Map.Entry entry2 : map2.entrySet()) {
                    struct.put((String) entry2.getKey(), entry2.getValue());
                }
                this.buffer.add(new SourceRecord(hashMap2, singletonMap, str4, KEY_SCHEMA, put, build3, new Struct(build3).put(Constants.FIELDS_CONFIG, struct).put(Constants.TIMESTAMP_CONFIG, valueOf)));
            }, triggerCollectorImpl);
            this.scraper.start();
            triggerCollectorImpl.start();
        } catch (ScraperException e) {
            log.error("Error starting the scraper", e);
        }
    }

    public void stop() {
        synchronized (this) {
            this.scraper.stop();
            notifyAll();
        }
    }

    public List<SourceRecord> poll() {
        if (!this.buffer.isEmpty()) {
            int size = this.buffer.size();
            ArrayList arrayList = new ArrayList(size);
            this.buffer.drainTo(arrayList, size);
            return arrayList;
        }
        try {
            ArrayList arrayList2 = new ArrayList(1);
            SourceRecord poll = this.buffer.poll(this.pollReturnInterval.intValue() + RandomUtils.nextInt(0, (int) Math.round(this.pollReturnInterval.intValue() * 0.05d)), TimeUnit.MILLISECONDS);
            if (poll == null) {
                return null;
            }
            arrayList2.add(poll);
            return arrayList2;
        } catch (InterruptedException e) {
            return null;
        }
    }

    private Schema getSchema(Object obj) {
        Objects.requireNonNull(obj);
        if (obj instanceof List) {
            List list = (List) obj;
            if (list.isEmpty()) {
                throw new ConnectException("Unsupported empty lists.");
            }
            return SchemaBuilder.array(getSchema(list.get(0))).build();
        }
        if (obj instanceof BigDecimal) {
        }
        if (obj instanceof Boolean) {
            return Schema.OPTIONAL_BOOLEAN_SCHEMA;
        }
        if (obj instanceof byte[]) {
            return Schema.OPTIONAL_BYTES_SCHEMA;
        }
        if (obj instanceof Byte) {
            return Schema.OPTIONAL_INT8_SCHEMA;
        }
        if (obj instanceof Double) {
            return Schema.OPTIONAL_FLOAT64_SCHEMA;
        }
        if (obj instanceof Float) {
            return Schema.OPTIONAL_FLOAT32_SCHEMA;
        }
        if (obj instanceof Integer) {
            return Schema.OPTIONAL_INT32_SCHEMA;
        }
        if (obj instanceof LocalDate) {
            return Date.builder().optional().build();
        }
        if (obj instanceof LocalDateTime) {
            return Timestamp.builder().optional().build();
        }
        if (obj instanceof LocalTime) {
            return Time.builder().optional().build();
        }
        if (obj instanceof Long) {
            return Schema.OPTIONAL_INT64_SCHEMA;
        }
        if (obj instanceof Short) {
            return Schema.OPTIONAL_INT16_SCHEMA;
        }
        if (obj instanceof String) {
            return Schema.OPTIONAL_STRING_SCHEMA;
        }
        throw new ConnectException(String.format("Unsupported data type %s", obj.getClass().getName()));
    }
}
