package io.questdb.cutlass.line.tcp;

import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.EntryUnavailableException;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.cutlass.line.tcp.LineTcpReceiver;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.MPSequence;
import io.questdb.mp.RingQueue;
import io.questdb.mp.SCSequence;
import io.questdb.mp.WorkerPool;
import io.questdb.network.IODispatcher;
import io.questdb.std.Chars;
import io.questdb.std.DirectByteCharSequenceObjHashMap;
import io.questdb.std.LowerCaseCharSequenceObjHashMap;
import io.questdb.std.Misc;
import io.questdb.std.Numbers;
import io.questdb.std.ObjList;
import io.questdb.std.Os;
import io.questdb.std.SimpleReadWriteLock;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectByteCharSequence;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.tasks.TelemetryTask;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import org.jetbrains.annotations.NotNull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.class */
public class LineTcpMeasurementScheduler implements Closeable {
    // Class logger; assigned in the static initializer.
    private static final Log LOG;
    // One list per writer worker; each list is handed to the LineTcpWriterJob created for that worker
    // and is freed and cleared in close().
    private final ObjList<TableUpdateDetails>[] assignedTables;
    // Copied from the receiver configuration; both flags must be set before a missing table is created.
    private final boolean autoCreateNewColumns;
    private final boolean autoCreateNewTables;
    // The line TCP receiver configuration passed to the constructor.
    private final LineTcpReceiverConfiguration configuration;
    // Built from the receiver configuration; shared with queue events, TableUpdateDetails and the
    // table structure adapter.
    private final DefaultColumnTypes defaultColumnTypes;
    private final CairoEngine engine;
    // Tables that doMaintenance() moved out of the active map; looked up again when a measurement
    // arrives for a table that is not in the active map. Accessed under tableUpdateDetailsLock.
    private final LowerCaseCharSequenceObjHashMap<TableUpdateDetails> idleTableUpdateDetailsUtf16;
    // Scratch array with one slot per writer worker; zeroed and refilled by unsafeCalcThreadLoad().
    private final long[] loadByWriterThread;
    // One network IO job per worker of the first worker pool, created via createNetworkIOJob().
    private final NetworkIOJob[] netIoJobs;
    // Per writer worker: publisher sequence and the event queue it guards (same index in both arrays).
    private final MPSequence[] pubSeq;
    private final RingQueue<LineTcpMeasurementEvent>[] queue;
    private final CairoSecurityContext securityContext;
    // One reusable sink per network IO worker, indexed by NetworkIOJob.getWorkerId(); holds the
    // decoded table name while the shared maps are consulted.
    private final StringSink[] tableNameSinks;
    // Describes the table to create from a parsed line when the table does not exist yet.
    private final TableStructureAdapter tableStructureAdapter;
    // Active tables keyed by table name. Accessed under tableUpdateDetailsLock.
    private final LowerCaseCharSequenceObjHashMap<TableUpdateDetails> tableUpdateDetailsUtf16;
    // Compared against (now - lastMeasurementMillis) in doMaintenance(); presumably milliseconds.
    private final long writerIdleTimeout;
    // Optional callback; may be null. Invoked with (tableName, 1) when a table goes idle.
    private LineTcpReceiver.SchedulerListener listener;
    // Decompiler-emitted backing flag for the assertion in getNextPublisherEventSequence().
    static final /* synthetic */ boolean $assertionsDisabled;
    // Native resources passed to engine.createTable()/getStatus(); both are freed in close().
    private final MemoryMARW ddlMem = Vm.getMARWInstance();
    private final Path path = new Path();
    // Guards the active and idle table maps.
    private final ReadWriteLock tableUpdateDetailsLock = new SimpleReadWriteLock();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the scheduler and registers its jobs with both worker pools.
     * <p>
     * For every worker of {@code workerPool} a network IO job is created through
     * {@link #createNetworkIOJob} (an overridable method invoked during construction) together
     * with a reusable table-name sink. For every worker of {@code workerPool2} a publisher
     * sequence, an event queue and a {@code LineTcpWriterJob} are created and chained together.
     *
     * @param lineTcpReceiverConfiguration receiver settings (queue capacity, measurement size, timeouts, ...)
     * @param cairoEngine                  engine used for table lookup, creation and writer access
     * @param workerPool                   pool that runs the network IO jobs
     * @param iODispatcher                 dispatcher handed to each network IO job
     * @param workerPool2                  pool that runs the writer jobs
     */
    public LineTcpMeasurementScheduler(LineTcpReceiverConfiguration lineTcpReceiverConfiguration, CairoEngine cairoEngine, WorkerPool workerPool, IODispatcher<LineTcpConnectionContext> iODispatcher, WorkerPool workerPool2) {
        this.engine = cairoEngine;
        this.securityContext = lineTcpReceiverConfiguration.getCairoSecurityContext();
        final CairoConfiguration cairoConfiguration = cairoEngine.getConfiguration();
        this.configuration = lineTcpReceiverConfiguration;
        final MillisecondClock clock = cairoConfiguration.getMillisecondClock();
        this.defaultColumnTypes = new DefaultColumnTypes(lineTcpReceiverConfiguration);

        // Network IO side: one job and one scratch sink per IO worker.
        final int ioWorkerCount = workerPool.getWorkerCount();
        this.netIoJobs = new NetworkIOJob[ioWorkerCount];
        this.tableNameSinks = new StringSink[ioWorkerCount];
        for (int ioWorker = 0; ioWorker < ioWorkerCount; ioWorker++) {
            this.tableNameSinks[ioWorker] = new StringSink();
            final NetworkIOJob ioJob = createNetworkIOJob(iODispatcher, ioWorker);
            this.netIoJobs[ioWorker] = ioJob;
            workerPool.assign(ioWorker, ioJob);
            workerPool.freeOnExit(ioJob);
        }

        this.tableUpdateDetailsUtf16 = new LowerCaseCharSequenceObjHashMap<>();
        this.idleTableUpdateDetailsUtf16 = new LowerCaseCharSequenceObjHashMap<>();
        this.loadByWriterThread = new long[workerPool2.getWorkerCount()];
        this.autoCreateNewTables = lineTcpReceiverConfiguration.getAutoCreateNewTables();
        this.autoCreateNewColumns = lineTcpReceiverConfiguration.getAutoCreateNewColumns();
        final int maxMeasurementSize = lineTcpReceiverConfiguration.getMaxMeasurementSize();
        final int queueCapacity = lineTcpReceiverConfiguration.getWriterQueueCapacity();
        final long commitInterval = this.configuration.getCommitIntervalDefault();

        // Writer side: one publisher sequence, queue and writer job per writer worker.
        final int writerWorkerCount = workerPool2.getWorkerCount();
        this.pubSeq = new MPSequence[writerWorkerCount];
        this.queue = new RingQueue[writerWorkerCount];
        this.assignedTables = new ObjList[writerWorkerCount];
        for (int writerWorker = 0; writerWorker < writerWorkerCount; writerWorker++) {
            final MPSequence publisher = new MPSequence(queueCapacity);
            this.pubSeq[writerWorker] = publisher;
            final RingQueue<LineTcpMeasurementEvent> events = new RingQueue<>(
                    (a, b) -> new LineTcpMeasurementEvent(
                            a,
                            b,
                            lineTcpReceiverConfiguration.getMicrosecondClock(),
                            lineTcpReceiverConfiguration.getTimestampAdapter(),
                            this.defaultColumnTypes,
                            lineTcpReceiverConfiguration.isStringToCharCastAllowed(),
                            lineTcpReceiverConfiguration.isSymbolAsFieldSupported(),
                            lineTcpReceiverConfiguration.getMaxFileNameLength(),
                            lineTcpReceiverConfiguration.getAutoCreateNewColumns(),
                            cairoEngine.getConfiguration().getDefaultSymbolCapacity(),
                            cairoEngine.getConfiguration().getDefaultSymbolCacheFlag()
                    ),
                    getEventSlotSize(maxMeasurementSize),
                    queueCapacity,
                    46
            );
            this.queue[writerWorker] = events;
            final SCSequence subscriber = new SCSequence();
            // Publisher -> subscriber -> publisher forms the sequence ring for this queue.
            publisher.then(subscriber).then(publisher);
            this.assignedTables[writerWorker] = new ObjList<>();
            final LineTcpWriterJob writerJob = new LineTcpWriterJob(
                    writerWorker,
                    events,
                    subscriber,
                    clock,
                    commitInterval,
                    this,
                    cairoEngine.getMetrics(),
                    this.assignedTables[writerWorker]
            );
            workerPool2.assign(writerWorker, writerJob);
            workerPool2.freeOnExit(writerJob);
        }

        this.tableStructureAdapter = new TableStructureAdapter(cairoConfiguration, this.defaultColumnTypes, this.configuration.getDefaultPartitionBy());
        this.writerIdleTimeout = lineTcpReceiverConfiguration.getWriterIdleTimeout();
    }

    /**
     * Releases everything the scheduler owns while holding the table-details write lock:
     * closes the locals of all active and idle table details, frees the path and DDL memory,
     * frees and clears every per-writer table list, and frees every event queue.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.tableUpdateDetailsLock.writeLock().lock();
        try {
            closeLocals(this.tableUpdateDetailsUtf16);
            closeLocals(this.idleTableUpdateDetailsUtf16);
            Misc.free(this.path);
            Misc.free(this.ddlMem);
            for (ObjList<TableUpdateDetails> tablesOfWriter : this.assignedTables) {
                Misc.freeObjList(tablesOfWriter);
                tablesOfWriter.clear();
            }
            for (RingQueue<LineTcpMeasurementEvent> eventQueue : this.queue) {
                Misc.free(eventQueue);
            }
        } finally {
            this.tableUpdateDetailsLock.writeLock().unlock();
        }
    }

    /**
     * Looks for the first table in the caller's local map whose last measurement is at least
     * {@code writerIdleTimeout} older than {@code j} and detaches it from the caller. At most one
     * table is processed per call.
     * <ul>
     *   <li>If the table has a network IO owner count other than 1, it is only removed from the
     *       local map and the caller's reference is dropped.</li>
     *   <li>Otherwise a writer-release event is published to the table's writer thread and the
     *       table is moved from the active map to the idle map. If no queue slot could be obtained
     *       nothing is changed for this table.</li>
     * </ul>
     *
     * @param directByteCharSequenceObjHashMap the caller's local table details keyed by table name
     * @param i                                id passed to {@code TableUpdateDetails.removeReference}
     *                                         (presumably the calling IO worker's id)
     * @param j                                current time, compared against {@code getLastMeasurementMillis()}
     * @return {@code false} when no idle table was found; {@code true} when an exclusively owned
     *         idle table was handled; for a shared idle table, {@code true} only if the local map
     *         held more than one entry when the call started
     */
    public boolean doMaintenance(DirectByteCharSequenceObjHashMap<TableUpdateDetails> directByteCharSequenceObjHashMap, int i, long j) {
        int size = directByteCharSequenceObjHashMap.size();
        for (int i2 = 0; i2 < size; i2++) {
            String str = directByteCharSequenceObjHashMap.keys().get(i2);
            TableUpdateDetails tableUpdateDetails = directByteCharSequenceObjHashMap.get(str);
            if (j - tableUpdateDetails.getLastMeasurementMillis() >= this.writerIdleTimeout) {
                this.tableUpdateDetailsLock.writeLock().lock();
                try {
                    if (tableUpdateDetails.getNetworkIOOwnerCount() != 1) {
                        // Other IO jobs still reference the table: only drop the local reference.
                        // The lock is released by the finally block; it must not be released here too.
                        directByteCharSequenceObjHashMap.remove(str);
                        tableUpdateDetails.removeReference(i);
                        return size > 1;
                    }
                    int writerThreadId = tableUpdateDetails.getWriterThreadId();
                    long nextPublisherEventSequence = getNextPublisherEventSequence(writerThreadId);
                    if (nextPublisherEventSequence > -1) {
                        this.queue[writerThreadId].get(nextPublisherEventSequence).createWriterReleaseEvent(tableUpdateDetails, true);
                        directByteCharSequenceObjHashMap.remove(str);
                        String tableNameUtf16 = tableUpdateDetails.getTableNameUtf16();
                        // Move the table from the active map to the idle map.
                        this.tableUpdateDetailsUtf16.remove(tableNameUtf16);
                        this.idleTableUpdateDetailsUtf16.put(tableNameUtf16, tableUpdateDetails);
                        tableUpdateDetails.removeReference(i);
                        this.pubSeq[writerThreadId].done(nextPublisherEventSequence);
                        if (this.listener != null) {
                            this.listener.onEvent(tableNameUtf16, 1);
                        }
                        LOG.info().$((CharSequence) "active table going idle [tableName=").$((CharSequence) tableNameUtf16).I$();
                    }
                    return true;
                } finally {
                    this.tableUpdateDetailsLock.writeLock().unlock();
                }
            }
        }
        return false;
    }

    /**
     * Handles a writer-release event on a writer thread, under the table-details read lock.
     * <p>
     * The writer is released only when the event's table is assigned to writer thread {@code i}
     * and either the writer is in error or the table is no longer present in the active map.
     * In every other case the method returns without touching the writer.
     *
     * @param lineTcpMeasurementEvent the release event carrying the table details
     * @param i                       id of the writer thread processing the event
     */
    public void processWriterReleaseEvent(LineTcpMeasurementEvent lineTcpMeasurementEvent, int i) {
        this.tableUpdateDetailsLock.readLock().lock();
        try {
            TableUpdateDetails tableUpdateDetails = lineTcpMeasurementEvent.getTableUpdateDetails();
            if (tableUpdateDetails.getWriterThreadId() != i) {
                return;
            }
            // A negative key index means the table name is present in the active map,
            // i.e. the table is in use again and its writer must be kept.
            if (!lineTcpMeasurementEvent.getTableUpdateDetails().isWriterInError() && this.tableUpdateDetailsUtf16.keyIndex(tableUpdateDetails.getTableNameUtf16()) < 0) {
                return;
            }
            LOG.info().$((CharSequence) "releasing writer, its been idle since ").$ts(tableUpdateDetails.getLastMeasurementMillis() * 1000).$((CharSequence) "[tableName=").$((CharSequence) tableUpdateDetails.getTableNameUtf16()).I$();
            lineTcpMeasurementEvent.releaseWriter();
        } finally {
            // Single release point for the read lock on every path.
            this.tableUpdateDetailsLock.readLock().unlock();
        }
    }

    /**
     * Derives the queue slot size from the maximum measurement size: a quarter of {@code i}
     * (integer division) times 13, rounded up by {@code Numbers.ceilPow2}. The multiplication
     * is carried out in {@code int} arithmetic.
     *
     * @param i maximum measurement size
     * @return the slot size passed to the event queue
     */
    private static long getEventSlotSize(int i) {
        final int unroundedSize = (i / 4) * 13;
        return Numbers.ceilPow2(unroundedSize);
    }

    /**
     * Calls {@code closeLocals()} on every value of the given map and then empties the map.
     *
     * @param lowerCaseCharSequenceObjHashMap map of table details to close and clear
     */
    private void closeLocals(LowerCaseCharSequenceObjHashMap<TableUpdateDetails> lowerCaseCharSequenceObjHashMap) {
        final ObjList<CharSequence> tableNames = lowerCaseCharSequenceObjHashMap.keys();
        for (int k = 0, n = tableNames.size(); k < n; k++) {
            final TableUpdateDetails details = lowerCaseCharSequenceObjHashMap.get(tableNames.get(k));
            details.closeLocals();
        }
        lowerCaseCharSequenceObjHashMap.clear();
    }

    /**
     * Resolves the table details for the measurement currently held by the parser using the
     * shared (cross-thread) maps, and registers the result with the calling network IO job.
     * <p>
     * Runs under the table-details write lock. Resolution order:
     * <ol>
     *   <li>the active map;</li>
     *   <li>otherwise, the table is created first when the engine reports a non-zero status for it
     *       (presumably "does not exist"), provided auto-creation of tables and columns is enabled;</li>
     *   <li>then the idle map: an idle entry that still has a writer is moved back to the active map,
     *       an idle entry without a writer is closed and replaced by a fresh assignment;</li>
     *   <li>otherwise a fresh assignment to a writer thread is made.</li>
     * </ol>
     *
     * @param networkIOJob  the IO job asking for the table; receives the resolved details
     * @param lineTcpParser parser positioned on the measurement being scheduled
     * @return the table details to publish the measurement against
     * @throws CairoException if the table is missing and may not be created, or a column of the
     *                        table to create has an unknown type
     */
    private TableUpdateDetails getTableUpdateDetailsFromSharedArea(@NotNull NetworkIOJob networkIOJob, @NotNull LineTcpParser lineTcpParser) {
        final DirectByteCharSequence measurementName = lineTcpParser.getMeasurementName();
        final StringSink nameSink = this.tableNameSinks[networkIOJob.getWorkerId()];
        nameSink.clear();
        Chars.utf8Decode(measurementName.getLo(), measurementName.getHi(), nameSink);

        this.tableUpdateDetailsLock.writeLock().lock();
        try {
            final TableUpdateDetails resolved;
            final int activeIndex = this.tableUpdateDetailsUtf16.keyIndex(nameSink);
            if (activeIndex < 0) {
                // Negative index: the name is already present in the active map.
                resolved = this.tableUpdateDetailsUtf16.valueAt(activeIndex);
            } else {
                if (this.engine.getStatus(this.securityContext, this.path, nameSink, 0, nameSink.length()) != 0) {
                    if (!this.autoCreateNewTables) {
                        throw CairoException.nonCritical().put("table does not exist, creating new tables is disabled [table=").put(nameSink).put(']');
                    }
                    if (!this.autoCreateNewColumns) {
                        throw CairoException.nonCritical().put("table does not exist, cannot create table, creating new columns is disabled [table=").put(nameSink).put(']');
                    }
                    final TableStructureAdapter structure = this.tableStructureAdapter.of(nameSink, lineTcpParser);
                    // Refuse to create the table if any column type could not be determined.
                    for (int col = 0, n = structure.getColumnCount(); col < n; col++) {
                        if (structure.getColumnType(col) == 0) {
                            throw CairoException.nonCritical().put("unknown column type [columnName=").put(structure.getColumnName(col)).put(']');
                        }
                    }
                    LOG.info().$((CharSequence) "creating table [tableName=").$((CharSequence) nameSink).$(']').$();
                    this.engine.createTable(this.securityContext, this.ddlMem, this.path, structure);
                }

                final int idleIndex = this.idleTableUpdateDetailsUtf16.keyIndex(nameSink);
                if (idleIndex >= 0) {
                    // Not known as idle either: brand new assignment.
                    TelemetryTask.doStoreTelemetry(this.engine, (short) 102, (short) 5);
                    resolved = unsafeAssignTableToWriterThread(activeIndex, nameSink);
                } else {
                    final TableUpdateDetails idle = this.idleTableUpdateDetailsUtf16.valueAt(idleIndex);
                    LOG.info().$((CharSequence) "idle table going active [tableName=").$((CharSequence) idle.getTableNameUtf16()).I$();
                    if (idle.getWriter() != null) {
                        // Writer still attached: revive the idle entry as-is.
                        this.idleTableUpdateDetailsUtf16.removeAt(idleIndex);
                        this.tableUpdateDetailsUtf16.putAt(activeIndex, idle.getTableNameUtf16(), idle);
                        resolved = idle;
                    } else {
                        // Writer already released: discard the idle entry and assign anew,
                        // reusing the table name stored in the idle entry.
                        idle.closeNoLock();
                        resolved = unsafeAssignTableToWriterThread(activeIndex, idle.getTableNameUtf16());
                    }
                }
            }

            // Register under the measurement name exactly as it arrived from the parser.
            nameSink.clear();
            nameSink.put(measurementName);
            networkIOJob.addTableUpdateDetails(nameSink.toString(), resolved);
            return resolved;
        } finally {
            this.tableUpdateDetailsLock.writeLock().unlock();
        }
    }

    /**
     * @return {@code true} while the publisher sequence array is non-null
     */
    private boolean isOpen() {
        return this.pubSeq != null;
    }

    /**
     * Creates table details for {@code charSequence}, binds them to the writer thread with the
     * lowest recalculated load (the lowest index wins ties) and stores them in the active map at
     * key index {@code i}. The method takes no lock itself; its only caller in this class holds
     * the table-details write lock.
     *
     * @param i            key index in the active map, as returned by {@code keyIndex}
     * @param charSequence name of the table to obtain a writer for
     * @return the newly created table details
     */
    @NotNull
    private TableUpdateDetails unsafeAssignTableToWriterThread(int i, CharSequence charSequence) {
        unsafeCalcThreadLoad();

        int chosenThread = 0;
        long lowestLoad = Long.MAX_VALUE;
        for (int threadId = 0, n = this.loadByWriterThread.length; threadId < n; threadId++) {
            final long load = this.loadByWriterThread[threadId];
            if (load < lowestLoad) {
                lowestLoad = load;
                chosenThread = threadId;
            }
        }

        final TableUpdateDetails created = new TableUpdateDetails(
                this.configuration,
                this.engine,
                this.engine.getTableWriterAPI(this.securityContext, charSequence, "tcpIlp"),
                chosenThread,
                this.netIoJobs,
                this.defaultColumnTypes
        );
        this.tableUpdateDetailsUtf16.putAt(i, created.getTableNameUtf16(), created);
        LOG.info().$((CharSequence) "assigned ").$(charSequence).$((CharSequence) " to thread ").$(chosenThread).$();
        return created;
    }

    /**
     * Recomputes {@code loadByWriterThread}: every slot is reset to zero, then each active table
     * adds its events-processed-since-reshuffle counter to the slot of its writer thread.
     * A key without a value is reported through the error log and skipped.
     */
    private void unsafeCalcThreadLoad() {
        Arrays.fill(this.loadByWriterThread, 0L);
        final ObjList<CharSequence> tableNames = this.tableUpdateDetailsUtf16.keys();
        for (int k = 0, n = tableNames.size(); k < n; k++) {
            final CharSequence tableName = tableNames.getQuick(k);
            final TableUpdateDetails details = this.tableUpdateDetailsUtf16.get(tableName);
            if (details == null) {
                LOG.error().$((CharSequence) "could not find statistic for table [name=").$(tableName).I$();
                continue;
            }
            this.loadByWriterThread[details.getWriterThreadId()] += details.getEventsProcessedSinceReshuffle();
        }
    }

    /**
     * Factory for the network IO job of one IO worker. Called from the constructor once per
     * worker; subclasses may override it to supply a different job implementation.
     *
     * @param iODispatcher dispatcher the job is created with
     * @param i            id of the worker the job is created for
     * @return a new {@code LineTcpNetworkIOJob} bound to this scheduler
     */
    protected NetworkIOJob createNetworkIOJob(IODispatcher<LineTcpConnectionContext> iODispatcher, int i) {
        final NetworkIOJob job = new LineTcpNetworkIOJob(this.configuration, this, iODispatcher, i);
        return job;
    }

    /**
     * Obtains the next publisher cursor for the queue of writer thread {@code i}, pausing and
     * retrying for as long as the sequence answers {@code -2}.
     *
     * @param i index of the writer thread whose publisher sequence is used
     * @return the first value returned by the sequence that is not {@code -2}; callers in this
     *         class treat values greater than {@code -1} as a usable slot
     */
    long getNextPublisherEventSequence(int i) {
        if (!$assertionsDisabled && !isOpen()) {
            throw new AssertionError();
        }
        final MPSequence publisher = this.pubSeq[i];
        long cursor = publisher.next();
        while (cursor == -2) {
            Os.pause();
            cursor = publisher.next();
        }
        return cursor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Publishes the measurement currently held by the parser to the queue of the writer thread
     * that owns the target table.
     * <p>
     * The table details are taken from the IO job's local cache and, when absent there, resolved
     * through {@link #getTableUpdateDetailsFromSharedArea}.
     *
     * @param networkIOJob  the calling network IO job
     * @param lineTcpParser parser positioned on the measurement to schedule
     * @return {@code false} when the measurement was published; {@code true} when it was not
     *         (no queue slot available, or an {@code EntryUnavailableException} was raised)
     * @throws CairoException any other Cairo failure, including a writer that is in error;
     *                        it is logged and rethrown
     */
    public boolean scheduleEvent(NetworkIOJob networkIOJob, LineTcpParser lineTcpParser) {
        try {
            TableUpdateDetails localTableDetails = networkIOJob.getLocalTableDetails(lineTcpParser.getMeasurementName());
            if (localTableDetails == null) {
                localTableDetails = getTableUpdateDetailsFromSharedArea(networkIOJob, lineTcpParser);
            }
            int writerThreadId = localTableDetails.getWriterThreadId();
            long nextPublisherEventSequence = getNextPublisherEventSequence(writerThreadId);
            if (nextPublisherEventSequence <= -1) {
                return true;
            }
            try {
                if (localTableDetails.isWriterInError()) {
                    throw CairoException.critical(0).put("writer is in error, aborting ILP pipeline");
                }
                this.queue[writerThreadId].get(nextPublisherEventSequence).createMeasurementEvent(localTableDetails, lineTcpParser, networkIOJob.getWorkerId());
            } finally {
                // The claimed slot is handed back exactly once, whether or not the event was filled in.
                this.pubSeq[writerThreadId].done(nextPublisherEventSequence);
            }
            localTableDetails.incrementEventsProcessedSinceReshuffle();
            return false;
        } catch (EntryUnavailableException e) {
            LOG.info().$((CharSequence) "could not get table writer [tableName=").$((CharSequence) lineTcpParser.getMeasurementName()).$((CharSequence) ", ex=`").$(e.getFlyweightMessage()).$((CharSequence) "`]").$();
            return true;
        } catch (CairoException e2) {
            LOG.error().$((CharSequence) "could not create table [tableName=").$((CharSequence) lineTcpParser.getMeasurementName()).$((CharSequence) ", errno=").$(e2.getErrno()).I$();
            throw e2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Installs the callback that is notified when a table goes idle. Passing {@code null}
     * disables notifications.
     *
     * @param schedulerListener the listener to install, or {@code null}
     */
    public void setListener(LineTcpReceiver.SchedulerListener schedulerListener) {
        listener = schedulerListener;
    }

    static {
        $assertionsDisabled = !LineTcpMeasurementScheduler.class.desiredAssertionStatus();
        LOG = LogFactory.getLog((Class<?>) LineTcpMeasurementScheduler.class);
    }
}
