package org.apache.iotdb.flink;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.iotdb.flink.IoTDBOptions;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/flink/IoTDBSink.class */
public class IoTDBSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
    private IoTDBOptions options;
    private IoTSerializationSchema<IN> serializationSchema;
    private transient SessionPool pool;
    private transient ScheduledExecutorService scheduledExecutor;
    private int batchSize = 0;
    private int flushIntervalMs = 3000;
    private int sessionPoolSize = 2;
    private List<Event> batchList = new LinkedList();
    private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap = new HashMap();

    public IoTDBSink(IoTDBOptions ioTDBOptions, IoTSerializationSchema<IN> ioTSerializationSchema) {
        this.options = ioTDBOptions;
        this.serializationSchema = ioTSerializationSchema;
        for (IoTDBOptions.TimeseriesOption timeseriesOption : ioTDBOptions.getTimeseriesOptionList()) {
            this.timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
        }
    }

    public void open(Configuration configuration) throws Exception {
        initSession();
        initScheduler();
    }

    void initSession() throws Exception {
        this.pool = new SessionPool(this.options.getHost(), this.options.getPort(), this.options.getUser(), this.options.getPassword(), this.sessionPoolSize);
        this.pool.setStorageGroup(this.options.getStorageGroup());
        for (IoTDBOptions.TimeseriesOption timeseriesOption : this.options.getTimeseriesOptionList()) {
            if (!this.pool.checkTimeseriesExists(timeseriesOption.getPath())) {
                this.pool.createTimeseries(timeseriesOption.getPath(), timeseriesOption.getDataType(), timeseriesOption.getEncoding(), timeseriesOption.getCompressor());
            }
        }
    }

    void initScheduler() {
        if (this.batchSize > 0) {
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            this.scheduledExecutor.scheduleAtFixedRate(() -> {
                try {
                    flush();
                } catch (Exception e) {
                    LOG.error("flush error", e);
                }
            }, this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    void setSessionPool(SessionPool sessionPool) {
        this.pool = sessionPool;
    }

    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        Event serialize = this.serializationSchema.serialize(in);
        if (serialize == null) {
            return;
        }
        if (this.batchSize <= 0) {
            convertText(serialize.getDevice(), serialize.getMeasurements(), serialize.getValues());
            this.pool.insertRecord(serialize.getDevice(), serialize.getTimestamp().longValue(), serialize.getMeasurements(), serialize.getTypes(), serialize.getValues());
            LOG.debug("send event successfully");
        } else {
            synchronized (this.batchList) {
                this.batchList.add(serialize);
                if (this.batchList.size() >= this.batchSize) {
                    flush();
                }
            }
        }
    }

    public IoTDBSink<IN> withBatchSize(int i) {
        Preconditions.checkArgument(i >= 0);
        this.batchSize = i;
        return this;
    }

    public IoTDBSink<IN> withFlushIntervalMs(int i) {
        Preconditions.checkArgument(i > 0);
        this.flushIntervalMs = i;
        return this;
    }

    public IoTDBSink<IN> withSessionPoolSize(int i) {
        Preconditions.checkArgument(i > 0);
        this.sessionPoolSize = i;
        return this;
    }

    public void close() throws Exception {
        if (this.pool != null) {
            try {
                flush();
            } catch (Exception e) {
                LOG.error("flush error", e);
            }
            this.pool.close();
        }
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
        }
    }

    private void convertText(String str, List<String> list, List<Object> list2) {
        if (str == null || list == null || list2 == null || list.size() != list2.size()) {
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            IoTDBOptions.TimeseriesOption timeseriesOption = this.timeseriesOptionMap.get(str + "." + list.get(i));
            if (timeseriesOption != null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
                list2.set(i, "'" + list2.get(i) + "'");
            }
        }
    }

    private void flush() throws Exception {
        if (this.batchSize > 0) {
            synchronized (this.batchList) {
                if (this.batchList.size() > 0) {
                    ArrayList arrayList = new ArrayList();
                    ArrayList arrayList2 = new ArrayList();
                    ArrayList arrayList3 = new ArrayList();
                    ArrayList arrayList4 = new ArrayList();
                    ArrayList arrayList5 = new ArrayList();
                    for (Event event : this.batchList) {
                        convertText(event.getDevice(), event.getMeasurements(), event.getValues());
                        arrayList.add(event.getDevice());
                        arrayList2.add(event.getTimestamp());
                        arrayList3.add(event.getMeasurements());
                        arrayList4.add(event.getTypes());
                        arrayList5.add(event.getValues());
                    }
                    this.pool.insertRecords(arrayList, arrayList2, arrayList3, arrayList4, arrayList5);
                    LOG.debug("send event successfully");
                    this.batchList.clear();
                }
            }
        }
    }
}
