package io.hstream.impl;

import com.google.common.util.concurrent.AbstractService;
import com.google.protobuf.InvalidProtocolBufferException;
import io.hstream.HStreamDBClientException;
import io.hstream.ReceivedRecord;
import io.hstream.Record;
import io.hstream.RecordHeader;
import io.hstream.StreamShardOffset;
import io.hstream.StreamShardReader;
import io.hstream.StreamShardReaderBatchReceiver;
import io.hstream.StreamShardReaderReceiver;
import io.hstream.internal.HStreamApiGrpcKt;
import io.hstream.internal.LookupShardReaderRequest;
import io.hstream.internal.LookupShardReaderResponse;
import io.hstream.internal.ReadShardStreamRequest;
import io.hstream.internal.ReadShardStreamResponse;
import io.hstream.util.GrpcUtils;
import io.hstream.util.RecordUtils;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import kotlin.Metadata;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.streams.jdk8.StreamsKt;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.flow.Flow;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: StreamShardReaderKtImpl.kt */
@Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��R\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\u0018�� \u001e2\u00020\u00012\u00020\u0002:\u0001\u001eBK\u0012\u0006\u0010\u0003\u001a\u00020\u0004\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\u0006\u0010\u0007\u001a\u00020\b\u0012\u0006\u0010\t\u001a\u00020\n\u0012\u0006\u0010\u000b\u001a\u00020\b\u0012\b\u0010\f\u001a\u0004\u0018\u00010\n\u0012\b\u0010\r\u001a\u0004\u0018\u00010\u000e\u0012\b\u0010\u000f\u001a\u0004\u0018\u00010\u0010¢\u0006\u0002\u0010\u0011J\b\u0010\u0018\u001a\u00020\u0019H\u0014J\b\u0010\u001a\u001a\u00020\u0019H\u0014J\u0010\u0010\u001b\u001a\u00020\u00192\u0006\u0010\u001c\u001a\u00020\u001dH\u0002R\u0010\u0010\u000f\u001a\u0004\u0018\u00010\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u0012\u001a\n \u0014*\u0004\u0018\u00010\u00130\u0013X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000b\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0015\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0016\u001a\u00020\u0017X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\r\u001a\u0004\u0018\u00010\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\f\u001a\u0004\u0018\u00010\nX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0003"}, d2 = {"Lio/hstream/impl/StreamShardReaderKtImpl;", "Lcom/google/common/util/concurrent/AbstractService;", "Lio/hstream/StreamShardReader;", "client", "Lio/hstream/impl/HStreamClientKtImpl;", "streamName", DefaultSettings.DEFAULT_PARTITION_KEY, "shardId", DefaultSettings.DEFAULT_PARTITION_KEY, "from", "Lio/hstream/StreamShardOffset;", "maxReadBatches", "until", "receiver", "Lio/hstream/StreamShardReaderReceiver;", "batchReceiver", "Lio/hstream/StreamShardReaderBatchReceiver;", "(Lio/hstream/impl/HStreamClientKtImpl;Ljava/lang/String;JLio/hstream/StreamShardOffset;JLio/hstream/StreamShardOffset;Lio/hstream/StreamShardReaderReceiver;Lio/hstream/StreamShardReaderBatchReceiver;)V", "executorService", "Ljava/util/concurrent/ExecutorService;", "kotlin.jvm.PlatformType", "readerName", "readerScope", "Lkotlinx/coroutines/CoroutineScope;", "doStart", DefaultSettings.DEFAULT_PARTITION_KEY, "doStop", "process", "value", "Lio/hstream/internal/ReadShardStreamResponse;", "Companion"})
/* loaded from: input_file:io/hstream/impl/StreamShardReaderKtImpl.class */
public final class StreamShardReaderKtImpl extends AbstractService implements StreamShardReader {

    @NotNull
    private final HStreamClientKtImpl client;

    @NotNull
    private final String streamName;
    private final long shardId;

    @NotNull
    private final StreamShardOffset from;
    private final long maxReadBatches;

    @Nullable
    private final StreamShardOffset until;

    @Nullable
    private final StreamShardReaderReceiver receiver;

    @Nullable
    private final StreamShardReaderBatchReceiver batchReceiver;

    @NotNull
    private final CoroutineScope readerScope;

    @NotNull
    private final String readerName;
    private final ExecutorService executorService;

    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger logger = LoggerFactory.getLogger(StreamShardReaderKtImpl.class);

    /* compiled from: StreamShardReaderKtImpl.kt */
    @Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��&\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002J\u0018\u0010\u0006\u001a\u00020\u00072\u0006\u0010\b\u001a\u00020\t2\u0006\u0010\n\u001a\u00020\u000bH\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\f"}, d2 = {"Lio/hstream/impl/StreamShardReaderKtImpl$Companion;", DefaultSettings.DEFAULT_PARTITION_KEY, "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "toReceivedRecord", "Lio/hstream/ReceivedRecord;", "receivedHStreamRecord", "Lio/hstream/impl/ReceivedHStreamRecord;", "createdTime", "Ljava/time/Instant;", "client"})
    /* loaded from: input_file:io/hstream/impl/StreamShardReaderKtImpl$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final ReceivedRecord toReceivedRecord(ReceivedHStreamRecord receivedHStreamRecord, Instant instant) {
            ReceivedRecord receivedRecord;
            try {
                RecordHeader parseRecordHeaderFromHStreamRecord = RecordUtils.parseRecordHeaderFromHStreamRecord(receivedHStreamRecord.getRecord());
                if (RecordUtils.isRawRecord(receivedHStreamRecord.getRecord())) {
                    receivedRecord = new ReceivedRecord(GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.getRecordId()), Record.newBuilder().partitionKey(parseRecordHeaderFromHStreamRecord.getPartitionKey()).rawRecord(RecordUtils.parseRawRecordFromHStreamRecord(receivedHStreamRecord.getRecord())).build(), instant);
                } else {
                    receivedRecord = new ReceivedRecord(GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.getRecordId()), Record.newBuilder().partitionKey(parseRecordHeaderFromHStreamRecord.getPartitionKey()).hRecord(RecordUtils.parseHRecordFromHStreamRecord(receivedHStreamRecord.getRecord())).build(), instant);
                }
                return receivedRecord;
            } catch (InvalidProtocolBufferException e) {
                throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
            }
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public StreamShardReaderKtImpl(@NotNull HStreamClientKtImpl hStreamClientKtImpl, @NotNull String str, long j, @NotNull StreamShardOffset streamShardOffset, long j2, @Nullable StreamShardOffset streamShardOffset2, @Nullable StreamShardReaderReceiver streamShardReaderReceiver, @Nullable StreamShardReaderBatchReceiver streamShardReaderBatchReceiver) {
        Intrinsics.checkNotNullParameter(hStreamClientKtImpl, "client");
        Intrinsics.checkNotNullParameter(str, "streamName");
        Intrinsics.checkNotNullParameter(streamShardOffset, "from");
        this.client = hStreamClientKtImpl;
        this.streamName = str;
        this.shardId = j;
        this.from = streamShardOffset;
        this.maxReadBatches = j2;
        this.until = streamShardOffset2;
        this.receiver = streamShardReaderReceiver;
        this.batchReceiver = streamShardReaderBatchReceiver;
        this.readerScope = CoroutineScopeKt.CoroutineScope(Dispatchers.getIO());
        String uuid = UUID.randomUUID().toString();
        Intrinsics.checkNotNullExpressionValue(uuid, "randomUUID().toString()");
        this.readerName = uuid;
        this.executorService = Executors.newSingleThreadExecutor();
    }

    protected void doStart() {
        new Thread(() -> {
            m21doStart$lambda0(r2);
        }).start();
    }

    protected void doStop() {
        new Thread(() -> {
            m22doStop$lambda1(r2);
        }).start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void process(ReadShardStreamResponse readShardStreamResponse) {
        if (isRunning()) {
            for (io.hstream.internal.ReceivedRecord receivedRecord : readShardStreamResponse.getReceivedRecordsList()) {
                List<ReceivedHStreamRecord> decompress = RecordUtils.decompress(receivedRecord);
                Instant ofEpochSecond = Instant.ofEpochSecond(receivedRecord.getRecord().getPublishTime().getSeconds(), r0.getNanos());
                if (this.batchReceiver != null) {
                    this.executorService.submit(() -> {
                        m24process$lambda3(r1, r2, r3);
                    });
                } else {
                    for (ReceivedHStreamRecord receivedHStreamRecord : decompress) {
                        this.executorService.submit(() -> {
                            m25process$lambda4(r1, r2, r3);
                        });
                    }
                }
            }
        }
    }

    /* renamed from: doStart$lambda-0, reason: not valid java name */
    private static final void m21doStart$lambda0(StreamShardReaderKtImpl streamShardReaderKtImpl) {
        Intrinsics.checkNotNullParameter(streamShardReaderKtImpl, "this$0");
        try {
            logger.info("streamShardReader " + streamShardReaderKtImpl.readerName + " is starting");
            LookupShardReaderResponse lookupShardReaderResponse = (LookupShardReaderResponse) streamShardReaderKtImpl.client.unaryCallBlocked(new StreamShardReaderKtImpl$doStart$1$lookupShardReaderResp$1(LookupShardReaderRequest.newBuilder().setReaderId(streamShardReaderKtImpl.readerName).m2204build(), null));
            String str = lookupShardReaderResponse.getServerNode().getHost() + ":" + lookupShardReaderResponse.getServerNode().getPort();
            ReadShardStreamRequest.Builder maxReadBatches = ReadShardStreamRequest.newBuilder().setReaderId(streamShardReaderKtImpl.readerName).setShardId(streamShardReaderKtImpl.shardId).setFrom(GrpcUtils.streamShardOffsetToGrpc(streamShardReaderKtImpl.from)).setMaxReadBatches(streamShardReaderKtImpl.maxReadBatches);
            if (streamShardReaderKtImpl.until != null) {
                maxReadBatches.setUntil(GrpcUtils.streamShardOffsetToGrpc(streamShardReaderKtImpl.until));
            }
            HStreamApiGrpcKt.HStreamApiCoroutineStub coroutineStub = streamShardReaderKtImpl.client.getCoroutineStub(str);
            ReadShardStreamRequest m2724build = maxReadBatches.m2724build();
            Intrinsics.checkNotNullExpressionValue(m2724build, "readerBuilder.build()");
            Flow readShardStream$default = HStreamApiGrpcKt.HStreamApiCoroutineStub.readShardStream$default(coroutineStub, m2724build, null, 2, null);
            streamShardReaderKtImpl.notifyStarted();
            BuildersKt.launch$default(streamShardReaderKtImpl.readerScope, (CoroutineContext) null, (CoroutineStart) null, new StreamShardReaderKtImpl$doStart$1$1(readShardStream$default, streamShardReaderKtImpl, null), 3, (Object) null);
        } catch (Exception e) {
            logger.error("steamShardReader " + streamShardReaderKtImpl.readerName + " failed to start", e);
            streamShardReaderKtImpl.notifyFailed(new HStreamDBClientException(e));
        }
    }

    /* renamed from: doStop$lambda-1, reason: not valid java name */
    private static final void m22doStop$lambda1(StreamShardReaderKtImpl streamShardReaderKtImpl) {
        Intrinsics.checkNotNullParameter(streamShardReaderKtImpl, "this$0");
        streamShardReaderKtImpl.executorService.shutdownNow();
        streamShardReaderKtImpl.notifyStopped();
    }

    /* renamed from: process$lambda-3$lambda-2, reason: not valid java name */
    private static final ReceivedRecord m23process$lambda3$lambda2(Instant instant, ReceivedHStreamRecord receivedHStreamRecord) {
        Companion companion = Companion;
        Intrinsics.checkNotNullExpressionValue(receivedHStreamRecord, "it");
        Intrinsics.checkNotNullExpressionValue(instant, "createdTime");
        return companion.toReceivedRecord(receivedHStreamRecord, instant);
    }

    /* renamed from: process$lambda-3, reason: not valid java name */
    private static final void m24process$lambda3(StreamShardReaderKtImpl streamShardReaderKtImpl, List list, Instant instant) {
        Intrinsics.checkNotNullParameter(streamShardReaderKtImpl, "this$0");
        if (streamShardReaderKtImpl.isRunning()) {
            try {
                Stream map = list.stream().map((v1) -> {
                    return m23process$lambda3$lambda2(r1, v1);
                });
                Intrinsics.checkNotNullExpressionValue(map, "receivedHStreamRecords.s…Record(it, createdTime) }");
                streamShardReaderKtImpl.batchReceiver.process(StreamsKt.toList(map));
            } catch (Exception e) {
                streamShardReaderKtImpl.notifyFailed(e);
            }
        }
    }

    /* renamed from: process$lambda-4, reason: not valid java name */
    private static final void m25process$lambda4(StreamShardReaderKtImpl streamShardReaderKtImpl, ReceivedHStreamRecord receivedHStreamRecord, Instant instant) {
        Intrinsics.checkNotNullParameter(streamShardReaderKtImpl, "this$0");
        if (streamShardReaderKtImpl.isRunning()) {
            try {
                StreamShardReaderReceiver streamShardReaderReceiver = streamShardReaderKtImpl.receiver;
                Intrinsics.checkNotNull(streamShardReaderReceiver);
                Companion companion = Companion;
                Intrinsics.checkNotNullExpressionValue(receivedHStreamRecord, "receivedHStreamRecord");
                Intrinsics.checkNotNullExpressionValue(instant, "createdTime");
                streamShardReaderReceiver.process(companion.toReceivedRecord(receivedHStreamRecord, instant));
            } catch (Exception e) {
                streamShardReaderKtImpl.notifyFailed(e);
            }
        }
    }
}
