package org.apache.shardingsphere.shardingscaling.core.synctask.realtime;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import lombok.Generated;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.shardingscaling.core.controller.SyncProgress;
import org.apache.shardingsphere.shardingscaling.core.controller.task.ReportCallback;
import org.apache.shardingsphere.shardingscaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.shardingscaling.core.execute.engine.SyncTaskExecuteCallback;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncExecutorGroup;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.DistributionChannel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPositionManager;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPositionManagerFactory;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.Reader;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.ReaderFactory;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.Writer;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.WriterFactory;
import org.apache.shardingsphere.shardingscaling.core.synctask.SyncTask;
import org.apache.shardingsphere.spi.database.metadata.DataSourceMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/shardingscaling/core/synctask/realtime/RealtimeDataSyncTask.class */
public final class RealtimeDataSyncTask implements SyncTask {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RealtimeDataSyncTask.class);
    private final SyncConfiguration syncConfiguration;
    private final DataSourceManager dataSourceManager;
    private final String syncTaskId;
    private LogPositionManager logPositionManager;
    private Reader reader;
    private long delayMillisecond;

    public RealtimeDataSyncTask(SyncConfiguration syncConfiguration, DataSourceManager dataSourceManager) {
        this.syncConfiguration = syncConfiguration;
        this.dataSourceManager = dataSourceManager;
        DataSourceMetaData dataSourceMetaData = syncConfiguration.getReaderConfiguration().getDataSourceConfiguration().getDataSourceMetaData();
        Object[] objArr = new Object[1];
        objArr[0] = null != dataSourceMetaData.getCatalog() ? dataSourceMetaData.getCatalog() : dataSourceMetaData.getSchema();
        this.syncTaskId = String.format("realtime-%s", objArr);
    }

    @Override // org.apache.shardingsphere.shardingscaling.core.synctask.SyncTask
    public void prepare() {
        this.logPositionManager = instanceLogPositionManager();
        this.logPositionManager.getCurrentPosition();
    }

    private LogPositionManager instanceLogPositionManager() {
        return LogPositionManagerFactory.newInstanceLogManager(this.syncConfiguration.getReaderConfiguration().getDataSourceConfiguration().getDatabaseType().getName(), this.dataSourceManager.getDataSource(this.syncConfiguration.getReaderConfiguration().getDataSourceConfiguration()));
    }

    @Override // org.apache.shardingsphere.shardingscaling.core.synctask.SyncTask
    public void start(ReportCallback reportCallback) {
        this.syncConfiguration.getReaderConfiguration().setTableNameMap(this.syncConfiguration.getTableNameMap());
        SyncExecutorGroup syncExecutorGroup = new SyncExecutorGroup(new SyncTaskExecuteCallback(getClass().getSimpleName(), this.syncTaskId, reportCallback));
        instanceSyncExecutors(syncExecutorGroup);
        ScalingContext.getInstance().getSyncTaskExecuteEngine().submitGroup(syncExecutorGroup);
    }

    private void instanceSyncExecutors(SyncExecutorGroup syncExecutorGroup) {
        this.reader = ReaderFactory.newInstanceLogReader(this.syncConfiguration.getReaderConfiguration(), this.logPositionManager.getCurrentPosition());
        List<Writer> instanceWriters = instanceWriters();
        DistributionChannel instanceChannel = instanceChannel(instanceWriters);
        this.reader.setChannel(instanceChannel);
        Iterator<Writer> it = instanceWriters.iterator();
        while (it.hasNext()) {
            it.next().setChannel(instanceChannel);
        }
        syncExecutorGroup.setChannel(instanceChannel);
        syncExecutorGroup.addSyncExecutor(this.reader);
        syncExecutorGroup.addAllSyncExecutor(instanceWriters);
    }

    private List<Writer> instanceWriters() {
        ArrayList arrayList = new ArrayList(this.syncConfiguration.getConcurrency());
        for (int i = 0; i < this.syncConfiguration.getConcurrency(); i++) {
            arrayList.add(WriterFactory.newInstance(this.syncConfiguration.getWriterConfiguration(), this.dataSourceManager));
        }
        return arrayList;
    }

    private DistributionChannel instanceChannel(List<Writer> list) {
        return new DistributionChannel(list.size(), list2 -> {
            Record record = (Record) list2.get(list2.size() - 1);
            this.logPositionManager.updateCurrentPosition(record.getLogPosition());
            this.delayMillisecond = System.currentTimeMillis() - record.getCommitTime();
        });
    }

    @Override // org.apache.shardingsphere.shardingscaling.core.synctask.SyncTask
    public void stop() {
        if (null != this.reader) {
            this.reader.stop();
            this.reader = null;
        }
    }

    @Override // org.apache.shardingsphere.shardingscaling.core.synctask.SyncTask
    public SyncProgress getProgress() {
        return new RealTimeDataSyncTaskProgress(this.syncTaskId, this.delayMillisecond, this.logPositionManager.getCurrentPosition());
    }
}
