package org.apache.plc4x.kafka;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.utils.connectionpool2.PooledDriverManager;
import org.apache.plc4x.kafka.config.Constants;
import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/plc4x/kafka/Plc4xSinkTask.class */
public class Plc4xSinkTask extends SinkTask {
    static final String CONNECTION_NAME_CONFIG = "connection-name";
    private static final String CONNECTION_NAME_STRING_DOC = "Connection Name";
    static final String PLC4X_CONNECTION_STRING_CONFIG = "connectionString";
    private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";
    static final String PLC4X_TOPIC_CONFIG = "topic";
    private static final String PLC4X_RETRIES_CONFIG = "retries";
    private static final String PLC4X_TIMEOUT_CONFIG = "timeout";
    static final String QUERIES_CONFIG = "queries";
    private static final String SINK_NAME_FIELD = "sink-name";
    private static final String SINK_TOPIC_FIELD = "topic";
    private PlcDriverManager driverManager;
    private Transformation<SinkRecord> transformation;
    private String plc4xConnectionString;
    private String plc4xTopic;
    private Integer plc4xRetries;
    private Integer plc4xTimeout;
    private Integer remainingRetries;
    private AbstractConfig config;
    private Map<String, String> fields;
    private static final Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);
    private static final String PLC4X_TOPIC_DOC = "Task Topic";
    private static final String PLC4X_RETRIES_DOC = "Number of times to retry after failed write";
    private static final String PLC4X_TIMEOUT_DOC = "Time between retries";
    private static final String QUERIES_DOC = "Fields to be sent to the PLC";
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define("connection-name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Connection Name").define("connectionString", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "PLC4X Connection String").define(Constants.TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_TOPIC_DOC).define("retries", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, PLC4X_RETRIES_DOC).define("timeout", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, PLC4X_TIMEOUT_DOC).define("queries", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERIES_DOC);

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        this.config = new AbstractConfig(CONFIG_DEF, map);
        this.config.getString("connection-name");
        this.plc4xConnectionString = this.config.getString("connectionString");
        this.plc4xTopic = this.config.getString(Constants.TOPIC_CONFIG);
        this.plc4xRetries = this.config.getInt("retries");
        this.remainingRetries = this.plc4xRetries;
        this.plc4xTimeout = this.config.getInt("timeout");
        String string = this.config.getString("queries");
        this.fields = new HashMap();
        String[] split = string.split("\\|");
        for (int i = 0; i < split.length; i++) {
            String[] split2 = split[i].split("#");
            if (split2.length != 2) {
                log.warn(String.format("Error in field configuration. The field segment expects a format {field-alias}#{field-address}, but got '%s'", split[i]));
            } else {
                this.fields.put(split2[0], split2[1]);
            }
        }
        log.info("Creating Pooled PLC4x driver manager");
        this.driverManager = new PooledDriverManager();
    }

    public void stop() {
        synchronized (this) {
            notifyAll();
        }
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        try {
            PlcConnection connection = this.driverManager.getConnection(this.plc4xConnectionString);
            PlcWriteRequest.Builder writeRequestBuilder = connection.writeRequestBuilder();
            int i = 0;
            for (SinkRecord sinkRecord : collection) {
                Struct struct = (Struct) sinkRecord.value();
                String str = sinkRecord.topic();
                Struct struct2 = struct.getStruct(Constants.FIELDS_CONFIG);
                Iterator it = struct2.schema().fields().iterator();
                while (it.hasNext()) {
                    String name = ((Field) it.next()).name();
                    Object obj = struct2.get(name);
                    if (obj != null) {
                        Long int64 = struct.getInt64(Constants.TIMESTAMP_CONFIG);
                        Long int642 = struct.getInt64(Constants.EXPIRES_CONFIG);
                        Long valueOf = int642 != null ? Long.valueOf(int642.longValue() + int64.longValue()) : 0L;
                        if (!str.equals(this.plc4xTopic) || this.plc4xTopic.equals("")) {
                            log.debug("Ignoring write request received on wrong topic");
                        } else if (!this.fields.containsKey(name)) {
                            log.warn("Unable to find address for field " + name);
                        } else if ((System.currentTimeMillis() > valueOf.longValue()) && (valueOf.longValue() != 0)) {
                            log.warn("Write request has expired {} - {}, discarding {}", new Object[]{valueOf, Long.valueOf(System.currentTimeMillis()), name});
                        } else {
                            String str2 = this.fields.get(name);
                            try {
                                if (obj instanceof String) {
                                    String str3 = (String) obj;
                                    if (str3.charAt(0) == '[' && str3.charAt(str3.length() - 1) == ']') {
                                        writeRequestBuilder.addItem(str2, str2, str3.substring(1, str3.length() - 1).split(","));
                                    } else {
                                        writeRequestBuilder.addItem(str2, str2, new Object[]{obj});
                                    }
                                } else {
                                    writeRequestBuilder.addItem(str2, str2, new Object[]{obj});
                                }
                                i++;
                            } catch (Exception e) {
                                log.warn("Invalid Address format for protocol {}", str2);
                            }
                        }
                    }
                }
            }
            if (i > 0) {
                try {
                    writeRequestBuilder.build().execute().get();
                    log.debug("Wrote records to {}", this.plc4xConnectionString);
                } catch (Exception e2) {
                    Integer num = this.remainingRetries;
                    this.remainingRetries = Integer.valueOf(this.remainingRetries.intValue() - 1);
                    if (this.remainingRetries.intValue() > 0) {
                        if (this.context != null) {
                            this.context.timeout(this.plc4xTimeout.intValue());
                        }
                        try {
                            connection.close();
                        } catch (Exception e3) {
                            log.warn("Failed to Close {} on RetriableException", this.plc4xConnectionString);
                        }
                        throw new RetriableException("Failed to Write to " + this.plc4xConnectionString + " retrying records that haven't expired");
                    }
                    log.warn("Failed to write after {} retries", this.plc4xRetries);
                }
            }
            try {
                connection.close();
            } catch (Exception e4) {
                log.warn("Failed to Close {}", this.plc4xConnectionString);
            }
            this.remainingRetries = this.plc4xRetries;
        } catch (PlcConnectionException e5) {
            log.warn("Failed to Open Connection {}", this.plc4xConnectionString);
            Integer num2 = this.remainingRetries;
            this.remainingRetries = Integer.valueOf(this.remainingRetries.intValue() - 1);
            if (this.remainingRetries.intValue() > 0) {
                if (this.context != null) {
                    this.context.timeout(this.plc4xTimeout.intValue());
                }
                throw new RetriableException("Failed to Write to " + this.plc4xConnectionString + " retrying records that haven't expired");
            }
            log.warn("Failed to write after {} retries", this.plc4xRetries);
        }
    }
}
