package org.apache.skywalking.apm.agent.core.meter;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.meter.transform.MeterTransformer;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.dependencies.io.grpc.Status;
import org.apache.skywalking.apm.dependencies.io.grpc.StatusRuntimeException;
import org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;

@DefaultImplementor
/* loaded from: input_file:org/apache/skywalking/apm/agent/core/meter/MeterSender.class */
public class MeterSender implements BootService, GRPCChannelListener {
    private static final ILog logger = LogManager.getLogger((Class<?>) MeterSender.class);
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile MeterReportServiceGrpc.MeterReportServiceStub meterReportServiceStub;

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void prepare() {
        ((GRPCChannelManager) ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).addChannelListener(this);
    }

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void boot() {
    }

    public void send(Map<MeterId, MeterTransformer> map, MeterService meterService) {
        if (this.status == GRPCChannelStatus.CONNECTED) {
            StreamObserver<MeterData> streamObserver = null;
            final GRPCStreamServiceStatus gRPCStreamServiceStatus = new GRPCStreamServiceStatus(false);
            try {
                try {
                    streamObserver = this.meterReportServiceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { // from class: org.apache.skywalking.apm.agent.core.meter.MeterSender.1
                        @Override // org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver
                        public void onNext(Commands commands) {
                        }

                        @Override // org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver
                        public void onError(Throwable th) {
                            gRPCStreamServiceStatus.finished();
                            if (MeterSender.logger.isErrorEnable()) {
                                MeterSender.logger.error(th, "Send meters to collector fail with a grpc internal exception.", new Object[0]);
                            }
                            ((GRPCChannelManager) ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).reportError(th);
                        }

                        @Override // org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver
                        public void onCompleted() {
                            gRPCStreamServiceStatus.finished();
                        }
                    });
                    transform(map, meterData -> {
                        streamObserver.onNext(meterData);
                    });
                    if (streamObserver != null) {
                        streamObserver.onCompleted();
                    }
                    gRPCStreamServiceStatus.wait4Finish();
                } catch (Throwable th) {
                    if (!(th instanceof StatusRuntimeException)) {
                        logger.error(th, "Report meters to backend fail.", new Object[0]);
                        if (streamObserver != null) {
                            streamObserver.onCompleted();
                        }
                        gRPCStreamServiceStatus.wait4Finish();
                        return;
                    }
                    if (((StatusRuntimeException) th).getStatus().getCode() == Status.Code.UNIMPLEMENTED) {
                        logger.warn("Backend doesn't support meter, it will be disabled", new Object[0]);
                        meterService.shutdown();
                    }
                    if (streamObserver != null) {
                        streamObserver.onCompleted();
                    }
                    gRPCStreamServiceStatus.wait4Finish();
                }
            } catch (Throwable th2) {
                if (streamObserver != null) {
                    streamObserver.onCompleted();
                }
                gRPCStreamServiceStatus.wait4Finish();
                throw th2;
            }
        }
    }

    protected void transform(Map<MeterId, MeterTransformer> map, Consumer<MeterData> consumer) {
        boolean z = false;
        Iterator<MeterTransformer> it = map.values().iterator();
        while (it.hasNext()) {
            MeterData.Builder transform = it.next().transform();
            if (transform != null) {
                if (!z) {
                    transform.setService(Config.Agent.SERVICE_NAME);
                    transform.setServiceInstance(Config.Agent.INSTANCE_NAME);
                    transform.setTimestamp(System.currentTimeMillis());
                    z = true;
                }
                consumer.accept(transform.build());
            }
        }
    }

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void onComplete() {
    }

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void shutdown() {
    }

    @Override // org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener
    public void statusChanged(GRPCChannelStatus gRPCChannelStatus) {
        if (GRPCChannelStatus.CONNECTED.equals(gRPCChannelStatus)) {
            this.meterReportServiceStub = MeterReportServiceGrpc.newStub(((GRPCChannelManager) ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).getChannel());
        } else {
            this.meterReportServiceStub = null;
        }
        this.status = gRPCChannelStatus;
    }
}
