package io.hstream.impl;

import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import io.hstream.Consumer;
import io.hstream.HRecordReceiver;
import io.hstream.HStreamDBClientException;
import io.hstream.RawRecordReceiver;
import io.hstream.ReceivedHRecord;
import io.hstream.ReceivedRawRecord;
import io.hstream.internal.HStreamApiGrpc;
import io.hstream.internal.HStreamRecord;
import io.hstream.internal.ReceivedRecord;
import io.hstream.internal.StreamingFetchRequest;
import io.hstream.internal.StreamingFetchResponse;
import io.hstream.util.GrpcUtils;
import io.hstream.util.RecordUtils;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hstream/impl/ConsumerImpl.class */
public class ConsumerImpl extends AbstractService implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerImpl.class);
    private HStreamApiGrpc.HStreamApiStub grpcStub;
    private HStreamApiGrpc.HStreamApiBlockingStub grpcBlockingStub;
    private String consumerName;
    private String subscriptionId;
    private RawRecordReceiver rawRecordReceiver;
    private HRecordReceiver hRecordReceiver;
    private ExecutorService executorService;
    private final StreamObserver<StreamingFetchResponse> responseStream;
    private final StreamObserver<StreamingFetchRequest> requestStream;
    private final AtomicBoolean inited = new AtomicBoolean(false);

    public ConsumerImpl(HStreamApiGrpc.HStreamApiStub hStreamApiStub, HStreamApiGrpc.HStreamApiBlockingStub hStreamApiBlockingStub, final String str, final String str2, final RawRecordReceiver rawRecordReceiver, final HRecordReceiver hRecordReceiver) {
        this.grpcStub = hStreamApiStub;
        this.grpcBlockingStub = hStreamApiBlockingStub;
        if (str == null) {
            this.consumerName = UUID.randomUUID().toString();
        } else {
            this.consumerName = str;
        }
        this.subscriptionId = str2;
        this.rawRecordReceiver = rawRecordReceiver;
        this.hRecordReceiver = hRecordReceiver;
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("receiver-running-pool-%d").build());
        this.responseStream = new StreamObserver<StreamingFetchResponse>() { // from class: io.hstream.impl.ConsumerImpl.1
            public void onNext(StreamingFetchResponse streamingFetchResponse) {
                if (ConsumerImpl.this.inited.compareAndSet(false, true)) {
                }
                if (ConsumerImpl.this.isRunning()) {
                    for (ReceivedRecord receivedRecord : streamingFetchResponse.getReceivedRecordsList()) {
                        ResponderImpl responderImpl = new ResponderImpl(str2, ConsumerImpl.this.requestStream, str, receivedRecord.getRecordId());
                        ExecutorService executorService = ConsumerImpl.this.executorService;
                        RawRecordReceiver rawRecordReceiver2 = rawRecordReceiver;
                        HRecordReceiver hRecordReceiver2 = hRecordReceiver;
                        executorService.submit(() -> {
                            if (ConsumerImpl.this.isRunning()) {
                                if (RecordUtils.isRawRecord(receivedRecord)) {
                                    ConsumerImpl.logger.info("ready to process rawRecord");
                                    try {
                                        rawRecordReceiver2.processRawRecord(ConsumerImpl.toReceivedRawRecord(receivedRecord), responderImpl);
                                        return;
                                    } catch (Exception e) {
                                        ConsumerImpl.logger.error("process rawRecord error", e);
                                        return;
                                    }
                                }
                                ConsumerImpl.logger.info("ready to process hrecord");
                                try {
                                    hRecordReceiver2.processHRecord(ConsumerImpl.toReceivedHRecord(receivedRecord), responderImpl);
                                } catch (Exception e2) {
                                    ConsumerImpl.logger.error("process hrecord error", e2);
                                }
                            }
                        });
                    }
                }
            }

            public void onError(Throwable th) {
                if (!ConsumerImpl.this.inited.compareAndSet(false, true)) {
                    ConsumerImpl.logger.error("consumer {} receive records from subscription {} error: {}", new Object[]{ConsumerImpl.this.consumerName, ConsumerImpl.this.subscriptionId, th});
                } else {
                    ConsumerImpl.logger.error("consumer {} attach to subscription {} error: {}", new Object[]{ConsumerImpl.this.consumerName, ConsumerImpl.this.subscriptionId, th});
                    ConsumerImpl.this.notifyFailed(th);
                }
            }

            public void onCompleted() {
            }
        };
        this.requestStream = hStreamApiStub.streamingFetch(this.responseStream);
    }

    public void doStart() {
        logger.info("prepare to start consumer");
        this.requestStream.onNext(StreamingFetchRequest.newBuilder().setSubscriptionId(this.subscriptionId).setConsumerName(this.consumerName).m950build());
        logger.info("consumer {} started", this.consumerName);
        notifyStarted();
    }

    public void doStop() {
        logger.info("prepare to stop consumer");
        new Thread(() -> {
            this.requestStream.onCompleted();
            this.executorService.shutdown();
            logger.info("run shutdown done");
            try {
                this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
                logger.info("await terminate done");
            } catch (InterruptedException e) {
                logger.warn("wait timeout, consumer {} will be closed", this.consumerName);
            }
            logger.info("ready to notify stop");
            notifyStopped();
            logger.info("notify stop done");
        }).start();
    }

    private static ReceivedRawRecord toReceivedRawRecord(ReceivedRecord receivedRecord) {
        try {
            return new ReceivedRawRecord(GrpcUtils.recordIdFromGrpc(receivedRecord.getRecordId()), RecordUtils.parseRawRecordFromHStreamRecord(HStreamRecord.parseFrom(receivedRecord.getRecord())));
        } catch (InvalidProtocolBufferException e) {
            throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
        }
    }

    private static ReceivedHRecord toReceivedHRecord(ReceivedRecord receivedRecord) {
        try {
            return new ReceivedHRecord(GrpcUtils.recordIdFromGrpc(receivedRecord.getRecordId()), RecordUtils.parseHRecordFromHStreamRecord(HStreamRecord.parseFrom(receivedRecord.getRecord())));
        } catch (InvalidProtocolBufferException e) {
            throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
        }
    }
}
