package io.hstream.impl;

import io.hstream.Reader;
import io.hstream.Record;
import io.hstream.StreamShardOffset;
import io.hstream.internal.CreateShardReaderRequest;
import io.hstream.internal.CreateShardReaderResponse;
import io.hstream.internal.DeleteShardReaderRequest;
import io.hstream.internal.HStreamApiGrpcKt;
import io.hstream.internal.LookupShardReaderRequest;
import io.hstream.internal.LookupShardReaderResponse;
import io.hstream.internal.Shard;
import io.hstream.util.GrpcUtils;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.DebugMetadata;
import kotlin.coroutines.jvm.internal.SuspendLambda;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Dispatchers;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: ReaderKtImpl.kt */
@Metadata(mv = {1, Shard.ISACTIVE_FIELD_NUMBER, 0}, k = 1, xi = 48, d1 = {"��@\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n\u0002\b\u0004\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018�� \u00162\u00020\u0001:\u0001\u0016B5\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\u0005¢\u0006\u0002\u0010\rJ\b\u0010\u000f\u001a\u00020\u0010H\u0016J\u001c\u0010\u0011\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00140\u00130\u00122\u0006\u0010\u0015\u001a\u00020\u000bH\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\f\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u0005X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0002"}, d2 = {"Lio/hstream/impl/ReaderKtImpl;", "Lio/hstream/Reader;", "client", "Lio/hstream/impl/HStreamClientKtImpl;", "streamName", DefaultSettings.DEFAULT_PARTITION_KEY, "shardId", DefaultSettings.DEFAULT_PARTITION_KEY, "shardOffset", "Lio/hstream/StreamShardOffset;", "timeoutMs", DefaultSettings.DEFAULT_PARTITION_KEY, "readerId", "(Lio/hstream/impl/HStreamClientKtImpl;Ljava/lang/String;JLio/hstream/StreamShardOffset;ILjava/lang/String;)V", "serverUrl", "close", DefaultSettings.DEFAULT_PARTITION_KEY, "read", "Ljava/util/concurrent/CompletableFuture;", DefaultSettings.DEFAULT_PARTITION_KEY, "Lio/hstream/Record;", "maxRecords", "Companion"})
/* loaded from: input_file:io/hstream/impl/ReaderKtImpl.class */
public final class ReaderKtImpl implements Reader {

    @NotNull
    private final HStreamClientKtImpl client;

    @NotNull
    private final String streamName;
    private final long shardId;

    @NotNull
    private final StreamShardOffset shardOffset;
    private final int timeoutMs;

    @NotNull
    private final String readerId;

    @NotNull
    private String serverUrl;

    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger logger = LoggerFactory.getLogger(ReaderKtImpl.class);

    @NotNull
    private static final CoroutineScope readerScope = CoroutineScopeKt.CoroutineScope(Dispatchers.getDefault());

    /* compiled from: ReaderKtImpl.kt */
    @Metadata(mv = {1, Shard.ISACTIVE_FIELD_NUMBER, 0}, k = 3, xi = 48, d1 = {"��\f\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\u0010��\u001a\u00020\u00012\u0006\u0010\u0002\u001a\u00020\u0003H\u008a@"}, d2 = {"<anonymous>", "Lio/hstream/internal/CreateShardReaderResponse;", "it", "Lio/hstream/internal/HStreamApiGrpcKt$HStreamApiCoroutineStub;"})
    @DebugMetadata(f = "ReaderKtImpl.kt", l = {38}, i = {}, s = {}, n = {}, m = "invokeSuspend", c = "io.hstream.impl.ReaderKtImpl$1")
    /* renamed from: io.hstream.impl.ReaderKtImpl$1, reason: invalid class name */
    /* loaded from: input_file:io/hstream/impl/ReaderKtImpl$1.class */
    static final class AnonymousClass1 extends SuspendLambda implements Function2<HStreamApiGrpcKt.HStreamApiCoroutineStub, Continuation<? super CreateShardReaderResponse>, Object> {
        int label;
        /* synthetic */ Object L$0;
        final /* synthetic */ CreateShardReaderRequest $createShardReaderRequest;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass1(CreateShardReaderRequest createShardReaderRequest, Continuation<? super AnonymousClass1> continuation) {
            super(2, continuation);
            this.$createShardReaderRequest = createShardReaderRequest;
        }

        @Nullable
        public final Object invokeSuspend(@NotNull Object obj) {
            Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
                case 0:
                    ResultKt.throwOnFailure(obj);
                    HStreamApiGrpcKt.HStreamApiCoroutineStub hStreamApiCoroutineStub = (HStreamApiGrpcKt.HStreamApiCoroutineStub) this.L$0;
                    CreateShardReaderRequest createShardReaderRequest = this.$createShardReaderRequest;
                    Intrinsics.checkNotNullExpressionValue(createShardReaderRequest, "createShardReaderRequest");
                    this.label = 1;
                    Object createShardReader$default = HStreamApiGrpcKt.HStreamApiCoroutineStub.createShardReader$default(hStreamApiCoroutineStub, createShardReaderRequest, null, (Continuation) this, 2, null);
                    return createShardReader$default == coroutine_suspended ? coroutine_suspended : createShardReader$default;
                case 1:
                    ResultKt.throwOnFailure(obj);
                    return obj;
                default:
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
        }

        @NotNull
        public final Continuation<Unit> create(@Nullable Object obj, @NotNull Continuation<?> continuation) {
            Continuation<Unit> anonymousClass1 = new AnonymousClass1(this.$createShardReaderRequest, continuation);
            anonymousClass1.L$0 = obj;
            return anonymousClass1;
        }

        @Nullable
        public final Object invoke(@NotNull HStreamApiGrpcKt.HStreamApiCoroutineStub hStreamApiCoroutineStub, @Nullable Continuation<? super CreateShardReaderResponse> continuation) {
            return create(hStreamApiCoroutineStub, continuation).invokeSuspend(Unit.INSTANCE);
        }
    }

    /* compiled from: ReaderKtImpl.kt */
    @Metadata(mv = {1, Shard.ISACTIVE_FIELD_NUMBER, 0}, k = 1, xi = 48, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��¨\u0006\b"}, d2 = {"Lio/hstream/impl/ReaderKtImpl$Companion;", DefaultSettings.DEFAULT_PARTITION_KEY, "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "readerScope", "Lkotlinx/coroutines/CoroutineScope;", "client"})
    /* loaded from: input_file:io/hstream/impl/ReaderKtImpl$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public ReaderKtImpl(@NotNull HStreamClientKtImpl hStreamClientKtImpl, @NotNull String str, long j, @NotNull StreamShardOffset streamShardOffset, int i, @NotNull String str2) {
        Intrinsics.checkNotNullParameter(hStreamClientKtImpl, "client");
        Intrinsics.checkNotNullParameter(str, "streamName");
        Intrinsics.checkNotNullParameter(streamShardOffset, "shardOffset");
        Intrinsics.checkNotNullParameter(str2, "readerId");
        this.client = hStreamClientKtImpl;
        this.streamName = str;
        this.shardId = j;
        this.shardOffset = streamShardOffset;
        this.timeoutMs = i;
        this.readerId = str2;
        this.client.unaryCallBlocked(new AnonymousClass1(CreateShardReaderRequest.newBuilder().setStreamName(this.streamName).setShardId(this.shardId).setShardOffset(GrpcUtils.streamShardOffsetToGrpc(this.shardOffset)).setTimeout(this.timeoutMs).setReaderId(this.readerId).m760build(), null));
        LookupShardReaderResponse lookupShardReaderResponse = (LookupShardReaderResponse) this.client.unaryCallBlocked(new ReaderKtImpl$lookupShardReaderResp$1(LookupShardReaderRequest.newBuilder().setReaderId(this.readerId).m2417build(), null));
        this.serverUrl = lookupShardReaderResponse.getServerNode().getHost() + ":" + lookupShardReaderResponse.getServerNode().getPort();
        logger.info("created Reader [{}] for stream [{}] shard [{}]", new Object[]{this.readerId, this.streamName, Long.valueOf(this.shardId)});
    }

    @Override // io.hstream.Reader
    @NotNull
    public CompletableFuture<List<Record>> read(int i) {
        CompletableFuture<List<Record>> completableFuture = new CompletableFuture<>();
        BuildersKt.launch$default(readerScope, (CoroutineContext) null, (CoroutineStart) null, new ReaderKtImpl$read$1(this, i, completableFuture, null), 3, (Object) null);
        return completableFuture;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        BuildersKt.runBlocking$default((CoroutineContext) null, new ReaderKtImpl$close$1(this, DeleteShardReaderRequest.newBuilder().setReaderId(this.readerId).m995build(), null), 1, (Object) null);
        logger.info("Reader [{}] closed", this.readerId);
    }
}
