package oracle.kv.util.migrator.impl.sink.ondb;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import oracle.kv.BulkWriteOptions;
import oracle.kv.EntryStream;
import oracle.kv.KVStore;
import oracle.kv.KeyValue;
import oracle.kv.impl.api.KVStoreImpl;
import oracle.kv.impl.api.table.NameUtils;
import oracle.kv.impl.api.table.TableAPIImpl;
import oracle.kv.impl.api.table.TableImpl;
import oracle.kv.impl.api.table.TableMetadataHelper;
import oracle.kv.query.ExecuteOptions;
import oracle.kv.query.PrepareCallback;
import oracle.kv.table.Row;
import oracle.kv.table.Table;
import oracle.kv.table.TableAPI;
import oracle.kv.util.migrator.DataSource;
import oracle.kv.util.migrator.DataTransform;
import oracle.kv.util.migrator.StateHandler;
import oracle.kv.util.migrator.TransformException;
import oracle.kv.util.migrator.data.DataEntry;
import oracle.kv.util.migrator.impl.ConfigBase;
import oracle.kv.util.migrator.impl.sink.DataSinkBaseImpl;
import oracle.kv.util.migrator.impl.sink.ondb.OndbSinkConfig;
import oracle.kv.util.migrator.impl.sink.ondb.ValueSerializer;
import oracle.kv.util.migrator.impl.transform.AddJsonFieldTransform;
import oracle.kv.util.migrator.impl.util.MigratorUtils;
import oracle.kv.util.migrator.impl.util.OndbUtils;

/* loaded from: input_file:oracle/kv/util/migrator/impl/sink/ondb/OndbSink.class */
public class OndbSink extends DataSinkBaseImpl {
    private final OndbSinkConfig config;
    private final KVStore store;
    private final TableAPI tableAPI;

    /* loaded from: input_file:oracle/kv/util/migrator/impl/sink/ondb/OndbSink$DataKVStream.class */
    /**
     * Adapts one {@link DataSource} to the {@link EntryStream} of
     * {@link KeyValue} pairs handed to the store's bulk put, and keeps the
     * per-source counters that feed the completion summary.
     */
    private class DataKVStream implements EntryStream<KeyValue> {
        /** Serializer bound to the target table; reused for every entry. */
        private final ValueSerializer.RowSerializerImpl rowSerializer;
        private final DataSource source;
        private final TableImpl table;

        /** Number of entries successfully read and serialized. */
        private long readCount = 0;
        /** Incremented from {@link #keyExists(KeyValue)}. */
        private final AtomicLong dupCount = new AtomicLong();
        /** Incremented from {@link #catchException(RuntimeException, KeyValue)}. */
        private final AtomicLong errCount = new AtomicLong();

        /** Wall-clock time of the first {@link #getNext()} call; 0 until then. */
        private long startTimeMs = 0;
        /** Wall-clock time recorded in {@link #completed()}. */
        private long endTimeMs;

        /**
         * @param dataSource source the entries are read from
         * @param table target table; cast to {@link TableImpl} for key/value creation
         */
        public DataKVStream(DataSource dataSource, Table table) {
            this.source = dataSource;
            this.table = (TableImpl) table;
            this.rowSerializer = (ValueSerializer.RowSerializerImpl) ValueSerializer.createRowSerializer(table);
        }

        /**
         * Returns the next serialized entry, or {@code null} once the source
         * returns no more entries. An entry whose read or serialization throws
         * a {@link RuntimeException} is reported through {@code loadError} and
         * the loop moves on to the following entry.
         */
        @Override // oracle.kv.EntryStream
        public KeyValue getNext() {
            if (startTimeMs == 0) {
                startTimeMs = System.currentTimeMillis();
            }
            for (;;) {
                DataEntry entry = null;
                try {
                    entry = source.readNext();
                    if (entry == null) {
                        return null;
                    }
                    final KeyValue serialized = serializeToKeyValue(entry);
                    readCount++;
                    return serialized;
                } catch (RuntimeException failure) {
                    // A TransformException carries the entry it failed on;
                    // prefer that over whatever readNext() handed back.
                    DataEntry offending = entry;
                    if (failure instanceof TransformException) {
                        offending = ((TransformException) failure).getDataEntry();
                    }
                    final String entryText = (offending != null) ? offending.toString() : "";
                    OndbSink.this.loadError(source, entryText, "Read next entry failed", failure);
                }
            }
        }

        /**
         * Loads the entry into the row serializer and builds the key/value
         * pair from it. Failures while building the pair are logged at SEVERE
         * and rethrown.
         */
        private KeyValue serializeToKeyValue(DataEntry dataEntry) {
            rowSerializer.setRowValue(dataEntry);
            try {
                return new KeyValue(
                    table.createKeyInternal(rowSerializer, false),
                    table.createValueInternal(rowSerializer));
            } catch (RuntimeException failure) {
                OndbSink.this.log(Level.SEVERE, "Serialize data entry failed:" + failure);
                throw failure;
            }
        }

        /** Reports a failed entry by its primary key and counts it as an error. */
        @Override // oracle.kv.EntryStream
        public void catchException(RuntimeException cause, KeyValue entry) {
            final String keyJson = getPrimaryKey(entry).toJsonString(false);
            OndbSink.this.loadError(source, keyJson, cause);
            errCount.incrementAndGet();
        }

        /** Reports an already-present key as a warning and counts it as a duplicate. */
        @Override // oracle.kv.EntryStream
        public void keyExists(KeyValue entry) {
            final String keyJson = getPrimaryKey(entry).toJsonString(false);
            OndbSink.this.loadWarning(source, "key exists", keyJson);
            dupCount.incrementAndGet();
        }

        /** Records the end time, publishes the final statistics and clears the serializer. */
        @Override // oracle.kv.EntryStream
        public void completed() {
            OndbSink.this.log(Level.INFO, source.getName() + " done");
            endTimeMs = System.currentTimeMillis();
            final long loaded = getNumLoaded();
            final String summary = getStatisticInfo();
            OndbSink.this.setComplete(source, endTimeMs, loaded, summary);
            close();
        }

        @Override // oracle.kv.EntryStream
        public String name() {
            return "Stream " + source.getName();
        }

        /** Rebuilds the primary-key row from the key bytes of the given pair. */
        private Row getPrimaryKey(KeyValue entry) {
            final byte[] keyBytes = entry.getKey().toByteArray();
            return table.createRowFromKeyBytes(keyBytes);
        }

        private void close() {
            rowSerializer.clear();
        }

        /** Elapsed time between first read and completion; 0 while not completed. */
        private long getElapseTimeMs() {
            return endTimeMs > 0 ? endTimeMs - startTimeMs : 0L;
        }

        /** Entries read minus those reported as errors or duplicates. */
        private long getNumLoaded() {
            return readCount - errCount.get() - dupCount.get();
        }

        /**
         * Formats the summary line. The pre-existing count is only part of
         * the message when overwrite is disabled in the sink configuration.
         */
        private String getStatisticInfo() {
            final long elapsedMs = getElapseTimeMs();
            if (OndbSink.this.config.getOverwrite()) {
                return String.format(
                    "Load %,d records (%,d read; %,d failed) from %s: %s",
                    getNumLoaded(),
                    readCount,
                    errCount.get(),
                    source.getName(),
                    MigratorUtils.formatElapseTime(elapsedMs));
            }
            return String.format(
                "Load %,d records (%,d read; %,d pre-existing; %,d failed) from %s: %s",
                getNumLoaded(),
                readCount,
                dupCount.get(),
                errCount.get(),
                source.getName(),
                MigratorUtils.formatElapseTime(elapsedMs));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:oracle/kv/util/migrator/impl/sink/ondb/OndbSink$PrepareCallbackImpl.class */
    /**
     * {@link PrepareCallback} that only remembers the query operation and the
     * table name reported to it; every other notification is ignored.
     */
    public class PrepareCallbackImpl implements PrepareCallback {
        /** Last operation passed to {@link #queryOperation}; null until reported. */
        PrepareCallback.QueryOperation qop;
        /** Last name passed to {@link #tableName}; null until reported. */
        String tableName;

        private PrepareCallbackImpl() {
        }

        /* ---- recorded notifications ---- */

        @Override // oracle.kv.query.PrepareCallback
        public void queryOperation(PrepareCallback.QueryOperation operation) {
            this.qop = operation;
        }

        @Override // oracle.kv.query.PrepareCallback
        public void tableName(String name) {
            this.tableName = name;
        }

        /** @return the recorded query operation, or null if none was reported */
        public PrepareCallback.QueryOperation getOperation() {
            return this.qop;
        }

        /** @return the recorded table name, or null if none was reported */
        public String getTableName() {
            return this.tableName;
        }

        /* ---- fixed answers ---- */

        /** Always answers {@code false}. */
        @Override // oracle.kv.query.PrepareCallback
        public boolean prepareNeeded() {
            return false;
        }

        /** Always answers {@code null}; this callback supplies no metadata helper. */
        @Override // oracle.kv.query.PrepareCallback
        public TableMetadataHelper getMetadataHelper() {
            return null;
        }

        /* ---- ignored notifications ---- */

        @Override // oracle.kv.query.PrepareCallback
        public void indexName(String name) {
        }

        @Override // oracle.kv.query.PrepareCallback
        public void namespaceName(String name) {
        }

        @Override // oracle.kv.query.PrepareCallback
        public void ifNotExistsFound() {
        }

        @Override // oracle.kv.query.PrepareCallback
        public void ifExistsFound() {
        }

        @Override // oracle.kv.query.PrepareCallback
        public void isTextIndex() {
        }

        @Override // oracle.kv.query.PrepareCallback
        public void newTable(Table created) {
        }
    }

    public OndbSink(OndbSinkConfig ondbSinkConfig, StateHandler stateHandler, Logger logger) {
        super(OndbSinkConfig.NAME, makeName(ondbSinkConfig), stateHandler, logger);
        this.config = ondbSinkConfig;
        this.store = OndbUtils.connectToStore(ondbSinkConfig.getStoreName(), ondbSinkConfig.getHelperHosts(), ondbSinkConfig.getUsername(), ondbSinkConfig.getSecurity());
        this.tableAPI = this.store.getTableAPI();
        log(Level.INFO, "Connected to target store. storeName=" + ondbSinkConfig.getStoreName() + "; helperHosts=" + Arrays.toString(ondbSinkConfig.getHelperHosts()));
    }

    /**
     * Builds the sink name: the helper hosts joined with "-", followed by
     * "-" and the store name.
     */
    private static String makeName(OndbSinkConfig ondbSinkConfig) {
        final StringBuilder name = new StringBuilder();
        final String[] hosts = ondbSinkConfig.getHelperHosts();
        for (int idx = 0; idx < hosts.length; idx++) {
            if (idx > 0) {
                name.append("-");
            }
            name.append(hosts[idx]);
        }
        // The separator before the store name is emitted even when there are no hosts.
        return name.append("-").append(ondbSinkConfig.getStoreName()).toString();
    }

    @Override // oracle.kv.util.migrator.impl.sink.DataSinkBaseImpl
    /**
     * Prepares one bulk-put stream per data source and loads them all with a
     * single bulk put. For each source, visited in the order returned by
     * {@code sortSources}, the target namespace and table name are validated,
     * the table DDLs are executed and any transforms are attached.
     *
     * @param dataSourceArr sources to load
     */
    @Override // oracle.kv.util.migrator.impl.sink.DataSinkBaseImpl
    public void doWrite(DataSource[] dataSourceArr) {
        final List<EntryStream<KeyValue>> streams =
            new ArrayList<EntryStream<KeyValue>>(dataSourceArr.length);
        log(Level.INFO, "Start loading from " + dataSourceArr.length + " sources");

        final Integer[] order = sortSources(dataSourceArr);
        for (int pos = 0; pos < order.length; pos++) {
            final DataSource current = dataSourceArr[order[pos].intValue()];
            final OndbSinkConfig.TableInfo info = this.config.getTableInfo(current.getTargetTable());

            // Namespace and table name are validated independently so each
            // problem is raised with its own message.
            try {
                OndbUtils.validateNamespace(info.getNamespace());
            } catch (IllegalArgumentException badNamespace) {
                ConfigBase.raiseInvalidConfigError(null, badNamespace.getMessage(), info);
            }
            try {
                OndbUtils.validateTableName(info.getTableName());
            } catch (IllegalArgumentException badTableName) {
                ConfigBase.raiseInvalidConfigError(null, badTableName.getMessage(), info);
            }

            final Table target = executeDdls(info);
            final DataTransform[] transforms = createTransforms(info, target);
            if (transforms != null) {
                for (int t = 0; t < transforms.length; t++) {
                    current.addTransform(transforms[t]);
                }
            }
            streams.add(new DataKVStream(current, target));
        }

        final BulkWriteOptions options = createBulkWriteOptions(streams.size());
        log(Level.INFO,
            "Start loading with bulkWriteOptions[streamParallelism=" + options.getStreamParallelism()
                + ", perShardParallelism=" + options.getPerShardParallelism()
                + ", requestTimeoutMs=" + options.getTimeout()
                + ", overwrite=" + options.getOverwrite()
                + "]");
        this.store.put(streams, options);
    }

    /**
     * Builds the bulk write options from the sink configuration.
     * Per-shard parallelism and request timeout are only overridden when the
     * configured value is positive. When no positive stream concurrency is
     * configured it is derived as
     * {@code min(i, perShardParallelism * number of rep groups)}.
     *
     * @param i number of streams that will be loaded
     */
    private BulkWriteOptions createBulkWriteOptions(int i) {
        final BulkWriteOptions options = new BulkWriteOptions();

        if (this.config.getPerShardConcurrency() > 0) {
            options.setPerShardParallelism(this.config.getPerShardConcurrency());
        }
        if (this.config.getRequestTimeoutMs() > 0) {
            options.setTimeout(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
        }

        int streamParallelism = this.config.getStreamConcurrency();
        if (streamParallelism <= 0) {
            final int perShard = options.getPerShardParallelism();
            final int shardCount = ((KVStoreImpl) this.store).getTopology().getRepGroupMap().size();
            streamParallelism = Math.min(i, perShard * shardCount);
        }
        options.setStreamParallelism(streamParallelism);

        options.setOverwrite(this.config.getOverwrite());
        return options;
    }

    /**
     * Returns the indexes of {@code dataSourceArr} ordered by target table
     * name: names without a '.' come first, then names containing '.' in
     * ascending order of their number of '.'-separated components.
     *
     * @param dataSourceArr sources to order; the array itself is not modified
     * @return indexes into {@code dataSourceArr} in processing order
     */
    private Integer[] sortSources(final DataSource[] dataSourceArr) {
        final int count = dataSourceArr.length;
        final Integer[] order = new Integer[count];
        for (int pos = 0; pos < count; pos++) {
            order[pos] = Integer.valueOf(pos);
        }

        final String separator = String.valueOf('.');
        final String splitRegex = Pattern.quote(separator);

        Arrays.sort(order, new Comparator<Integer>() {
            @Override // java.util.Comparator
            public int compare(Integer left, Integer right) {
                final String leftName = dataSourceArr[left.intValue()].getTargetTable();
                final String rightName = dataSourceArr[right.intValue()].getTargetTable();
                final boolean leftDotted = leftName.contains(separator);
                final boolean rightDotted = rightName.contains(separator);

                if (leftDotted && rightDotted) {
                    // Both dotted: fewer components sorts first.
                    return leftName.split(splitRegex).length - rightName.split(splitRegex).length;
                }
                if (leftDotted) {
                    return 1;
                }
                return rightDotted ? -1 : 0;
            }
        });
        return order;
    }

    @Override // oracle.kv.util.migrator.impl.SourceSinkBase, oracle.kv.util.migrator.DataSink
    /** Closes the base class first, then the store connection if there is one. */
    @Override // oracle.kv.util.migrator.impl.SourceSinkBase, oracle.kv.util.migrator.DataSink
    public void close() {
        super.close();
        final KVStore connected = this.store;
        if (connected == null) {
            return;
        }
        connected.close();
    }

    /**
     * Returns the transforms to attach to a source for the given table: a
     * single {@link AddJsonFieldTransform} built from the table's primary key
     * fields and the configured JSON field name when the table is created as
     * a JSON table, otherwise {@code null}.
     */
    private DataTransform[] createTransforms(OndbSinkConfig.TableInfo tableInfo, Table table) {
        if (!tableInfo.getCreateJsonTable()) {
            return null;
        }
        final HashSet<String> primaryKeyFields = new HashSet<String>(table.getPrimaryKey());
        final DataTransform addJsonField =
            new AddJsonFieldTransform(primaryKeyFields, tableInfo.getJsonFieldName(), this.logger);
        return new DataTransform[]{addJsonField};
    }

    /**
     * Executes the DDL statements produced by {@code getDdls} for the given
     * table and returns the resulting table handle.
     *
     * <p>A statement that fails is either skipped with a warning (when
     * continue-on-DDL-error is enabled for the table) or rethrown as a
     * {@link RuntimeException} that keeps the original exception as its cause.
     *
     * @param tableInfo configuration of the target table
     * @return the table looked up by its qualified name after the DDLs ran
     * @throws RuntimeException if a statement fails and errors are not skipped
     * @throws IllegalArgumentException if the table does not exist afterwards
     */
    private Table executeDdls(OndbSinkConfig.TableInfo tableInfo) {
        final String qualifiedName = this.config.getQualifiedName(tableInfo);
        String namespace = this.config.getNamespace(tableInfo);
        // Suffix for log/error messages; uses the namespace as configured,
        // before it is switched to its internal form.
        final String namespaceTag = namespace != null ? "[namespace=" + namespace + "]" : "";

        ExecuteOptions executeOptions = null;
        if (namespace != null) {
            namespace = NameUtils.switchToInternalUse(namespace);
            // The internal form may be null, in which case no options are passed.
            if (namespace != null) {
                executeOptions = new ExecuteOptions().setNamespace(namespace);
            }
        }

        for (String ddl : getDdls(tableInfo, namespace)) {
            try {
                log(Level.INFO, ddl + namespaceTag);
                this.store.executeSync(ddl, executeOptions);
            } catch (RuntimeException e) {
                final String failure = "Execute \"" + ddl + "\"" + namespaceTag + " failed";
                if (!tableInfo.getContinueOnDdlError()) {
                    log(Level.SEVERE, failure, e);
                    // Chain the original exception so its type and stack
                    // trace are not lost to callers.
                    throw new RuntimeException(failure + ": " + e.getMessage(), e);
                }
                log(Level.WARNING, "[Skipped] " + failure + ": " + e.getMessage());
            }
        }

        final Table table = this.tableAPI.getTable(qualifiedName);
        if (table != null) {
            return table;
        }
        final String notFound = "Table not found: " + qualifiedName;
        log(Level.SEVERE, notFound);
        throw new IllegalArgumentException(notFound);
    }

    /**
     * Collects the DDL statements to run for a table, in execution order:
     * an optional CREATE NAMESPACE, the table statement (generated for JSON
     * tables, otherwise the configured one) and the configured index
     * statements.
     *
     * <p>Configured statements are validated by preparing them with a
     * callback that records the operation and table name. The table statement
     * must be a create/alter table for this table; index statements must be
     * create/drop index for this table. A statement that is rejected with an
     * {@link IllegalArgumentException} inside the guarded section is dropped
     * with a warning when continue-on-DDL-error is enabled, otherwise the
     * error is rethrown with the statement text and the original cause.
     *
     * @param tableInfo configuration of the target table
     * @param str namespace used for preparing statements and for the
     *            CREATE NAMESPACE statement; may be null
     * @return the statements to execute; never null
     * @throws IllegalArgumentException if a statement is invalid and errors
     *         are not skipped, or an index statement has the wrong operation
     *         or table
     */
    private List<String> getDdls(OndbSinkConfig.TableInfo tableInfo, String str) {
        final PrepareCallbackImpl callback = new PrepareCallbackImpl();
        final ExecuteOptions prepareOptions =
            new ExecuteOptions().setPrepareCallback(callback).setNamespace(str);
        final String tableName = tableInfo.getTableName();
        final List<String> ddls = new ArrayList<String>();

        // Only create the namespace when the table metadata does not know it yet.
        if (str != null && ((TableAPIImpl) this.tableAPI).getTableMetadata().getNamespace(str) == null) {
            ddls.add("CREATE NAMESPACE IF NOT EXISTS " + str);
        }

        String tableStatement;
        if (tableInfo.getCreateJsonTable()) {
            tableStatement = makeCreateJsonTableDdl(tableInfo);
        } else {
            tableStatement = tableInfo.getTableStatement();
            try {
                this.store.prepare(tableStatement, prepareOptions);
                final PrepareCallback.QueryOperation operation = callback.getOperation();
                if (operation != PrepareCallback.QueryOperation.CREATE_TABLE
                        && operation != PrepareCallback.QueryOperation.ALTER_TABLE) {
                    throw new IllegalArgumentException("only create table or alter table can be specified with \"tableStatement\"");
                }
                // A missing table name from the callback is treated as a mismatch.
                final String preparedTable = callback.getTableName();
                if (preparedTable == null || !preparedTable.equalsIgnoreCase(tableName)) {
                    throw new IllegalArgumentException("ddl operation is not for table " + tableName);
                }
            } catch (IllegalArgumentException e) {
                final String invalid = "Invalid ddl statement \"" + tableStatement + "\": " + e.getMessage();
                if (!tableInfo.getContinueOnDdlError()) {
                    throw new IllegalArgumentException(invalid, e);
                }
                this.logger.warning("[Skipped] " + invalid);
                tableStatement = null;
            }
        }
        if (tableStatement != null) {
            ddls.add(tableStatement);
        }

        if (tableInfo.getIndexStatements() != null) {
            for (String indexStatement : tableInfo.getIndexStatements()) {
                try {
                    this.store.prepare(indexStatement, prepareOptions);
                } catch (IllegalArgumentException e) {
                    final String invalid = "Invalid ddl statement \"" + indexStatement + "\": " + e.getMessage();
                    if (!tableInfo.getContinueOnDdlError()) {
                        throw new IllegalArgumentException(invalid, e);
                    }
                    this.logger.warning("[Skipped] " + invalid);
                    // Really skip it: the callback still holds the state of a
                    // previous statement, so the checks below would validate
                    // the wrong thing and could queue this statement anyway.
                    continue;
                }
                final PrepareCallback.QueryOperation operation = callback.getOperation();
                if (operation != PrepareCallback.QueryOperation.CREATE_INDEX
                        && operation != PrepareCallback.QueryOperation.DROP_INDEX) {
                    throw new IllegalArgumentException("Only create/drop index statement can be specified with \"indexStatements]\": " + operation);
                }
                // A missing table name from the callback is treated as a mismatch.
                final String preparedTable = callback.getTableName();
                if (preparedTable == null || !preparedTable.equalsIgnoreCase(tableName)) {
                    throw new IllegalArgumentException("Not ddl operation for table " + tableName + ", but for table " + preparedTable);
                }
                ddls.add(indexStatement);
            }
        }
        return ddls;
    }

    /**
     * Generates the {@code CREATE TABLE IF NOT EXISTS} statement for a JSON
     * table: one column per configured primary key field, followed by a
     * single {@code json} column.
     *
     * <p>A {@code shard(...)} clause is emitted only when shard key fields
     * are configured and there are fewer of them than primary key fields.
     * With no shard key fields (null or empty) the primary key is listed
     * without a shard clause instead of producing an empty {@code shard()}.
     *
     * @param tableInfo configuration of the target table
     * @return the DDL text
     */
    private String makeCreateJsonTableDdl(OndbSinkConfig.TableInfo tableInfo) {
        final String tableName = tableInfo.getTableName();
        final Map<String, String> primaryKeyFields = tableInfo.getPrimaryKeyFields();

        // Case-insensitive so shard key names match primary key names
        // regardless of case.
        // NOTE(review): the set also sorts the shard keys alphabetically, so
        // they are emitted in sorted rather than configured order - confirm
        // that this is intended.
        final TreeSet<String> shardKeys = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
        final String[] shardKeyFields = tableInfo.getShardKeyFields();
        if (shardKeyFields != null) {
            shardKeys.addAll(Arrays.asList(shardKeyFields));
        }

        final String jsonFieldName = tableInfo.getJsonFieldName();
        final StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ");
        sb.append(tableName);
        sb.append("(");
        for (Map.Entry<String, String> field : primaryKeyFields.entrySet()) {
            sb.append(field.getKey());
            sb.append(" ");
            sb.append(field.getValue());
            sb.append(", ");
        }
        sb.append(jsonFieldName);
        sb.append(" json, primary key(");

        boolean first = true;
        if (!shardKeys.isEmpty() && shardKeys.size() < primaryKeyFields.size()) {
            sb.append("shard(");
            for (String shardKey : shardKeys) {
                if (first) {
                    first = false;
                } else {
                    sb.append(", ");
                }
                sb.append(shardKey);
            }
            sb.append(")");
            // Remaining primary key fields follow the shard clause.
            for (String keyField : primaryKeyFields.keySet()) {
                if (!shardKeys.contains(keyField)) {
                    sb.append(", ");
                    sb.append(keyField);
                }
            }
        } else {
            for (String keyField : primaryKeyFields.keySet()) {
                if (first) {
                    first = false;
                } else {
                    sb.append(", ");
                }
                sb.append(keyField);
            }
        }
        sb.append("))");
        return sb.toString();
    }
}
