/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cutlass.line.udp;

import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cutlass.line.CairoLineProtoParser;
import io.questdb.cutlass.line.LineProtoLexer;
import io.questdb.cutlass.line.udp.LineUdpReceiverConfiguration;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.SOCountDownLatch;
import io.questdb.mp.SynchronizedJob;
import io.questdb.mp.WorkerPool;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Misc;
import io.questdb.std.Os;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class AbstractLineProtoReceiver
extends SynchronizedJob
implements Closeable {
    private static final Log LOG = LogFactory.getLog(AbstractLineProtoReceiver.class);
    protected final LineProtoLexer lexer;
    protected final CairoLineProtoParser parser;
    protected final NetworkFacade nf;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final SOCountDownLatch started = new SOCountDownLatch();
    private final SOCountDownLatch halted = new SOCountDownLatch(1);
    private final LineUdpReceiverConfiguration configuration;
    protected long fd;
    protected int commitRate;
    protected long totalCount = 0L;
    protected final int commitMode;

    public AbstractLineProtoReceiver(LineUdpReceiverConfiguration configuration, CairoEngine engine, WorkerPool workerPool) {
        this.configuration = configuration;
        this.commitMode = configuration.getCommitMode();
        this.nf = configuration.getNetworkFacade();
        this.fd = this.nf.socketUdp();
        if (this.fd < 0L) {
            int errno = this.nf.errno();
            LOG.error().$("cannot open UDP socket [errno=").$(errno).$(']').$();
            throw CairoException.instance(errno).put("Cannot open UDP socket");
        }
        try {
            this.bind(configuration);
            this.commitRate = configuration.getCommitRate();
            if (configuration.getReceiveBufferSize() != -1 && this.nf.setRcvBuf(this.fd, configuration.getReceiveBufferSize()) != 0) {
                LOG.error().$("cannot set receive buffer size [fd=").$(this.fd).$(", size=").$(configuration.getReceiveBufferSize()).$(']').$();
            }
            this.lexer = new LineProtoLexer(configuration.getMsgBufferSize());
            this.parser = new CairoLineProtoParser(engine, configuration.getCairoSecurityContext(), configuration.getTimestampAdapter());
            this.lexer.withParser(this.parser);
            if (!configuration.ownThread()) {
                workerPool.assign(this);
                this.logStarted(configuration);
            }
        }
        catch (CairoException e) {
            this.close();
            throw e;
        }
    }

    @Override
    public void close() {
        if (this.fd > -1L) {
            this.halt();
            if (this.nf.close(this.fd) != 0) {
                LOG.error().$("failed to close [fd=").$(this.fd).$(", errno=").$(this.nf.errno()).$(']').$();
            } else {
                LOG.info().$("closed [fd=").$(this.fd).$(']').$();
            }
            if (this.parser != null) {
                this.parser.commitAll(this.commitMode);
                this.parser.close();
            }
            Misc.free(this.lexer);
            LOG.info().$("closed [fd=").$(this.fd).$(']').$();
            this.fd = -1L;
        }
    }

    public void halt() {
        if (this.running.compareAndSet(true, false)) {
            this.started.await();
            this.halted.await();
        }
    }

    public void start() {
        if (this.configuration.ownThread() && this.running.compareAndSet(false, true)) {
            new Thread(() -> {
                if (this.configuration.ownThreadAffinity() != -1) {
                    Os.setCurrentThreadAffinity(this.configuration.ownThreadAffinity());
                }
                this.logStarted(this.configuration);
                while (this.running.get()) {
                    this.runSerially();
                }
                LOG.info().$("shutdown").$();
                this.halted.countDown();
            }).start();
            this.started.countDown();
        }
    }

    private void bind(LineUdpReceiverConfiguration configuration) {
        if (this.nf.bindUdp(this.fd, configuration.isUnicast() ? configuration.getBindIPv4Address() : 0, configuration.getPort())) {
            if (!configuration.isUnicast() && !this.nf.join(this.fd, configuration.getBindIPv4Address(), configuration.getGroupIPv4Address())) {
                int errno = this.nf.errno();
                LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(this.fd).$(", bind=").$(configuration.getBindIPv4Address()).$(", group=").$(configuration.getGroupIPv4Address()).$(']').$();
                throw CairoException.instance(this.nf.errno()).put("Cannot join group ").put(configuration.getGroupIPv4Address()).put(" [bindTo=").put(configuration.getBindIPv4Address()).put(']');
            }
        } else {
            int errno = this.nf.errno();
            LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(this.fd).$(", bind=").$(configuration.getBindIPv4Address()).$(", port=").$(configuration.getPort()).$(']').$();
            throw CairoException.instance(this.nf.errno()).put("Cannot bind to ").put(configuration.getBindIPv4Address()).put(':').put(configuration.getPort());
        }
    }

    private void logStarted(LineUdpReceiverConfiguration configuration) {
        if (configuration.isUnicast()) {
            LOG.info().$("receiving unicast on ").$ip(configuration.getBindIPv4Address()).$(':').$(configuration.getPort()).$(" [fd=").$(this.fd).$(", commitRate=").$(this.commitRate).$(']').$();
        } else {
            LOG.info().$("receiving multicast from ").$ip(configuration.getGroupIPv4Address()).$(':').$(configuration.getPort()).$(" via ").$ip(configuration.getBindIPv4Address()).$(" [fd=").$(this.fd).$(", commitRate=").$(this.commitRate).$(']').$();
        }
    }
}

