package org.apache.iotdb.flink.sql.function;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.curator5.com.google.common.cache.Cache;
import org.apache.flink.shaded.curator5.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.exception.IllegalSchemaException;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;

/* loaded from: input_file:org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.class */
public class IoTDBLookupFunction extends TableFunction<RowData> {
    private final List<Tuple2<String, DataType>> schema;
    private final int cacheMaxRows;
    private final int cacheTTLSec;
    private final List<String> nodeUrls;
    private final String user;
    private final String password;
    private final String sql;
    private Session session;
    private transient Cache<RowData, RowData> cache;

    public IoTDBLookupFunction(ReadableConfig readableConfig, SchemaWrapper schemaWrapper) {
        this.schema = schemaWrapper.getSchema();
        this.sql = (String) readableConfig.get(Options.SQL);
        this.cacheMaxRows = ((Integer) readableConfig.get(Options.LOOKUP_CACHE_MAX_ROWS)).intValue();
        this.cacheTTLSec = ((Integer) readableConfig.get(Options.LOOKUP_CACHE_TTL_SEC)).intValue();
        this.nodeUrls = Arrays.asList(((String) readableConfig.get(Options.NODE_URLS)).split(","));
        this.user = (String) readableConfig.get(Options.USER);
        this.password = (String) readableConfig.get(Options.PASSWORD);
    }

    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        this.session = new Session.Builder().nodeUrls(this.nodeUrls).username(this.user).password(this.password).build();
        this.session.open(false);
        if (this.cacheMaxRows <= 0 || this.cacheTTLSec <= 0) {
            return;
        }
        this.cache = CacheBuilder.newBuilder().expireAfterAccess(this.cacheTTLSec, TimeUnit.SECONDS).maximumSize(this.cacheMaxRows).build();
    }

    public void close() throws Exception {
        if (this.cache != null) {
            this.cache.invalidateAll();
        }
        if (this.session != null) {
            this.session.close();
        }
        super.close();
    }

    public void eval(Object obj) throws IoTDBConnectionException, StatementExecutionException {
        RowData rowData;
        GenericRowData of = GenericRowData.of(new Object[]{obj});
        if (this.cache != null && (rowData = (RowData) this.cache.getIfPresent(of)) != null) {
            collect(rowData);
            return;
        }
        long j = of.getLong(0);
        SessionDataSet executeQueryStatement = this.session.executeQueryStatement(String.format("%s WHERE TIME=%d", this.sql, Long.valueOf(j)));
        List columnNames = executeQueryStatement.getColumnNames();
        columnNames.remove("Time");
        RowRecord next = executeQueryStatement.next();
        if (next == null) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(Long.valueOf(j));
            for (int i = 0; i < this.schema.size(); i++) {
                arrayList.add(null);
            }
            collect(GenericRowData.of(arrayList.toArray()));
            return;
        }
        List fields = next.getFields();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(Long.valueOf(j));
        for (Tuple2<String, DataType> tuple2 : this.schema) {
            if (columnNames.contains(tuple2.f0)) {
                int indexOf = columnNames.indexOf(tuple2.f0);
                if (!Utils.isTypeEqual(((Field) fields.get(indexOf)).getDataType(), (DataType) tuple2.f1)) {
                    throw new IllegalSchemaException(String.format("The data type of column `%s` is different in IoTDB and Flink", tuple2.f0));
                }
                arrayList2.add(Utils.getValue((Field) fields.get(indexOf), (DataType) tuple2.f1));
            } else {
                arrayList2.add(null);
            }
        }
        GenericRowData of2 = GenericRowData.of(arrayList2.toArray());
        if (this.cache != null) {
            this.cache.put(of, of2);
        }
        collect(of2);
    }
}
