package io.hstream.impl;

import io.grpc.stub.StreamObserver;
import io.hstream.HRecord;
import io.hstream.HStreamDBClientException;
import io.hstream.Producer;
import io.hstream.RecordId;
import io.hstream.internal.AppendRequest;
import io.hstream.internal.AppendResponse;
import io.hstream.internal.HStreamApiGrpc;
import io.hstream.util.GrpcUtils;
import io.hstream.util.RecordUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hstream/impl/ProducerImpl.class */
/**
 * {@link Producer} implementation that appends records to a single stream through the
 * asynchronous gRPC stub.
 *
 * <p>Two modes are supported:
 *
 * <ul>
 *   <li><b>Unbatched</b>: every {@code writeAsync} call issues its own append request.
 *   <li><b>Batched</b>: {@code writeAsync} stores the record in an in-memory buffer and returns a
 *       future; the buffer is sent when it reaches {@code recordCountLimit} records or when
 *       {@link #flush()} is called. A semaphore with {@code recordCountLimit} permits bounds the
 *       number of buffered records, and a lock guards the buffer and its list of futures.
 * </ul>
 *
 * <p>The synchronous {@code write} methods always issue their own append request and never go
 * through the buffer.
 *
 * <p>Failures reported by the server are delivered through the returned futures (wrapped in
 * {@link HStreamDBClientException}); consequently the synchronous {@code write} methods surface
 * them as the unchecked exception thrown by {@link CompletableFuture#join()}.
 */
public class ProducerImpl implements Producer {
    private static final Logger logger = LoggerFactory.getLogger(ProducerImpl.class);

    private final HStreamApiGrpc.HStreamApiStub grpcStub;
    private final String stream;
    private final boolean enableBatch;
    private final int recordCountLimit;

    // The four fields below are only populated in batched mode and are null otherwise.
    private final Semaphore semaphore;
    private final Lock lock;
    // Holds byte[] and HRecord instances, in the order they were submitted.
    private final List<Object> recordBuffer;
    // futures.get(i) is the pending result for recordBuffer.get(i).
    private final List<CompletableFuture<RecordId>> futures;

    /**
     * Creates a producer bound to one stream.
     *
     * @param grpcStub asynchronous stub used to send append requests
     * @param stream name of the stream every record is appended to
     * @param enableBatch whether {@code writeAsync} buffers records instead of sending them at once
     * @param recordCountLimit number of buffered records that triggers a flush; only used when
     *     {@code enableBatch} is true
     */
    public ProducerImpl(
            HStreamApiGrpc.HStreamApiStub grpcStub,
            String stream,
            boolean enableBatch,
            int recordCountLimit) {
        this.grpcStub = grpcStub;
        this.stream = stream;
        this.enableBatch = enableBatch;
        this.recordCountLimit = recordCountLimit;
        if (enableBatch) {
            this.semaphore = new Semaphore(recordCountLimit);
            this.lock = new ReentrantLock();
            this.recordBuffer = new ArrayList<>(recordCountLimit);
            this.futures = new ArrayList<>(recordCountLimit);
        } else {
            this.semaphore = null;
            this.lock = null;
            this.recordBuffer = null;
            this.futures = null;
        }
    }

    /**
     * Appends one raw record and blocks until the server answers.
     *
     * @param bArr payload of the record
     * @return id assigned to the record
     */
    @Override // io.hstream.Producer
    public RecordId write(byte[] bArr) {
        return writeRawRecordsAsync(List.of(bArr)).join().get(0);
    }

    /**
     * Appends one {@link HRecord} and blocks until the server answers.
     *
     * @param hRecord record to append
     * @return id assigned to the record
     */
    @Override // io.hstream.Producer
    public RecordId write(HRecord hRecord) {
        return writeHRecordsAsync(List.of(hRecord)).join().get(0);
    }

    /**
     * Appends one raw record without waiting for the result. In batched mode the record is only
     * buffered, and this call blocks while the buffer has no free slot.
     *
     * @param bArr payload of the record
     * @return future completed with the record id, or exceptionally if the append fails
     * @throws HStreamDBClientException if the thread is interrupted while waiting for a free slot
     */
    @Override // io.hstream.Producer
    public CompletableFuture<RecordId> writeAsync(byte[] bArr) {
        if (!this.enableBatch) {
            return writeRawRecordsAsync(List.of(bArr)).thenApply(ids -> ids.get(0));
        }
        return enqueue(bArr);
    }

    /**
     * Appends one {@link HRecord} without waiting for the result. In batched mode the record is
     * only buffered, and this call blocks while the buffer has no free slot.
     *
     * @param hRecord record to append
     * @return future completed with the record id, or exceptionally if the append fails
     * @throws HStreamDBClientException if the thread is interrupted while waiting for a free slot
     */
    @Override // io.hstream.Producer
    public CompletableFuture<RecordId> writeAsync(HRecord hRecord) {
        if (!this.enableBatch) {
            return writeHRecordsAsync(List.of(hRecord)).thenApply(ids -> ids.get(0));
        }
        return enqueue(hRecord);
    }

    /**
     * Sends every buffered record and blocks until all of their futures are settled. Does nothing
     * when batching is disabled (there is no buffer) or when the buffer is empty.
     */
    @Override // io.hstream.Producer
    public void flush() {
        if (!this.enableBatch) {
            return;
        }
        flushSync();
    }

    /** Buffers one record (a byte[] or an HRecord) and flushes once the buffer is full. */
    private CompletableFuture<RecordId> enqueue(Object record) {
        try {
            this.semaphore.acquire();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new HStreamDBClientException(e);
        }
        this.lock.lock();
        try {
            CompletableFuture<RecordId> pending = new CompletableFuture<>();
            this.recordBuffer.add(record);
            this.futures.add(pending);
            if (this.recordBuffer.size() >= this.recordCountLimit) {
                // The lock is a ReentrantLock, so re-acquiring it inside flushSync is fine.
                flushSync();
            }
            return pending;
        } finally {
            this.lock.unlock();
        }
    }

    /** Sends the given raw records in one append request. Never throws; failures go to the future. */
    private CompletableFuture<List<RecordId>> writeRawRecordsAsync(List<byte[]> list) {
        CompletableFuture<List<RecordId>> result = new CompletableFuture<>();
        try {
            // NOTE(review): m46build() looks like a decompiler-renamed build(); confirm against
            // the AppendRequest builder actually on the classpath.
            AppendRequest request =
                    AppendRequest.newBuilder()
                            .setStreamName(this.stream)
                            .addAllRecords(
                                    list.stream()
                                            .map(bArr -> RecordUtils.buildHStreamRecordFromRawRecord(bArr))
                                            .collect(Collectors.toList()))
                            .m46build();
            this.grpcStub.append(request, new AppendObserver(result));
        } catch (RuntimeException e) {
            result.completeExceptionally(new HStreamDBClientException(e));
        }
        return result;
    }

    /** Sends the given HRecords in one append request. Never throws; failures go to the future. */
    private CompletableFuture<List<RecordId>> writeHRecordsAsync(List<HRecord> list) {
        CompletableFuture<List<RecordId>> result = new CompletableFuture<>();
        try {
            // NOTE(review): m46build() looks like a decompiler-renamed build(); confirm against
            // the AppendRequest builder actually on the classpath.
            AppendRequest request =
                    AppendRequest.newBuilder()
                            .setStreamName(this.stream)
                            .addAllRecords(
                                    list.stream()
                                            .map(hRecord -> RecordUtils.buildHStreamRecordFromHRecord(hRecord))
                                            .collect(Collectors.toList()))
                            .m46build();
            this.grpcStub.append(request, new AppendObserver(result));
        } catch (RuntimeException e) {
            result.completeExceptionally(new HStreamDBClientException(e));
        }
        return result;
    }

    /**
     * Drains the buffer: raw records and HRecords are sent as two separate append requests (raw
     * first), and each buffered future is settled with the outcome of its own request.
     *
     * <p>The buffer is always emptied and the semaphore permits are always returned, even when an
     * append fails; otherwise a failed flush would leave the buffer full and block every later
     * {@code writeAsync} call.
     */
    private void flushSync() {
        this.lock.lock();
        try {
            if (this.recordBuffer.isEmpty()) {
                return;
            }
            int size = this.recordBuffer.size();
            logger.info("start flush recordBuffer, current buffer size is: {}", size);
            try {
                List<byte[]> rawRecords = new ArrayList<>();
                List<CompletableFuture<RecordId>> rawFutures = new ArrayList<>();
                List<HRecord> hRecords = new ArrayList<>();
                List<CompletableFuture<RecordId>> hRecordFutures = new ArrayList<>();
                for (int i = 0; i < size; i++) {
                    Object record = this.recordBuffer.get(i);
                    if (record instanceof byte[]) {
                        rawRecords.add((byte[]) record);
                        rawFutures.add(this.futures.get(i));
                    } else {
                        // enqueue() is only ever called with byte[] or HRecord.
                        hRecords.add((HRecord) record);
                        hRecordFutures.add(this.futures.get(i));
                    }
                }
                // Skip a group with no records instead of sending an empty append request.
                if (!rawRecords.isEmpty()) {
                    settle(rawFutures, writeRawRecordsAsync(rawRecords));
                }
                if (!hRecords.isEmpty()) {
                    settle(hRecordFutures, writeHRecordsAsync(hRecords));
                }
            } finally {
                this.recordBuffer.clear();
                this.futures.clear();
                logger.info("finish clearing record buffer");
                this.semaphore.release(size);
            }
        } finally {
            // Single unlock matching the single lock() above.
            this.lock.unlock();
        }
    }

    /**
     * Waits for one append request and propagates its outcome to the futures of the records it
     * carried: the i-th returned id completes the i-th future; a failure (or a response whose id
     * count does not match) fails all of them.
     */
    private void settle(
            List<CompletableFuture<RecordId>> waiting, CompletableFuture<List<RecordId>> appended) {
        List<RecordId> ids = null;
        Throwable failure = null;
        try {
            ids = appended.join();
        } catch (RuntimeException e) {
            // join() wraps the real failure; hand the underlying cause to the record futures.
            failure = e.getCause() != null ? e.getCause() : e;
        }
        if (failure == null && ids.size() != waiting.size()) {
            failure =
                    new HStreamDBClientException(
                            new IllegalStateException(
                                    "append returned "
                                            + ids.size()
                                            + " record ids for "
                                            + waiting.size()
                                            + " records"));
        }
        if (failure != null) {
            logger.error("failed to flush {} buffered records", waiting.size(), failure);
            for (CompletableFuture<RecordId> pending : waiting) {
                pending.completeExceptionally(failure);
            }
            return;
        }
        for (int i = 0; i < waiting.size(); i++) {
            waiting.get(i).complete(ids.get(i));
        }
    }

    /** Bridges the gRPC response callbacks of one append request onto a future. */
    private static final class AppendObserver implements StreamObserver<AppendResponse> {
        private final CompletableFuture<List<RecordId>> result;

        AppendObserver(CompletableFuture<List<RecordId>> result) {
            this.result = result;
        }

        @Override
        public void onNext(AppendResponse appendResponse) {
            this.result.complete(
                    appendResponse.getRecordIdsList().stream()
                            .map(GrpcUtils::recordIdFromGrpc)
                            .collect(Collectors.toList()));
        }

        @Override
        public void onError(Throwable th) {
            // Throwing here would only reach the gRPC callback thread and leave every waiter
            // blocked forever; fail the future so callers actually see the error.
            this.result.completeExceptionally(new HStreamDBClientException(th));
        }

        @Override
        public void onCompleted() {
            // Nothing to do: the result was already delivered through onNext or onError.
        }
    }
}
