package io.hstream.impl;

import com.google.protobuf.InvalidProtocolBufferException;
import io.hstream.HStreamDBClientException;
import io.hstream.ReceivedRecord;
import io.hstream.Record;
import io.hstream.RecordHeader;
import io.hstream.StreamKeyReader;
import io.hstream.StreamShardOffset;
import io.hstream.internal.HStreamApiGrpcKt;
import io.hstream.internal.HStreamRecord;
import io.hstream.internal.LookupKeyRequest;
import io.hstream.internal.ReadStreamByKeyRequest;
import io.hstream.internal.ReadStreamByKeyResponse;
import io.hstream.internal.RecordId;
import io.hstream.internal.ServerNode;
import io.hstream.util.GrpcUtils;
import io.hstream.util.RecordUtils;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kotlin.Metadata;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.flow.MutableSharedFlow;
import kotlinx.coroutines.flow.SharedFlowKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: StreamKeyReaderKtImpl.kt */
@Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��v\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\u0018�� (2\u00020\u0001:\u0001(B7\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0005\u0012\u0006\u0010\u0007\u001a\u00020\b\u0012\b\u0010\t\u001a\u0004\u0018\u00010\b\u0012\u0006\u0010\n\u001a\u00020\u000b¢\u0006\u0002\u0010\fJ\b\u0010\u001f\u001a\u00020 H\u0016J\b\u0010!\u001a\u00020 H\u0002J\t\u0010\"\u001a\u00020#H\u0096\u0002J\u000b\u0010$\u001a\u0004\u0018\u00010\u000fH\u0096\u0002J\u0010\u0010%\u001a\u00020 2\u0006\u0010&\u001a\u00020'H\u0002R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0018\u0010\u0010\u001a\f\u0012\b\u0012\u00060\u0012j\u0002`\u00130\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u0014\u001a\n 
\u0016*\u0004\u0018\u00010\u00150\u0015X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0018X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0019\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001a\u001a\u00020\u001bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u001c\u001a\b\u0012\u0004\u0012\u00020\u001e0\u001dX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\t\u001a\u0004\u0018\u00010\bX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0002"}, d2 = {"Lio/hstream/impl/StreamKeyReaderKtImpl;", "Lio/hstream/StreamKeyReader;", "client", "Lio/hstream/impl/HStreamClientKtImpl;", "streamName", DefaultSettings.DEFAULT_PARTITION_KEY, "key", "from", "Lio/hstream/StreamShardOffset;", "until", "bufferSize", DefaultSettings.DEFAULT_PARTITION_KEY, "(Lio/hstream/impl/HStreamClientKtImpl;Ljava/lang/String;Ljava/lang/String;Lio/hstream/StreamShardOffset;Lio/hstream/StreamShardOffset;I)V", "buffer", "Ljava/util/concurrent/ArrayBlockingQueue;", "Lio/hstream/ReceivedRecord;", "exceptionRef", "Ljava/util/concurrent/atomic/AtomicReference;", "Ljava/lang/Exception;", "Lkotlin/Exception;", "executorService", "Ljava/util/concurrent/ExecutorService;", "kotlin.jvm.PlatformType", "isStopped", "Ljava/util/concurrent/atomic/AtomicBoolean;", "readerName", "readerScope", "Lkotlinx/coroutines/CoroutineScope;", "requestFlow", "Lkotlinx/coroutines/flow/MutableSharedFlow;", "Lio/hstream/internal/ReadStreamByKeyRequest;", "close", DefaultSettings.DEFAULT_PARTITION_KEY, "doStart", "hasNext", DefaultSettings.DEFAULT_PARTITION_KEY, "next", "saveToBuffer", "value", "Lio/hstream/internal/ReadStreamByKeyResponse;", "Companion"})
/* loaded from: input_file:io/hstream/impl/StreamKeyReaderKtImpl.class */
public final class StreamKeyReaderKtImpl implements StreamKeyReader {

    /** Client used to look up the server node for the key and to obtain the coroutine stub. */
    @NotNull
    private final HStreamClientKtImpl client;

    /** Name of the stream to read; sent as the stream name of the read request. */
    @NotNull
    private final String streamName;

    /** Key to read; used both for the server lookup and in the read request. */
    @NotNull
    private final String key;

    /** Offset the read starts from. */
    @NotNull
    private final StreamShardOffset from;

    /** Optional end offset; only added to the read request when non-null. */
    @Nullable
    private final StreamShardOffset until;
    /** Capacity of {@link #buffer}; also sent as the read record count of the initial request. */
    private final int bufferSize;

    /** Coroutine scope (created on {@code Dispatchers.getIO()}) for all coroutines launched by this reader; cancelled by {@link #close()}. */
    @NotNull
    private final CoroutineScope readerScope;

    /** Reader id sent to the server and used in log messages; a random UUID string generated in the constructor. */
    @NotNull
    private final String readerName;
    /** Single-thread executor on which received records are converted and put into {@link #buffer}. */
    private final ExecutorService executorService;

    /** Request flow handed to the {@code readStreamByKey} call in {@code doStart()}. */
    @NotNull
    private final MutableSharedFlow<ReadStreamByKeyRequest> requestFlow;

    /** Bounded queue of converted records waiting to be returned by {@link #next()}. */
    @NotNull
    private final ArrayBlockingQueue<ReceivedRecord> buffer;

    /**
     * Failure that {@link #next()} rethrows once no buffered record is available.
     * NOTE(review): nothing in this file writes it; presumably set by the generated coroutine classes - confirm.
     */
    @NotNull
    private final AtomicReference<Exception> exceptionRef;

    /**
     * When {@code true} and the buffer is empty, {@link #hasNext()} returns {@code false} and {@link #next()} returns {@code null}.
     * NOTE(review): nothing in this file sets it; presumably set by the generated coroutine classes - confirm.
     */
    @NotNull
    private final AtomicBoolean isStopped;

    /** Singleton instance of the Kotlin companion object. */
    @NotNull
    public static final Companion Companion = new Companion(null);
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(StreamKeyReaderKtImpl.class);

    /* compiled from: StreamKeyReaderKtImpl.kt */
    @Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��,\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002J \u0010\u0006\u001a\u00020\u00072\u0006\u0010\b\u001a\u00020\t2\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\f\u001a\u00020\rH\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u000e"}, d2 = {"Lio/hstream/impl/StreamKeyReaderKtImpl$Companion;", DefaultSettings.DEFAULT_PARTITION_KEY, "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "toReceivedRecord", "Lio/hstream/ReceivedRecord;", "hStreamRecord", "Lio/hstream/internal/HStreamRecord;", "recordId", "Lio/hstream/internal/RecordId;", "createdTime", "Ljava/time/Instant;", "client"})
    /* loaded from: input_file:io/hstream/impl/StreamKeyReaderKtImpl$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final ReceivedRecord toReceivedRecord(HStreamRecord hStreamRecord, RecordId recordId, Instant instant) {
            ReceivedRecord receivedRecord;
            try {
                RecordHeader parseRecordHeaderFromHStreamRecord = RecordUtils.parseRecordHeaderFromHStreamRecord(hStreamRecord);
                if (RecordUtils.isRawRecord(hStreamRecord)) {
                    receivedRecord = new ReceivedRecord(GrpcUtils.recordIdFromGrpc(recordId), Record.newBuilder().partitionKey(parseRecordHeaderFromHStreamRecord.getPartitionKey()).rawRecord(RecordUtils.parseRawRecordFromHStreamRecord(hStreamRecord)).build(), instant);
                } else {
                    receivedRecord = new ReceivedRecord(GrpcUtils.recordIdFromGrpc(recordId), Record.newBuilder().partitionKey(parseRecordHeaderFromHStreamRecord.getPartitionKey()).hRecord(RecordUtils.parseHRecordFromHStreamRecord(hStreamRecord)).build(), instant);
                }
                return receivedRecord;
            } catch (InvalidProtocolBufferException e) {
                throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
            }
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Creates the reader and immediately starts it via {@code doStart()}.
     *
     * @param hStreamClientKtImpl client used for the server lookup and the gRPC stub; must not be null
     * @param str name of the stream to read; must not be null
     * @param str2 key to read; must not be null
     * @param streamShardOffset offset to start reading from; must not be null
     * @param streamShardOffset2 optional offset to stop reading at; may be null
     * @param i capacity of the record buffer, also sent as the initial read record count
     */
    public StreamKeyReaderKtImpl(@NotNull HStreamClientKtImpl hStreamClientKtImpl, @NotNull String str, @NotNull String str2, @NotNull StreamShardOffset streamShardOffset, @Nullable StreamShardOffset streamShardOffset2, int i) {
        // Kotlin-generated null checks for the non-null parameters.
        Intrinsics.checkNotNullParameter(hStreamClientKtImpl, "client");
        Intrinsics.checkNotNullParameter(str, "streamName");
        Intrinsics.checkNotNullParameter(str2, "key");
        Intrinsics.checkNotNullParameter(streamShardOffset, "from");

        // Constructor arguments.
        this.client = hStreamClientKtImpl;
        this.streamName = str;
        this.key = str2;
        this.from = streamShardOffset;
        this.until = streamShardOffset2;
        this.bufferSize = i;

        // Internal state, initialised in declaration order.
        this.readerScope = CoroutineScopeKt.CoroutineScope(Dispatchers.getIO());
        final String generatedName = UUID.randomUUID().toString();
        Intrinsics.checkNotNullExpressionValue(generatedName, "randomUUID().toString()");
        this.readerName = generatedName;
        this.executorService = Executors.newSingleThreadExecutor();
        this.requestFlow = SharedFlowKt.MutableSharedFlow$default(0, 0, (BufferOverflow) null, 7, (Object) null);
        this.buffer = new ArrayBlockingQueue<>(i);
        this.exceptionRef = new AtomicReference<>(null);
        this.isStopped = new AtomicBoolean(false);

        // Everything is assigned; start reading.
        doStart();
    }

    /**
     * Starts the reader.
     *
     * <p>Looks up the server node for {@code key} with a blocking unary call, builds the initial
     * {@link ReadStreamByKeyRequest} builder (reader id, stream name, key, start offset, read
     * count and, if present, end offset), opens {@code readStreamByKey} on that node with
     * {@code requestFlow} as the request stream, and launches the generated coroutine
     * {@code StreamKeyReaderKtImpl$doStart$1} on {@code readerScope}.
     * NOTE(review): what that coroutine does with the builder and the response is not visible here.
     */
    private final void doStart() {
        logger.info("streamKeyReader " + this.readerName + " is starting");

        // Find the node that serves this key.
        final LookupKeyRequest lookupRequest = LookupKeyRequest.newBuilder().setPartitionKey(this.key).m2176build();
        final ServerNode node = (ServerNode) this.client.unaryCallBlocked(new StreamKeyReaderKtImpl$doStart$lookupResp$1(lookupRequest, null));
        final String serverUrl = node.getHost() + ":" + node.getPort();

        // Initial read request; the end offset is optional.
        final ReadStreamByKeyRequest.Builder initialRequest = ReadStreamByKeyRequest.newBuilder()
                .setReaderId(this.readerName)
                .setStreamName(this.streamName)
                .setKey(this.key)
                .setFrom(GrpcUtils.streamShardOffsetToGrpc(this.from))
                .setReadRecordCount(this.bufferSize);
        final StreamShardOffset upperBound = this.until;
        if (upperBound != null) {
            initialRequest.setUntil(GrpcUtils.streamShardOffsetToGrpc(upperBound));
        }

        BuildersKt.launch$default(
                this.readerScope,
                (CoroutineContext) null,
                (CoroutineStart) null,
                new StreamKeyReaderKtImpl$doStart$1(
                        this,
                        initialRequest,
                        HStreamApiGrpcKt.HStreamApiCoroutineStub.readStreamByKey$default(this.client.getCoroutineStub(serverUrl), this.requestFlow, null, 2, null),
                        null),
                3,
                (Object) null);
    }

    /**
     * Closes the reader: cancels {@code readerScope}, calls {@code shutdownNow()} on the executor
     * that fills the buffer, and logs the closure.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        final CoroutineScope scope = this.readerScope;
        CoroutineScopeKt.cancel$default(scope, (CancellationException) null, 1, (Object) null);

        final ExecutorService bufferWriter = this.executorService;
        bufferWriter.shutdownNow();

        final String message = "StreamKeyReader " + this.readerName + " closed";
        logger.info(message);
    }

    /**
     * Reports whether more records may still be returned by {@link #next()}.
     *
     * @return {@code false} only when the reader is marked stopped and the buffer is empty;
     *     {@code true} otherwise
     */
    @Override // io.hstream.StreamKeyReader
    public boolean hasNext() {
        if (!this.isStopped.get()) {
            // Still running: more records may arrive.
            return true;
        }
        // Stopped: only already-buffered records remain.
        return !this.buffer.isEmpty();
    }

    /**
     * Returns the next buffered record, waiting for one if necessary.
     *
     * <p>The buffer is polled in 100 ms slices. Each time a poll times out, the stored exception
     * (if any) is rethrown, and {@code null} is returned if the reader is marked stopped. After a
     * record is obtained, the generated coroutine {@code StreamKeyReaderKtImpl$next$1} is launched
     * on {@code readerScope} before returning.
     * NOTE(review): that coroutine presumably requests another record from the server - confirm.
     *
     * @return the next record, or {@code null} when the reader has stopped and the buffer is empty
     */
    @Override // io.hstream.StreamKeyReader
    @Nullable
    public ReceivedRecord next() {
        while (true) {
            final ReceivedRecord polled = this.buffer.poll(100L, TimeUnit.MILLISECONDS);
            if (polled != null) {
                BuildersKt.launch$default(this.readerScope, (CoroutineContext) null, (CoroutineStart) null, new StreamKeyReaderKtImpl$next$1(this, null), 3, (Object) null);
                return polled;
            }
            // Nothing arrived in this slice: surface a failure or end of stream, else keep waiting.
            final Exception failure = this.exceptionRef.get();
            if (failure != null) {
                throw failure;
            }
            if (this.isStopped.get()) {
                return null;
            }
        }
    }

    /**
     * Queues every record of a read response for conversion and buffering.
     *
     * <p>Records and record ids are matched by position: the i-th entry of the received-records
     * list is paired with {@code getRecordIds(i)}. Each pair is submitted as its own task to the
     * single-thread executor, which converts it and puts it into the buffer.
     *
     * <p>JADX note: this method was private in the compiled class; the decompiler widened it.
     *
     * @param readStreamByKeyResponse the response whose records should be buffered
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final void saveToBuffer(ReadStreamByKeyResponse readStreamByKeyResponse) {
        int index = 0;
        for (HStreamRecord hStreamRecord : readStreamByKeyResponse.getReceivedRecordsList()) {
            final RecordId recordId = readStreamByKeyResponse.getRecordIds(index);
            index++;
            // The decompiler emitted undefined placeholders (r1, r2, r3) here; the helper's
            // signature requires the record, its id and this reader, in that order.
            this.executorService.submit(() -> {
                m21saveToBuffer$lambda0(hStreamRecord, recordId, this);
            });
        }
    }

    /**
     * Task body run on the executor for one received record: converts the record, stamping it
     * with the current time, and puts it into the reader's buffer, blocking while the buffer is full.
     *
     * <p>If the thread is interrupted while waiting for buffer space, the record is dropped and
     * the interrupt status is restored. A conversion failure is logged and rethrown; without the
     * log it would vanish, because the caller discards the {@code Future} returned by
     * {@code submit}.
     *
     * @param hStreamRecord the record to convert
     * @param recordId the record id belonging to {@code hStreamRecord}
     * @param streamKeyReaderKtImpl the reader whose buffer receives the converted record
     */
    /* renamed from: saveToBuffer$lambda-0, reason: not valid java name */
    private static final void m21saveToBuffer$lambda0(HStreamRecord hStreamRecord, RecordId recordId, StreamKeyReaderKtImpl streamKeyReaderKtImpl) {
        Intrinsics.checkNotNullParameter(streamKeyReaderKtImpl, "this$0");
        Companion companion = Companion;
        Intrinsics.checkNotNullExpressionValue(hStreamRecord, "receivedRecord");
        Intrinsics.checkNotNullExpressionValue(recordId, "recordId");
        Instant now = Instant.now();
        Intrinsics.checkNotNullExpressionValue(now, "now()");
        try {
            streamKeyReaderKtImpl.buffer.put(companion.toReceivedRecord(hStreamRecord, recordId, now));
        } catch (InterruptedException e) {
            // Restore the flag so the executor's worker thread still sees the interrupt.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            logger.error("streamKeyReader " + streamKeyReaderKtImpl.readerName + " failed to buffer a received record", e);
            throw e;
        }
    }
}
