package org.yamcs.tctm;

import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.yamcs.ConfigurationException;
import org.yamcs.YConfiguration;
import org.yamcs.commanding.PreparedCommand;

/* loaded from: input_file:org/yamcs/tctm/AbstractThreadedTcDataLink.class */
public abstract class AbstractThreadedTcDataLink extends AbstractTcDataLink implements Runnable {
    Thread thread;
    RateLimiter rateLimiter;
    protected BlockingQueue<PreparedCommand> commandQueue;
    long initialDelay;

    @Override // org.yamcs.tctm.AbstractTcDataLink, org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public void init(String str, String str2, YConfiguration yConfiguration) throws ConfigurationException {
        super.init(str, str2, yConfiguration);
        if (yConfiguration.containsKey("tcQueueSize")) {
            this.commandQueue = new LinkedBlockingQueue(yConfiguration.getInt("tcQueueSize"));
        } else {
            this.commandQueue = new LinkedBlockingQueue();
        }
        this.initialDelay = yConfiguration.getLong("initialDelay", 0L);
        if (yConfiguration.containsKey("tcMaxRate")) {
            this.rateLimiter = RateLimiter.create(yConfiguration.getInt("tcMaxRate"));
        }
    }

    protected void doStart() {
        if (isDisabled()) {
            this.initialDelay = 0L;
        } else {
            doEnable();
        }
        notifyStarted();
    }

    protected void doStop() {
        if (isDisabled()) {
            notifyStopped();
            return;
        }
        try {
            shutDown();
            this.commandQueue.clear();
            this.commandQueue.offer(SIGNAL_QUIT);
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                this.log.warn("Interrupted while waiting for thread shutdown");
                Thread.currentThread().interrupt();
            }
            notifyStopped();
        } catch (Exception e2) {
            notifyFailed(e2);
        }
    }

    @Override // org.yamcs.tctm.TcDataLink
    public boolean sendCommand(PreparedCommand preparedCommand) {
        if (this.commandQueue.offer(preparedCommand)) {
            return true;
        }
        this.log.warn("Cannot put command {} in the queue, because it's full; sending NACK", preparedCommand);
        this.commandHistoryPublisher.commandFailed(preparedCommand.getCommandId(), getCurrentTime(), "Link " + this.linkName + ": queue full");
        return true;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.initialDelay > 0) {
            try {
                Thread.sleep(this.initialDelay);
                this.initialDelay = 0L;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        try {
            startUp();
        } catch (Exception e2) {
            this.log.error("Failed to startUp", e2);
        }
        while (isRunningAndEnabled()) {
            doHousekeeping();
            try {
                PreparedCommand poll = this.commandQueue.poll(this.housekeepingInterval, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    if (poll == SIGNAL_QUIT) {
                        return;
                    }
                    if (this.rateLimiter != null) {
                        this.rateLimiter.acquire();
                    }
                    uplinkCommand(poll);
                }
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
            } catch (Exception e4) {
                this.log.error("Error when sending command: ", e4);
                throw new RuntimeException(e4);
            }
        }
        try {
            shutDown();
        } catch (Exception e5) {
            this.log.error("Failed to shutDown", e5);
            disable();
        }
    }

    @Override // org.yamcs.tctm.AbstractLink
    protected void doEnable() {
        this.thread = new Thread(this);
        this.thread.setName(getClass().getSimpleName() + "-" + this.linkName);
        this.thread.start();
    }

    @Override // org.yamcs.tctm.AbstractLink
    protected void doDisable() {
        if (this.thread != null) {
            this.thread.interrupt();
        }
    }

    protected void doHousekeeping() {
    }

    protected abstract void uplinkCommand(PreparedCommand preparedCommand) throws IOException;

    protected abstract void startUp() throws Exception;

    protected abstract void shutDown() throws Exception;
}
