package io.mantisrx.connector.job.core;

import io.mantisrx.client.SinkConnectionsStatus;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/connector/job/core/DefaultSinkConnectionStatusObserver.class */
public class DefaultSinkConnectionStatusObserver implements SinkConnectionStatusObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSinkConnectionStatusObserver.class);
    private static final SinkConnectionStatusObserver INSTANCE = new DefaultSinkConnectionStatusObserver();
    private final AtomicLong numConnected = new AtomicLong();
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong receivingData = new AtomicLong();

    public static synchronized SinkConnectionStatusObserver getInstance(boolean z) {
        return z ? INSTANCE : new DefaultSinkConnectionStatusObserver();
    }

    public static SinkConnectionStatusObserver getInstance() {
        return getInstance(true);
    }

    public void onCompleted() {
        LOGGER.error("SinkConnectionStatusObserver completed!");
    }

    public void onError(Throwable th) {
        LOGGER.error("Got Error", th);
    }

    public void onNext(SinkConnectionsStatus sinkConnectionsStatus) {
        LOGGER.info("Got SinkConnectionStatus update " + sinkConnectionsStatus);
        this.numConnected.set(sinkConnectionsStatus.getNumConnected());
        this.total.set(sinkConnectionsStatus.getTotal());
        this.receivingData.set(sinkConnectionsStatus.getRecevingDataFrom());
    }

    @Override // io.mantisrx.connector.job.core.SinkConnectionStatusObserver
    public long getConnectedServerCount() {
        return this.numConnected.get();
    }

    @Override // io.mantisrx.connector.job.core.SinkConnectionStatusObserver
    public long getTotalServerCount() {
        return this.total.get();
    }

    @Override // io.mantisrx.connector.job.core.SinkConnectionStatusObserver
    public long getReceivingDataCount() {
        return this.receivingData.get();
    }

    @Override // io.mantisrx.connector.job.core.SinkConnectionStatusObserver
    public boolean isConnectedToAllSinks() {
        if (this.receivingData.get() > 0 && this.numConnected.get() > 0 && this.total.get() > 0 && this.numConnected.get() == this.total.get() && this.total.get() == this.receivingData.get()) {
            return true;
        }
        LOGGER.warn("NOT connected to all sinks  connected : " + this.numConnected.get() + " total " + this.total.get() + " receiving Data " + this.receivingData.get());
        return false;
    }
}
