package io.hstream.impl;

import io.hstream.HStreamDBClientException;
import io.hstream.internal.RecordId;
import io.hstream.internal.Shard;
import io.hstream.internal.StreamingFetchRequest;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.flow.MutableSharedFlow;
import kotlinx.coroutines.future.FutureKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* compiled from: ResponderImpl.kt */
@Metadata(mv = {1, Shard.ISACTIVE_FIELD_NUMBER, 0}, k = 1, xi = 48, d1 = {"��Z\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n��\n\u0002\u0010\t\n\u0002\b\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0005\u0018��2\u00020\u0001B3\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005\u0012\u0006\u0010\u0007\u001a\u00020\u0003\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b¢\u0006\u0002\u0010\fJ\u000e\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u001c\u001a\u00020\u000fJ\b\u0010\u001d\u001a\u00020\u001bH\u0016J\u0010\u0010\u001e\u001a\u00020\u001b2\b\b\u0002\u0010\u001f\u001a\u00020\u000bR\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0016\u001a\b\u0012\u0002\b\u0003\u0018\u00010\u0017X\u0082\u000e¢\u0006\u0002\n��R\u0010\u0010\u0018\u001a\u0004\u0018\u00010\u0019X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��¨\u0006 "}, d2 = {"Lio/hstream/impl/AckSender;", "Ljava/io/Closeable;", "subscriptionId", DefaultSettings.DEFAULT_PARTITION_KEY, "ackFlow", "Lkotlinx/coroutines/flow/MutableSharedFlow;", "Lio/hstream/internal/StreamingFetchRequest;", "consumerName", 
"bufferSize", DefaultSettings.DEFAULT_PARTITION_KEY, "ackAgeLimit", DefaultSettings.DEFAULT_PARTITION_KEY, "(Ljava/lang/String;Lkotlinx/coroutines/flow/MutableSharedFlow;Ljava/lang/String;IJ)V", "buffer", DefaultSettings.DEFAULT_PARTITION_KEY, "Lio/hstream/internal/RecordId;", "closed", DefaultSettings.DEFAULT_PARTITION_KEY, "emitScope", "Lkotlinx/coroutines/CoroutineScope;", "lock", "Ljava/util/concurrent/locks/ReentrantLock;", "pendingFlushFuture", "Ljava/util/concurrent/ScheduledFuture;", "scheduler", "Ljava/util/concurrent/ScheduledExecutorService;", "ack", DefaultSettings.DEFAULT_PARTITION_KEY, "recordId", "close", "flush", "timeoutMs", "client"})
/* loaded from: input_file:io/hstream/impl/AckSender.class */
public final class AckSender implements Closeable {

    // Subscription id copied into every StreamingFetchRequest built by flush().
    @NotNull
    private final String subscriptionId;

    // Flow handed in by the caller. Its use is not visible in this file;
    // presumably the coroutine started by flush() emits the ack request into it — TODO confirm.
    @NotNull
    private final MutableSharedFlow<StreamingFetchRequest> ackFlow;

    // Consumer name copied into every StreamingFetchRequest built by flush().
    @NotNull
    private final String consumerName;
    // Initial capacity of the buffer and the size at which ack() triggers a flush.
    private final int bufferSize;
    // Delay, in milliseconds, used when ack() schedules a time-based flush; values <= 0 disable scheduling.
    private final long ackAgeLimit;

    // Held by ack() and flush() while they read or modify the buffer.
    @NotNull
    private final ReentrantLock lock;

    // Record ids accepted by ack() and not yet flushed.
    @NotNull
    private final List<RecordId> buffer;

    // Coroutine scope created on Dispatchers.getIO(); flush() starts its send future in it and close() cancels it.
    @NotNull
    private final CoroutineScope emitScope;

    // Single-thread scheduler for time-based flushes; only created when ackAgeLimit > 0 and bufferSize > 1.
    @Nullable
    private ScheduledExecutorService scheduler;
    // Set to true by close(); ack() throws once it is set.
    private volatile boolean closed;

    // Handle of the most recently scheduled time-based flush; ack() cancels it after a size-triggered flush.
    @Nullable
    private ScheduledFuture<?> pendingFlushFuture;

    /**
     * Creates an ack sender that batches record ids and sends them as {@link StreamingFetchRequest}s.
     *
     * <p>A single-thread scheduler for time-based flushing is created only when both
     * {@code ackAgeLimit > 0} and {@code bufferSize > 1}; otherwise {@code scheduler} stays
     * {@code null}.
     *
     * @param str the subscription id put on every ack request
     * @param mutableSharedFlow the ack flow stored on this instance
     * @param str2 the consumer name put on every ack request
     * @param i the buffer size: initial buffer capacity and the batch size that triggers a flush
     * @param j the ack age limit in milliseconds used as the delay of the scheduled flush
     */
    public AckSender(@NotNull String str, @NotNull MutableSharedFlow<StreamingFetchRequest> mutableSharedFlow, @NotNull String str2, int i, long j) {
        Intrinsics.checkNotNullParameter(str, "subscriptionId");
        Intrinsics.checkNotNullParameter(mutableSharedFlow, "ackFlow");
        Intrinsics.checkNotNullParameter(str2, "consumerName");
        this.subscriptionId = str;
        this.ackFlow = mutableSharedFlow;
        this.consumerName = str2;
        this.bufferSize = i;
        this.ackAgeLimit = j;
        this.lock = new ReentrantLock();
        // Diamond instead of the raw ArrayList type so the assignment is type-checked.
        this.buffer = new ArrayList<>(this.bufferSize);
        this.emitScope = CoroutineScopeKt.CoroutineScope(Dispatchers.getIO());
        // Time-based flushing only makes sense with a positive delay and a batch larger than one.
        if (this.ackAgeLimit > 0 && this.bufferSize > 1) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
        }
    }

    /**
     * Adds a record id to the pending batch and flushes the batch once it holds at least
     * {@code bufferSize} ids.
     *
     * <p>When time-based flushing is enabled ({@code ackAgeLimit > 0} and {@code bufferSize > 1})
     * and the batch is empty before this call, a flush is scheduled {@code ackAgeLimit}
     * milliseconds from now. After a size-triggered flush the scheduled flush is cancelled.
     * The whole operation runs while holding {@code lock}.
     *
     * @param recordId the record id to acknowledge
     * @throws HStreamDBClientException if this sender has already been closed
     */
    public final void ack(@NotNull RecordId recordId) {
        Intrinsics.checkNotNullParameter(recordId, "recordId");
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            if (this.closed) {
                throw new HStreamDBClientException("ackSender is Closed");
            }
            // The first id of a new batch starts the age timer.
            if (this.ackAgeLimit > 0 && this.buffer.isEmpty() && this.bufferSize > 1) {
                ScheduledExecutorService executor = this.scheduler;
                Intrinsics.checkNotNull(executor);
                // The decompiler emitted an undefined register name here; the captured receiver is this instance.
                this.pendingFlushFuture = executor.schedule(
                        () -> m5ack$lambda1$lambda0(this), this.ackAgeLimit, TimeUnit.MILLISECONDS);
            }
            this.buffer.add(recordId);
            if (this.buffer.size() >= this.bufferSize) {
                flush$default(this, 0L, 1, null);
                // The batch has just been sent, so the pending time-based flush is no longer needed.
                ScheduledFuture<?> pending = this.pendingFlushFuture;
                if (pending != null) {
                    pending.cancel(true);
                }
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Sends all buffered record ids as one {@link StreamingFetchRequest} and clears the buffer.
     *
     * <p>Reconstructed from the bytecode listing jadx produced when it failed to decompile this
     * method. The method holds {@code lock} for its whole duration. If the buffer is empty it
     * returns immediately. Otherwise it builds a request carrying the subscription id, the
     * consumer name and a copy of the buffered ids, and starts a future in {@code emitScope} that
     * runs the compiler-generated {@code AckSender$flush$1$future$1} suspend lambda with that
     * request. When {@code timeoutMs > 0} it waits up to that many milliseconds for the future;
     * any failure of that wait is logged, not rethrown. The buffer is cleared afterwards in
     * either case.
     *
     * @param timeoutMs how long to wait for the send to complete, in milliseconds; a value
     *     {@code <= 0} means do not wait
     */
    public final void flush(long timeoutMs) {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            if (this.buffer.isEmpty()) {
                return;
            }
            // m3791build() is the name jadx shows for the builder's build method in the original listing.
            StreamingFetchRequest request = StreamingFetchRequest.newBuilder()
                    .setSubscriptionId(this.subscriptionId)
                    .setConsumerName(this.consumerName)
                    // Copy the ids so clearing the buffer below cannot affect the request contents.
                    .addAllAckIds(new ArrayList<>(this.buffer))
                    .m3791build();
            // Mask 3 selects the default values for the context and start arguments of future().
            CompletableFuture<?> sendFuture = FutureKt.future$default(
                    this.emitScope, null, null, new AckSender$flush$1$future$1(this, request, null), 3, null);
            if (timeoutMs > 0) {
                try {
                    sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Throwable failure) {
                    // The original handler catches every Throwable and only logs the message.
                    UtilsKt.getLogger().error("ack failed, " + failure.getMessage());
                }
            }
            this.buffer.clear();
        } finally {
            guard.unlock();
        }
    }

    /**
     * Compiler-generated bridge for the default argument of {@link #flush(long)}.
     *
     * @param ackSender the instance to flush
     * @param j the timeout in milliseconds, used only when bit 0 of {@code i} is clear
     * @param i default-argument mask; when bit 0 is set the timeout is replaced by {@code 0}
     * @param obj unused marker argument
     */
    public static /* synthetic */ void flush$default(AckSender ackSender, long j, int i, Object obj) {
        final boolean useDefaultTimeout = (i & 1) != 0;
        ackSender.flush(useDefaultTimeout ? 0L : j);
    }

    /**
     * Closes this sender: marks it closed so further {@code ack} calls throw, flushes the
     * remaining buffered ids waiting up to 300 ms, shuts down the scheduler if one was created,
     * and cancels the coroutine scope used for sending.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        flush(300L);
        final ScheduledExecutorService executor = this.scheduler;
        if (executor != null) {
            // The scheduler only exists when time-based flushing was enabled in the constructor.
            executor.shutdown();
        }
        CoroutineScopeKt.cancel$default(this.emitScope, (CancellationException) null, 1, (Object) null);
    }

    /**
     * Body of the task scheduled by {@code ack}: flushes the given sender without waiting.
     */
    /* renamed from: ack$lambda-1$lambda-0, reason: not valid java name */
    private static final void m5ack$lambda1$lambda0(AckSender ackSender) {
        Intrinsics.checkNotNullParameter(ackSender, "this$0");
        // Same as flush$default with mask 1, which substitutes a timeout of 0.
        ackSender.flush(0L);
    }
}
