package com.questdb.net;

import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.MPSequence;
import com.questdb.mp.RingQueue;
import com.questdb.mp.SCSequence;
import com.questdb.mp.Sequence;
import com.questdb.mp.SynchronizedJob;
import com.questdb.net.Context;
import com.questdb.network.Kqueue;
import com.questdb.network.KqueueAccessor;
import com.questdb.network.Net;
import com.questdb.network.NetworkError;
import com.questdb.std.LongMatrix;
import com.questdb.std.Misc;
import com.questdb.std.ObjectFactory;
import com.questdb.std.Os;
import com.questdb.std.time.MillisecondClock;

/* loaded from: input_file:com/questdb/net/KQueueDispatcher.class */
public class KQueueDispatcher<C extends Context> extends SynchronizedJob implements Dispatcher<C> {
    private static final Log LOG = LogFactory.getLog(KQueueDispatcher.class);
    private final long socketFd;
    private final RingQueue<Event<C>> ioQueue;
    private final Sequence ioSequence;
    private final RingQueue<Event<C>> interestQueue;
    private final MPSequence interestPubSequence;
    private final MillisecondClock clock;
    private final Kqueue kqueue;
    private final int timeout;
    private final int maxConnections;
    private final int capacity;
    private final ContextFactory<C> contextFactory;
    private final SCSequence interestSubSequence = new SCSequence();
    private final LongMatrix<C> pending = new LongMatrix<>(2);
    private int connectionCount = 0;

    public KQueueDispatcher(CharSequence charSequence, int i, int i2, int i3, RingQueue<Event<C>> ringQueue, Sequence sequence, MillisecondClock millisecondClock, int i4, ObjectFactory<Event<C>> objectFactory, ContextFactory<C> contextFactory) {
        this.ioQueue = ringQueue;
        this.ioSequence = sequence;
        this.interestQueue = new RingQueue<>(objectFactory, ringQueue.getCapacity());
        this.interestPubSequence = new MPSequence(this.interestQueue.getCapacity());
        this.interestPubSequence.then(this.interestSubSequence).then(this.interestPubSequence);
        this.clock = millisecondClock;
        this.maxConnections = i2;
        this.timeout = i3;
        this.capacity = i4;
        this.contextFactory = contextFactory;
        this.kqueue = new Kqueue(i4);
        this.socketFd = Net.socketTcp(false);
        if (!Net.bindTcp(this.socketFd, charSequence, i)) {
            throw NetworkError.instance(Os.errno()).couldNotBindSocket();
        }
        Net.listen(this.socketFd, 128);
        this.kqueue.listen(this.socketFd);
        LOG.debug().$((CharSequence) "Listening socket: ").$(this.socketFd).$();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.kqueue.close();
        if (Net.close(this.socketFd) != 0) {
            LOG.error().$((CharSequence) "failed to close socket [fd=").$(this.socketFd).$((CharSequence) ", errno=").$(Os.errno()).$(']').$();
        }
        int size = this.pending.size();
        for (int i = 0; i < size; i++) {
            Misc.free(this.pending.get(i));
        }
        this.pending.zapTop(size);
    }

    @Override // com.questdb.net.Dispatcher
    public int getConnectionCount() {
        return this.connectionCount;
    }

    @Override // com.questdb.net.Dispatcher
    public void registerChannel(C c, int i) {
        long nextBully = this.interestPubSequence.nextBully();
        Event<C> event = this.interestQueue.get(nextBully);
        event.context = c;
        event.channelStatus = i;
        LOG.debug().$((CharSequence) "Re-queuing ").$(c.getFd()).$();
        this.interestPubSequence.done(nextBully);
    }

    private long accept() {
        long accept = Net.accept(this.socketFd);
        LOG.info().$((CharSequence) " Connected ").$(accept).$();
        if (accept < 0) {
            LOG.error().$((CharSequence) "Error in accept: ").$(accept).$();
            return -1L;
        }
        if (Net.configureNonBlocking(accept) < 0) {
            LOG.error().$((CharSequence) "Cannot make FD non-blocking [fd=").$(accept).$((CharSequence) ", errno=").$(Os.errno()).$(']').$();
            if (Net.close(accept) == 0) {
                return -1L;
            }
            LOG.error().$((CharSequence) "failed to close [fd=").$(accept).$((CharSequence) ", errno=").$(Os.errno()).$(']').$();
            return -1L;
        }
        this.connectionCount++;
        if (this.connectionCount <= this.maxConnections) {
            return accept;
        }
        LOG.info().$((CharSequence) "Too many connections, kicking out [fd=").$(accept).$(']').$();
        if (Net.close(accept) != 0) {
            LOG.error().$((CharSequence) "failed to close [fd=").$(accept).$((CharSequence) ", errno=").$(Os.errno()).$(']').$();
        }
        this.connectionCount--;
        return -1L;
    }

    private void addPending(long j, long j2) {
        int addRow = this.pending.addRow();
        LOG.debug().$((CharSequence) " Matrix row ").$(addRow).$((CharSequence) " for ").$(j).$();
        this.pending.set(addRow, 0, j2);
        this.pending.set(addRow, 1, j);
        this.pending.set(addRow, this.contextFactory.newInstance(j, this.clock));
    }

    private void disconnect(C c, int i) {
        LOG.info().$((CharSequence) "Disconnected ").$ip(c.getIp()).$((CharSequence) " [").$(DisconnectReason.nameOf(i)).$(']').$();
        c.close();
        this.connectionCount--;
    }

    private void enqueuePending(int i) {
        int i2 = 0;
        int i3 = i;
        int size = this.pending.size();
        int i4 = 0;
        while (true) {
            int i5 = i4;
            if (i3 >= size) {
                break;
            }
            this.kqueue.setWriteOffset(i5);
            this.kqueue.readFD((int) this.pending.get(i3, 1), this.pending.get(i3, 0));
            LOG.debug().$((CharSequence) "kqueued ").$(this.pending.get(i3, 1)).$((CharSequence) " as ").$(i2 - 1).$();
            i2++;
            if (i2 > this.capacity - 1) {
                this.kqueue.register(i2);
                i2 = 0;
            }
            i3++;
            i4 = i5 + KqueueAccessor.SIZEOF_KEVENT;
        }
        if (i2 > 0) {
            this.kqueue.register(i2);
            LOG.debug().$((CharSequence) "Registered ").$(i2).$();
        }
    }

    private int findPending(int i, long j) {
        int binarySearch = this.pending.binarySearch(j, 0);
        if (binarySearch >= 0 && this.pending.get(binarySearch, 1) != i) {
            return scanRow(binarySearch + 1, i, j);
        }
        return binarySearch;
    }

    private void processIdleConnections(long j) {
        int i = 0;
        int i2 = 0;
        int size = this.pending.size();
        while (i2 < size && this.pending.get(i2, 0) < j) {
            disconnect(this.pending.get(i2), 2);
            i2++;
            i++;
        }
        this.pending.zapTop(i);
    }

    private boolean processRegistrations(long j) {
        boolean z = false;
        int i = 0;
        int i2 = 0;
        while (true) {
            long next = this.interestSubSequence.next();
            if (next <= -1) {
                if (i > 0) {
                    this.kqueue.register(i);
                }
                return z;
            }
            z = true;
            Event<C> event = this.interestQueue.get(next);
            C c = event.context;
            int i3 = event.channelStatus;
            this.interestSubSequence.done(next);
            int fd = (int) c.getFd();
            LOG.debug().$((CharSequence) "Registering ").$(fd).$((CharSequence) " status ").$(i3).$();
            this.kqueue.setWriteOffset(i2);
            i2 += KqueueAccessor.SIZEOF_KEVENT;
            i++;
            switch (i3) {
                case 1:
                    this.kqueue.readFD(fd, j);
                    break;
                case 3:
                    disconnect(c, 3);
                    continue;
                case 4:
                    this.kqueue.writeFD(fd, j);
                    break;
                case 5:
                    disconnect(c, 1);
                    continue;
            }
            int addRow = this.pending.addRow();
            this.pending.set(addRow, 0, j);
            this.pending.set(addRow, 1, fd);
            this.pending.set(addRow, c);
            if (i > this.capacity - 1) {
                this.kqueue.register(i);
                i = 0;
            }
        }
    }

    @Override // com.questdb.mp.SynchronizedJob
    protected boolean runSerially() {
        boolean z = false;
        int poll = this.kqueue.poll();
        int size = this.pending.size();
        long ticks = this.clock.getTicks();
        int i = 0;
        if (poll > 0) {
            for (int i2 = 0; i2 < poll; i2++) {
                this.kqueue.setReadOffset(i);
                i += KqueueAccessor.SIZEOF_KEVENT;
                int fd = this.kqueue.getFd();
                if (fd == this.socketFd) {
                    long accept = accept();
                    if (accept >= 0) {
                        addPending(accept, ticks);
                    }
                } else {
                    int findPending = findPending(fd, this.kqueue.getData());
                    if (findPending < 0) {
                        LOG.error().$((CharSequence) "Internal error: unknown FD: ").$(fd).$();
                    } else {
                        if (this.kqueue.getFlags() == -32751) {
                            disconnect(this.pending.get(findPending), 1);
                        } else {
                            long nextBully = this.ioSequence.nextBully();
                            Event<C> event = this.ioQueue.get(nextBully);
                            event.context = this.pending.get(findPending);
                            event.channelStatus = this.kqueue.getFilter() == KqueueAccessor.EVFILT_READ ? 1 : 4;
                            this.ioSequence.done(nextBully);
                            LOG.debug().$((CharSequence) "Queuing ").$(this.kqueue.getFilter()).$((CharSequence) " on ").$(fd).$();
                        }
                        this.pending.deleteRow(findPending);
                        size--;
                    }
                }
            }
            if (size < this.pending.size()) {
                enqueuePending(size);
            }
            z = true;
        }
        long j = ticks - this.timeout;
        if (this.pending.size() > 0 && this.pending.get(0, 0) < j) {
            processIdleConnections(j);
            z = true;
        }
        return processRegistrations(ticks) || z;
    }

    private int scanRow(int i, int i2, long j) {
        int size = this.pending.size();
        for (int i3 = i; i3 < size; i3++) {
            if (this.pending.get(i3, 0) != j) {
                return -(i3 + 1);
            }
            if (this.pending.get(i3, 1) == i2) {
                return i3;
            }
        }
        return -1;
    }
}
