package org.apache.skywalking.apm.agent.core.remote;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.TraceSegmentReportServiceGrpc;

@DefaultImplementor
/* loaded from: input_file:org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.class */
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
    private static final ILog logger = LogManager.getLogger((Class<?>) TraceSegmentServiceClient.class);
    private long lastLogTime;
    private long segmentUplinkedCounter;
    private long segmentAbandonedCounter;
    private volatile DataCarrier<TraceSegment> carrier;
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void prepare() {
        ((GRPCChannelManager) ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).addChannelListener(this);
    }

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void boot() {
        this.lastLogTime = System.currentTimeMillis();
        this.segmentUplinkedCounter = 0L;
        this.segmentAbandonedCounter = 0L;
        this.carrier = new DataCarrier<>(Config.Buffer.CHANNEL_SIZE, Config.Buffer.BUFFER_SIZE);
        this.carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
        this.carrier.consume(this, 1);
    }

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void onComplete() {
        TracingContext.ListenerManager.add(this);
    }

    @Override // org.apache.skywalking.apm.agent.core.boot.BootService
    public void shutdown() {
        TracingContext.ListenerManager.remove(this);
        this.carrier.shutdownConsumers();
    }

    @Override // org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer
    public void init() {
    }

    @Override // org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer
    public void consume(List<TraceSegment> list) {
        if (GRPCChannelStatus.CONNECTED.equals(this.status)) {
            final GRPCStreamServiceStatus gRPCStreamServiceStatus = new GRPCStreamServiceStatus(false);
            StreamObserver<SegmentObject> collect = this.serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { // from class: org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient.1
                @Override // org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver
                public void onNext(Commands commands) {
                    ((CommandService) ServiceManager.INSTANCE.findService(CommandService.class)).receiveCommand(commands);
                }

                @Override // org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver
                public void onError(Throwable th) {
                    gRPCStreamServiceStatus.finished();
                    if (TraceSegmentServiceClient.logger.isErrorEnable()) {
                        TraceSegmentServiceClient.logger.error(th, "Send UpstreamSegment to collector fail with a grpc internal exception.", new Object[0]);
                    }
                    ((GRPCChannelManager) ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).reportError(th);
                }

                @Override // org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver
                public void onCompleted() {
                    gRPCStreamServiceStatus.finished();
                }
            });
            try {
                Iterator<TraceSegment> it = list.iterator();
                while (it.hasNext()) {
                    collect.onNext(it.next().transform());
                }
            } catch (Throwable th) {
                logger.error(th, "Transform and send UpstreamSegment to collector fail.", new Object[0]);
            }
            collect.onCompleted();
            gRPCStreamServiceStatus.wait4Finish();
            this.segmentUplinkedCounter += list.size();
        } else {
            this.segmentAbandonedCounter += list.size();
        }
        printUplinkStatus();
    }

    private void printUplinkStatus() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.lastLogTime > 30000) {
            this.lastLogTime = currentTimeMillis;
            if (this.segmentUplinkedCounter > 0) {
                logger.debug("{} trace segments have been sent to collector.", Long.valueOf(this.segmentUplinkedCounter));
                this.segmentUplinkedCounter = 0L;
            }
            if (this.segmentAbandonedCounter > 0) {
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", Long.valueOf(this.segmentAbandonedCounter));
                this.segmentAbandonedCounter = 0L;
            }
        }
    }

    @Override // org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer
    public void onError(List<TraceSegment> list, Throwable th) {
        logger.error(th, "Try to send {} trace segments to collector, with unexpected exception.", Integer.valueOf(list.size()));
    }

    @Override // org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer
    public void onExit() {
    }

    @Override // org.apache.skywalking.apm.agent.core.context.TracingContextListener
    public void afterFinished(TraceSegment traceSegment) {
        if (traceSegment.isIgnore() || this.carrier.produce(traceSegment) || !logger.isDebugEnable()) {
            return;
        }
        logger.debug("One trace segment has been abandoned, cause by buffer is full.");
    }

    @Override // org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener
    public void statusChanged(GRPCChannelStatus gRPCChannelStatus) {
        if (GRPCChannelStatus.CONNECTED.equals(gRPCChannelStatus)) {
            this.serviceStub = TraceSegmentReportServiceGrpc.newStub(((GRPCChannelManager) ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).getChannel());
        }
        this.status = gRPCChannelStatus;
    }
}
