package io.hstream.impl;

import com.google.common.hash.Hashing;
import io.hstream.CompressionType;
import io.hstream.HRecord;
import io.hstream.Producer;
import io.hstream.Record;
import io.hstream.internal.AppendRequest;
import io.hstream.internal.BatchedRecord;
import io.hstream.internal.HStreamRecord;
import io.hstream.internal.ListShardsRequest;
import io.hstream.internal.ListShardsResponse;
import io.hstream.internal.Shard;
import io.hstream.util.GrpcUtils;
import io.hstream.util.RecordUtils;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import kotlin.Metadata;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.sync.Mutex;
import kotlinx.coroutines.sync.MutexKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: ProducerKtImpl.kt */
@Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��x\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010\t\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n��\n\u0002\u0010\u000b\n\u0002\b\b\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0012\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0004\b\u0016\u0018�� 22\u00020\u0001:\u00012B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\bJ9\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u00050\u00112\u0006\u0010\u0014\u001a\u00020\u00152\u0006\u0010\u0016\u001a\u00020\u00072\u0006\u0010\u0017\u001a\u00020\u00182\b\b\u0002\u0010\u0019\u001a\u00020\u001aH\u0082@ø\u0001��¢\u0006\u0002\u0010\u001bJ\u0010\u0010\u001c\u001a\u00020\u00072\u0006\u0010\u001d\u001a\u00020\u0005H\u0004J#\u0010\u001e\u001a\u00020\u00052\u0006\u0010\u0016\u001a\u00020\u00072\b\b\u0002\u0010\u0019\u001a\u00020\u001aH\u0082@ø\u0001��¢\u0006\u0002\u0010\u001fJ\u0019\u0010 
\u001a\u00020\u00052\u0006\u0010\u0016\u001a\u00020\u0007H\u0082@ø\u0001��¢\u0006\u0002\u0010!J\u0016\u0010\"\u001a\b\u0012\u0004\u0012\u00020\u00050#2\u0006\u0010$\u001a\u00020%H\u0016J\u0016\u0010\"\u001a\b\u0012\u0004\u0012\u00020\u00050#2\u0006\u0010&\u001a\u00020'H\u0016J\u0016\u0010\"\u001a\b\u0012\u0004\u0012\u00020\u00050#2\u0006\u0010(\u001a\u00020)H\u0016J7\u0010*\u001a\b\u0012\u0004\u0012\u00020\u00050\u00112\f\u0010+\u001a\b\u0012\u0004\u0012\u00020,0\u00112\u0006\u0010\u0016\u001a\u00020\u00072\b\b\u0002\u0010-\u001a\u00020.H\u0084@ø\u0001��¢\u0006\u0002\u0010/J\u0016\u00100\u001a\b\u0012\u0004\u0012\u00020\u00050#2\u0006\u00101\u001a\u00020,H\u0014R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0006\u001a\u00020\u0007¢\u0006\b\n��\u001a\u0004\b\t\u0010\nR*\u0010\u000b\u001a\u001e\u0012\u0004\u0012\u00020\u0007\u0012\u0004\u0012\u00020\u00050\fj\u000e\u0012\u0004\u0012\u00020\u0007\u0012\u0004\u0012\u00020\u0005`\rX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0010\u001a\b\u0012\u0004\u0012\u00020\u00120\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u0002"}, d2 = {"Lio/hstream/impl/ProducerKtImpl;", "Lio/hstream/Producer;", "client", "Lio/hstream/impl/HStreamClientKtImpl;", "stream", DefaultSettings.DEFAULT_PARTITION_KEY, "requestTimeoutMs", DefaultSettings.DEFAULT_PARTITION_KEY, "(Lio/hstream/impl/HStreamClientKtImpl;Ljava/lang/String;J)V", "getRequestTimeoutMs", "()J", "serverUrls", "Ljava/util/HashMap;", "Lkotlin/collections/HashMap;", "serverUrlsLock", "Lkotlinx/coroutines/sync/Mutex;", "shards", DefaultSettings.DEFAULT_PARTITION_KEY, "Lio/hstream/internal/Shard;", "appendWithRetry", "appendRequest", "Lio/hstream/internal/AppendRequest;", "shardId", "tryTimes", DefaultSettings.DEFAULT_PARTITION_KEY, "forceUpdate", DefaultSettings.DEFAULT_PARTITION_KEY, 
"(Lio/hstream/internal/AppendRequest;JIZLkotlin/coroutines/Continuation;)Ljava/lang/Object;", "calculateShardIdByPartitionKey", "partitionKey", "lookupServerUrl", "(JZLkotlin/coroutines/Continuation;)Ljava/lang/Object;", "updateServerUrl", "(JLkotlin/coroutines/Continuation;)Ljava/lang/Object;", "write", "Ljava/util/concurrent/CompletableFuture;", "hRecord", "Lio/hstream/HRecord;", "record", "Lio/hstream/Record;", "rawRecord", DefaultSettings.DEFAULT_PARTITION_KEY, "writeHStreamRecords", "hStreamRecords", "Lio/hstream/internal/HStreamRecord;", "compressionType", "Lio/hstream/CompressionType;", "(Ljava/util/List;JLio/hstream/CompressionType;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "writeInternal", "hStreamRecord", "Companion"})
/* loaded from: input_file:io/hstream/impl/ProducerKtImpl.class */
public class ProducerKtImpl implements Producer {

    // Client handle supplied to the constructor; the constructor calls its unaryCallBlocked(...)
    // to fetch the shard list.
    @NotNull
    private final HStreamClientKtImpl client;

    // Name of the target stream; set as the stream name on the ListShards and Append requests.
    @NotNull
    private final String stream;
    // Constructor argument stored as-is and exposed through getRequestTimeoutMs().
    private final long requestTimeoutMs;

    // Long -> String map created empty in the constructor.
    // NOTE(review): presumably caches shard id -> server URL; the code that fills it
    // (lookupServerUrl / updateServerUrl) was not decompiled, so confirm against the Kotlin source.
    @NotNull
    private final HashMap<Long, String> serverUrls;

    // Coroutine mutex created in the constructor via MutexKt.Mutex$default.
    // NOTE(review): presumably guards serverUrls — confirm against the Kotlin source.
    @NotNull
    private final Mutex serverUrlsLock;

    // Shard list taken from the ListShards response in the constructor; scanned by
    // calculateShardIdByPartitionKey to find the shard whose hash range contains a key's hash.
    @NotNull
    private final List<Shard> shards;

    // Singleton instance of the Kotlin companion object.
    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger logger = LoggerFactory.getLogger(ProducerKtImpl.class);

    // Class-wide coroutine scope on Dispatchers.getDefault(); writeInternal launches its
    // write coroutine in this scope.
    @NotNull
    private static final CoroutineScope writeRecordScope = CoroutineScopeKt.CoroutineScope(Dispatchers.getDefault());

    /* compiled from: ProducerKtImpl.kt */
    @Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��¨\u0006\b"}, d2 = {"Lio/hstream/impl/ProducerKtImpl$Companion;", DefaultSettings.DEFAULT_PARTITION_KEY, "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "writeRecordScope", "Lkotlinx/coroutines/CoroutineScope;", "client"})
    /* loaded from: input_file:io/hstream/impl/ProducerKtImpl$Companion.class */
    /*
     * Kotlin companion-object holder. It declares no members of its own in this listing; the
     * `logger` and `writeRecordScope` names listed in its metadata are static fields on the
     * enclosing class.
     */
    public static final class Companion {
        // Not instantiable directly; the only instance is created through the synthetic constructor below.
        private Companion() {
        }

        // Compiler-generated constructor; the marker argument is ignored and only
        // distinguishes this overload from the private no-arg one.
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Creates a producer for a single stream and fetches that stream's shard list up front.
     *
     * <p>The shard list is obtained through {@code unaryCallBlocked(...)} on the supplied client
     * while the constructor runs; judging by the method name, construction waits for that call
     * to return.
     *
     * @param hStreamClientKtImpl client used for the ListShards call and stored in {@code client};
     *     must not be null
     * @param str name of the stream to write to; must not be null
     * @param j request timeout in milliseconds, stored unchanged and returned by
     *     {@link #getRequestTimeoutMs()}
     */
    public ProducerKtImpl(@NotNull HStreamClientKtImpl hStreamClientKtImpl, @NotNull String str, long j) {
        Intrinsics.checkNotNullParameter(hStreamClientKtImpl, "client");
        Intrinsics.checkNotNullParameter(str, "stream");

        this.client = hStreamClientKtImpl;
        this.stream = str;
        this.requestTimeoutMs = j;
        this.serverUrls = new HashMap<>();
        this.serverUrlsLock = MutexKt.Mutex$default(false, 1, (Object) null);

        // Ask the server for the stream's shards once; the result is kept for key -> shard routing.
        ListShardsResponse listShardResponse = (ListShardsResponse) hStreamClientKtImpl.unaryCallBlocked(
                new ProducerKtImpl$listShardResponse$1(ListShardsRequest.newBuilder().setStreamName(str).m1876build(), null));
        List<Shard> fetchedShards = listShardResponse.getShardsList();
        Intrinsics.checkNotNullExpressionValue(fetchedShards, "listShardResponse.shardsList");
        this.shards = fetchedShards;
    }

    /**
     * Returns the request timeout this producer was constructed with.
     *
     * @return the timeout value, exactly as passed to the constructor
     */
    public final long getRequestTimeoutMs() {
        return requestTimeoutMs;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Removed duplicated region for block: B:25:0x0123  */
    /* JADX WARN: Removed duplicated region for block: B:27:0x0126  */
    /* JADX WARN: Removed duplicated region for block: B:37:0x0083  */
    /* JADX WARN: Removed duplicated region for block: B:39:0x00c5  */
    /* JADX WARN: Removed duplicated region for block: B:40:0x0148  */
    /* JADX WARN: Removed duplicated region for block: B:42:0x0150  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0064  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the Kotlin suspend function {@code lookupServerUrl}, whose body the
     * decompiler could not reconstruct. In this listing the method does nothing except throw.
     *
     * <p>NOTE(review): the original was private in Kotlin (see the access-modifier note above) and,
     * going by its name and its {@code Continuation<? super String>} parameter, presumably resolves
     * a server URL string for a shard. None of that logic is visible here; consult the Kotlin
     * source or the bytecode dump before relying on it.
     *
     * @param r8 first (long) argument — presumably the shard id; TODO confirm
     * @param r10 second (boolean) argument; {@code lookupServerUrl$default} passes {@code false}
     *     when the caller omits it — presumably the "forceUpdate" flag; TODO confirm
     * @param r11 coroutine continuation that would receive the resulting String
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, because the body was not decompiled
     */
    public final java.lang.Object lookupServerUrl(long r8, boolean r10, kotlin.coroutines.Continuation<? super java.lang.String> r11) {
        /*
            Method dump skipped, instructions count: 346
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.hstream.impl.ProducerKtImpl.lookupServerUrl(long, boolean, kotlin.coroutines.Continuation):java.lang.Object");
    }

    /**
     * Compiler-generated bridge that supplies Kotlin default arguments for
     * {@code lookupServerUrl}.
     *
     * @param producerKtImpl receiver the real call is made on
     * @param j forwarded unchanged as the first argument
     * @param z caller-supplied flag; ignored when bit 1 of {@code i} is set
     * @param continuation forwarded unchanged
     * @param i default-argument bit mask; bit value 2 means "use the default ({@code false}) for
     *     the boolean argument"
     * @param obj must be {@code null}; a non-null value signals an unsupported super call
     * @return whatever {@code lookupServerUrl} returns
     * @throws UnsupportedOperationException if {@code obj} is non-null
     */
    static /* synthetic */ Object lookupServerUrl$default(ProducerKtImpl producerKtImpl, long j, boolean z, Continuation continuation, int i, Object obj) {
        if (obj != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: lookupServerUrl");
        }
        // When the mask bit is set the caller's value is discarded in favour of the default, false.
        final boolean useDefaultFlag = (i & 2) != 0;
        final boolean effectiveFlag = !useDefaultFlag && z;
        return producerKtImpl.lookupServerUrl(j, effectiveFlag, continuation);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0043. Please report as an issue. */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Removed duplicated region for block: B:15:0x00f7  */
    /* JADX WARN: Removed duplicated region for block: B:27:0x009e  */
    /* JADX WARN: Removed duplicated region for block: B:28:0x00fa  */
    /* JADX WARN: Removed duplicated region for block: B:29:0x017d  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x005c  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the Kotlin suspend function {@code updateServerUrl}, whose body the
     * decompiler could not reconstruct. In this listing the method does nothing except throw.
     *
     * <p>NOTE(review): the original was private in Kotlin (see the access-modifier note above).
     * From the name and the {@code Continuation<? super String>} parameter it presumably refreshes
     * and returns the server URL for a shard — this cannot be confirmed from the code shown here.
     *
     * @param r8 the single long argument — presumably the shard id; TODO confirm
     * @param r10 coroutine continuation that would receive the resulting String
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, because the body was not decompiled
     */
    public final java.lang.Object updateServerUrl(long r8, kotlin.coroutines.Continuation<? super java.lang.String> r10) {
        /*
            Method dump skipped, instructions count: 391
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.hstream.impl.ProducerKtImpl.updateServerUrl(long, kotlin.coroutines.Continuation):java.lang.Object");
    }

    /**
     * Writes a raw byte payload to the stream.
     *
     * <p>The bytes are converted with {@code RecordUtils.buildHStreamRecordFromRawRecord} and the
     * result is handed to {@link #writeInternal(HStreamRecord)}.
     *
     * @param bArr raw record bytes; must not be null
     * @return the future returned by {@code writeInternal}, completed asynchronously
     */
    @Override // io.hstream.Producer
    @NotNull
    public CompletableFuture<String> write(@NotNull byte[] bArr) {
        Intrinsics.checkNotNullParameter(bArr, "rawRecord");
        final HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromRawRecord(bArr);
        Intrinsics.checkNotNullExpressionValue(hStreamRecord, "hStreamRecord");
        return writeInternal(hStreamRecord);
    }

    /**
     * Writes a structured {@code HRecord} to the stream.
     *
     * <p>The record is converted with {@code RecordUtils.buildHStreamRecordFromHRecord} and the
     * result is handed to {@link #writeInternal(HStreamRecord)}.
     *
     * @param hRecord structured record to write; must not be null
     * @return the future returned by {@code writeInternal}, completed asynchronously
     */
    @Override // io.hstream.Producer
    @NotNull
    public CompletableFuture<String> write(@NotNull HRecord hRecord) {
        Intrinsics.checkNotNullParameter(hRecord, "hRecord");
        final HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromHRecord(hRecord);
        Intrinsics.checkNotNullExpressionValue(hStreamRecord, "hStreamRecord");
        return writeInternal(hStreamRecord);
    }

    /**
     * Writes a {@code Record} to the stream.
     *
     * <p>The record is converted with {@code RecordUtils.buildHStreamRecordFromRecord} and the
     * result is handed to {@link #writeInternal(HStreamRecord)}.
     *
     * @param record record to write; must not be null
     * @return the future returned by {@code writeInternal}, completed asynchronously
     */
    @Override // io.hstream.Producer
    @NotNull
    public CompletableFuture<String> write(@NotNull Record record) {
        Intrinsics.checkNotNullParameter(record, "record");
        final HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromRecord(record);
        Intrinsics.checkNotNullExpressionValue(hStreamRecord, "hStreamRecord");
        return writeInternal(hStreamRecord);
    }

    /**
     * Starts an asynchronous write of one already-built record.
     *
     * <p>A fresh future is created and passed, together with the record and this producer, to a
     * coroutine launched on the shared {@code writeRecordScope}. This method returns immediately;
     * completing the future is left to that coroutine, whose generated class is not part of this
     * listing.
     *
     * @param hStreamRecord record to write; must not be null
     * @return the future handed to the launched coroutine
     */
    @NotNull
    protected CompletableFuture<String> writeInternal(@NotNull HStreamRecord hStreamRecord) {
        Intrinsics.checkNotNullParameter(hStreamRecord, "hStreamRecord");
        final CompletableFuture<String> result = new CompletableFuture<>();
        final ProducerKtImpl$writeInternal$1 writeTask = new ProducerKtImpl$writeInternal$1(this, hStreamRecord, result, null);
        // Mask 3 = use launch's defaults for both the context and the start mode.
        BuildersKt.launch$default(writeRecordScope, (CoroutineContext) null, (CoroutineStart) null, writeTask, 3, (Object) null);
        return result;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Maps a partition key to the id of the shard whose hash range contains the key's hash.
     *
     * <p>The key is hashed with MD5 over its UTF-8 bytes; the hex digest is read as a non-negative
     * base-16 integer. Shards are checked in list order, and the first one whose inclusive
     * {@code [startHashRangeKey, endHashRangeKey]} interval contains the hash wins.
     *
     * @param str partition key; must not be null
     * @return id of the first shard whose range contains the key's hash
     * @throws IllegalStateException if no shard's range contains the hash
     */
    public final long calculateShardIdByPartitionKey(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "partitionKey");
        final String md5Hex = Hashing.md5().hashString(str, StandardCharsets.UTF_8).toString();
        final BigInteger keyHash = new BigInteger(md5Hex, 16);
        for (Shard candidate : this.shards) {
            final BigInteger rangeStart = new BigInteger(candidate.getStartHashRangeKey());
            final BigInteger rangeEnd = new BigInteger(candidate.getEndHashRangeKey());
            // Skip shards whose inclusive range does not cover the hash.
            if (keyHash.compareTo(rangeStart) < 0 || keyHash.compareTo(rangeEnd) > 0) {
                continue;
            }
            return candidate.getShardId();
        }
        throw new IllegalStateException("Check failed.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Can't wrap try/catch for region: R(6:1|(2:3|(4:5|6|7|8))|59|6|7|8) */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x0236, code lost:
    
        r20 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x0238, code lost:
    
        r30.L$0 = null;
        r30.L$1 = null;
        r30.L$2 = null;
        r30.label = 4;
        r0 = appendWithRetry$handleGRPCException(r15, r11, r12, r13, r18, r20, r30);
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x0264, code lost:
    
        if (r0 == r0) goto L49;
     */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x0269, code lost:
    
        return r0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x01f3, code lost:
    
        r20 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x01f5, code lost:
    
        r30.L$0 = null;
        r30.L$1 = null;
        r30.L$2 = null;
        r30.label = 3;
        r0 = appendWithRetry$handleGRPCException(r15, r11, r12, r13, r18, r20, r30);
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x0221, code lost:
    
        if (r0 == r0) goto L42;
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:0x0226, code lost:
    
        return r0;
     */
    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0046. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:27:0x0145  */
    /* JADX WARN: Removed duplicated region for block: B:32:0x01c0 A[Catch: StatusException -> 0x01f3, StatusRuntimeException -> 0x0236, LOOP:0: B:30:0x01b6->B:32:0x01c0, LOOP_END, TryCatch #2 {StatusRuntimeException -> 0x0236, StatusException -> 0x01f3, blocks: (B:24:0x0102, B:29:0x0179, B:30:0x01b6, B:32:0x01c0, B:34:0x01e8, B:42:0x0171), top: B:7:0x0046 }] */
    /* JADX WARN: Removed duplicated region for block: B:40:0x00c5  */
    /* JADX WARN: Removed duplicated region for block: B:41:0x0148  */
    /* JADX WARN: Removed duplicated region for block: B:43:0x0227  */
    /* JADX WARN: Removed duplicated region for block: B:45:0x026a  */
    /* JADX WARN: Removed duplicated region for block: B:47:0x0279  */
    /* JADX WARN: Removed duplicated region for block: B:9:0x0068  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the Kotlin suspend function {@code appendWithRetry}, whose body the
     * decompiler could not reconstruct. In this listing the method does nothing except throw.
     *
     * <p>What the decompiler notes above do show: the original has handlers for
     * {@code StatusException} and {@code StatusRuntimeException}, and each handler delegates to
     * {@code appendWithRetry$handleGRPCException}, passing the int argument, this producer, the
     * request, the long argument, a String and the caught exception.
     *
     * <p>NOTE(review): presumably the method sends the append request to the server for the given
     * shard and retries on gRPC failure; the actual send and retry logic is not visible here —
     * consult the Kotlin source or the bytecode dump.
     *
     * @param r12 the append request to send
     * @param r13 long argument — {@code writeHStreamRecords} passes its shard id here
     * @param r15 int argument — {@code writeHStreamRecords} passes 5; presumably the remaining
     *     number of tries; TODO confirm
     * @param r16 boolean argument; {@code appendWithRetry$default} passes {@code false} when the
     *     caller omits it — presumably a "forceUpdate" flag; TODO confirm
     * @param r17 coroutine continuation that would receive the resulting list of Strings
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, because the body was not decompiled
     */
    public final java.lang.Object appendWithRetry(io.hstream.internal.AppendRequest r12, long r13, int r15, boolean r16, kotlin.coroutines.Continuation<? super java.util.List<java.lang.String>> r17) {
        /*
            Method dump skipped, instructions count: 643
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.hstream.impl.ProducerKtImpl.appendWithRetry(io.hstream.internal.AppendRequest, long, int, boolean, kotlin.coroutines.Continuation):java.lang.Object");
    }

    /**
     * Compiler-generated bridge that supplies Kotlin default arguments for
     * {@code appendWithRetry}.
     *
     * @param producerKtImpl receiver the real call is made on
     * @param appendRequest forwarded unchanged
     * @param j forwarded unchanged
     * @param i forwarded unchanged
     * @param z caller-supplied flag; ignored when bit 3 of {@code i2} is set
     * @param continuation forwarded unchanged
     * @param i2 default-argument bit mask; bit value 8 means "use the default ({@code false}) for
     *     the boolean argument"
     * @param obj must be {@code null}; a non-null value signals an unsupported super call
     * @return whatever {@code appendWithRetry} returns
     * @throws UnsupportedOperationException if {@code obj} is non-null
     */
    static /* synthetic */ Object appendWithRetry$default(ProducerKtImpl producerKtImpl, AppendRequest appendRequest, long j, int i, boolean z, Continuation continuation, int i2, Object obj) {
        if (obj != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: appendWithRetry");
        }
        // When the mask bit is set the caller's value is discarded in favour of the default, false.
        final boolean useDefaultFlag = (i2 & 8) != 0;
        final boolean effectiveFlag = !useDefaultFlag && z;
        return producerKtImpl.appendWithRetry(appendRequest, j, i, effectiveFlag, continuation);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Packs the given records into one batched append request for a shard and submits it through
     * {@code appendWithRetry}.
     *
     * <p>The batch carries the internal form of {@code compressionType}, the payload produced by
     * {@code RecordUtils.compress(list, compressionType)}, and the number of records. The request
     * is then passed to {@code appendWithRetry} with 5 as its int argument and the default for
     * its boolean argument.
     *
     * @param list records to send in one batch
     * @param j id of the shard to append to
     * @param compressionType compression applied to the batch payload
     * @param continuation coroutine continuation receiving the resulting list of Strings
     * @return whatever {@code appendWithRetry} returns for this continuation
     */
    @Nullable
    public final Object writeHStreamRecords(@NotNull List<HStreamRecord> list, long j, @NotNull CompressionType compressionType, @NotNull Continuation<? super List<String>> continuation) {
        // Start the request builder first so evaluation order matches the original single chain.
        AppendRequest.Builder requestBuilder = AppendRequest.newBuilder().setStreamName(this.stream).setShardId(j);
        BatchedRecord batch = BatchedRecord.newBuilder()
                .setCompressionType(GrpcUtils.compressionTypeToInternal(compressionType))
                .setPayload(RecordUtils.compress(list, compressionType))
                .setBatchSize(list.size())
                .m207build();
        AppendRequest appendRequest = requestBuilder.setRecords(batch).m117build();
        Intrinsics.checkNotNullExpressionValue(appendRequest, "appendRequest");
        // Mask 8 = let appendWithRetry use its default for the boolean argument.
        return appendWithRetry$default(this, appendRequest, j, 5, false, continuation, 8, null);
    }

    /**
     * Compiler-generated bridge that supplies Kotlin default arguments for
     * {@code writeHStreamRecords}.
     *
     * @param producerKtImpl receiver the real call is made on
     * @param list forwarded unchanged
     * @param j forwarded unchanged
     * @param compressionType caller-supplied compression; ignored when bit 2 of {@code i} is set
     * @param continuation forwarded unchanged
     * @param i default-argument bit mask; bit value 4 means "use {@code CompressionType.NONE}"
     * @param obj must be {@code null}; a non-null value signals an unsupported super call
     * @return whatever {@code writeHStreamRecords} returns
     * @throws UnsupportedOperationException if {@code obj} is non-null
     */
    public static /* synthetic */ Object writeHStreamRecords$default(ProducerKtImpl producerKtImpl, List list, long j, CompressionType compressionType, Continuation continuation, int i, Object obj) {
        if (obj != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: writeHStreamRecords");
        }
        final boolean useDefaultCompression = (i & 4) != 0;
        final CompressionType effectiveCompression = useDefaultCompression ? CompressionType.NONE : compressionType;
        return producerKtImpl.writeHStreamRecords(list, j, effectiveCompression, continuation);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Removed duplicated region for block: B:19:0x0107  */
    /* JADX WARN: Removed duplicated region for block: B:21:? A[RETURN, SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:24:0x00bd  */
    /* JADX WARN: Removed duplicated region for block: B:25:0x010a  */
    /* JADX WARN: Removed duplicated region for block: B:27:0x011c  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0060  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the helper {@code handleGRPCException} belonging to {@code appendWithRetry}
     * (the {@code appendWithRetry$...} name suggests a Kotlin local function), whose body the
     * decompiler could not reconstruct. In this listing the method does nothing except throw.
     *
     * <p>The decompiler notes on {@code appendWithRetry} show it being called from that method's
     * {@code StatusException} and {@code StatusRuntimeException} handlers.
     *
     * <p>NOTE(review): presumably it decides whether to retry the append (and with what delay or
     * server refresh) or to give up; none of that logic is visible here — consult the Kotlin
     * source or the bytecode dump.
     *
     * @param r10 int argument — the caller forwards its own int argument; presumably the
     *     remaining number of tries; TODO confirm
     * @param r11 the producer on whose behalf the append was attempted
     * @param r12 the append request that failed
     * @param r13 long argument — the caller forwards its own long argument; presumably the shard
     *     id; TODO confirm
     * @param r15 String argument — presumably the server URL that was used; TODO confirm
     * @param r16 the exception caught by the caller
     * @param r17 coroutine continuation that would receive the resulting list of Strings
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, because the body was not decompiled
     */
    public static final java.lang.Object appendWithRetry$handleGRPCException(int r10, io.hstream.impl.ProducerKtImpl r11, io.hstream.internal.AppendRequest r12, long r13, java.lang.String r15, java.lang.Throwable r16, kotlin.coroutines.Continuation<? super java.util.List<java.lang.String>> r17) {
        /*
            Method dump skipped, instructions count: 294
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.hstream.impl.ProducerKtImpl.appendWithRetry$handleGRPCException(int, io.hstream.impl.ProducerKtImpl, io.hstream.internal.AppendRequest, long, java.lang.String, java.lang.Throwable, kotlin.coroutines.Continuation):java.lang.Object");
    }
}
