package io.hstream.impl;

import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import io.hstream.Consumer;
import io.hstream.ConsumerHeartbeatRequest;
import io.hstream.ConsumerHeartbeatResponse;
import io.hstream.FetchRequest;
import io.hstream.FetchResponse;
import io.hstream.HRecordReceiver;
import io.hstream.HStreamApiGrpc;
import io.hstream.HStreamDBClientException;
import io.hstream.HStreamRecord;
import io.hstream.RawRecordReceiver;
import io.hstream.ReceivedHRecord;
import io.hstream.ReceivedRawRecord;
import io.hstream.ReceivedRecord;
import io.hstream.SubscribeRequest;
import io.hstream.SubscribeResponse;
import io.hstream.util.RecordUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hstream/impl/ConsumerImpl.class */
public class ConsumerImpl extends AbstractService implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerImpl.class);
    private HStreamApiGrpc.HStreamApiStub grpcStub;
    private HStreamApiGrpc.HStreamApiBlockingStub grpcBlockingStub;
    private String consumerName;
    private String subscriptionId;
    private RawRecordReceiver rawRecordReceiver;
    private HRecordReceiver hRecordReceiver;
    private static final long pollTimeoutMs = 1000;
    private static final int maxPollRecords = 1000;
    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("receiver-running-pool-%d").build());
    private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    public ConsumerImpl(HStreamApiGrpc.HStreamApiStub hStreamApiStub, HStreamApiGrpc.HStreamApiBlockingStub hStreamApiBlockingStub, String str, String str2, RawRecordReceiver rawRecordReceiver, HRecordReceiver hRecordReceiver) {
        this.grpcStub = hStreamApiStub;
        this.grpcBlockingStub = hStreamApiBlockingStub;
        this.consumerName = str;
        this.subscriptionId = str2;
        this.rawRecordReceiver = rawRecordReceiver;
        this.hRecordReceiver = hRecordReceiver;
    }

    public void doStart() {
        final ConsumerHeartbeatRequest m228build = ConsumerHeartbeatRequest.newBuilder().setSubscriptionId(this.subscriptionId).m228build();
        final StreamObserver<ConsumerHeartbeatResponse> streamObserver = new StreamObserver<ConsumerHeartbeatResponse>() { // from class: io.hstream.impl.ConsumerImpl.1
            public void onNext(ConsumerHeartbeatResponse consumerHeartbeatResponse) {
                ConsumerImpl.logger.info("consumer {} received heartbeat response for subscription {}", ConsumerImpl.this.consumerName, consumerHeartbeatResponse.getSubscriptionId());
            }

            public void onError(Throwable th) {
                ConsumerImpl.logger.error("consumer {} send heartbeat error: {}", ConsumerImpl.this.consumerName, th);
                throw new HStreamDBClientException.ConsumerException("send heartbeat error", th);
            }

            public void onCompleted() {
            }
        };
        final FetchRequest m416build = FetchRequest.newBuilder().setSubscriptionId(this.subscriptionId).setTimeout(pollTimeoutMs).setMaxSize(maxPollRecords).m416build();
        this.grpcStub.subscribe(SubscribeRequest.newBuilder().setSubscriptionId(this.subscriptionId).m849build(), new StreamObserver<SubscribeResponse>() { // from class: io.hstream.impl.ConsumerImpl.2
            public void onNext(SubscribeResponse subscribeResponse) {
                ConsumerImpl.logger.info("consumer {} attach to subscription {} successfully", ConsumerImpl.this.consumerName, subscribeResponse.getSubscriptionId());
                ExecutorService executorService = ConsumerImpl.this.executorService;
                FetchRequest fetchRequest = m416build;
                executorService.submit(() -> {
                    do {
                        ConsumerImpl.logger.info("start fetch and processing ...");
                        FetchResponse fetch = ConsumerImpl.this.grpcBlockingStub.fetch(fetchRequest);
                        ConsumerImpl.logger.info("fetched {} records", Integer.valueOf(fetch.getReceivedRecordsCount()));
                        for (ReceivedRecord receivedRecord : fetch.getReceivedRecordsList()) {
                            if (RecordUtils.isRawRecord(receivedRecord)) {
                                ConsumerImpl.logger.info("ready to process rawRecord");
                                ConsumerImpl.this.rawRecordReceiver.processRawRecord(ConsumerImpl.toReceivedRawRecord(receivedRecord), new ResponderImpl(ConsumerImpl.this.grpcBlockingStub, ConsumerImpl.this.subscriptionId, receivedRecord.getRecordId()));
                            } else {
                                ConsumerImpl.logger.info("ready to process hrecord");
                                ConsumerImpl.this.hRecordReceiver.processHRecord(ConsumerImpl.toReceivedHRecord(receivedRecord), new ResponderImpl(ConsumerImpl.this.grpcBlockingStub, ConsumerImpl.this.subscriptionId, receivedRecord.getRecordId()));
                            }
                        }
                        ConsumerImpl.logger.info("processed {} records", Integer.valueOf(fetch.getReceivedRecordsCount()));
                    } while (ConsumerImpl.this.isRunning());
                });
                ScheduledExecutorService scheduledExecutorService = ConsumerImpl.this.scheduledExecutorService;
                ConsumerHeartbeatRequest consumerHeartbeatRequest = m228build;
                StreamObserver streamObserver2 = streamObserver;
                scheduledExecutorService.scheduleAtFixedRate(() -> {
                    ConsumerImpl.this.grpcStub.sendConsumerHeartbeat(consumerHeartbeatRequest, streamObserver2);
                }, 0L, 1L, TimeUnit.SECONDS);
                ConsumerImpl.this.notifyStarted();
            }

            public void onError(Throwable th) {
                ConsumerImpl.logger.error("consumer {} attach to subscription {} error: {}", new Object[]{ConsumerImpl.this.consumerName, ConsumerImpl.this.subscriptionId, th});
                ConsumerImpl.this.notifyFailed(new HStreamDBClientException.SubscribeException("consumer subscribe error", th));
            }

            public void onCompleted() {
            }
        });
    }

    public void doStop() {
        logger.info("prepare to stop consumer");
        this.scheduledExecutorService.shutdownNow();
        this.executorService.shutdownNow();
        notifyStopped();
        logger.info("consumer has been stopped");
    }

    private static ReceivedRawRecord toReceivedRawRecord(ReceivedRecord receivedRecord) {
        try {
            return new ReceivedRawRecord(receivedRecord.getRecordId(), RecordUtils.parseRawRecordFromHStreamRecord(HStreamRecord.parseFrom(receivedRecord.getRecord())));
        } catch (InvalidProtocolBufferException e) {
            throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
        }
    }

    private static ReceivedHRecord toReceivedHRecord(ReceivedRecord receivedRecord) {
        try {
            return new ReceivedHRecord(receivedRecord.getRecordId(), RecordUtils.parseHRecordFromHStreamRecord(HStreamRecord.parseFrom(receivedRecord.getRecord())));
        } catch (InvalidProtocolBufferException e) {
            throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
        }
    }
}
