package io.camunda.zeebe.logstreams.util;

import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:io/camunda/zeebe/logstreams/util/LogStreamWriterRule.class */
/**
 * JUnit rule that obtains a {@link LogStreamBatchWriter} from the log stream exposed by a
 * {@link LogStreamRule} and offers helpers which write events and block until the written
 * position is committed.
 *
 * <p>All waiting helpers poll in the calling thread and give up after {@link #WAIT_TIMEOUT}.
 */
public final class LogStreamWriterRule extends ExternalResource {
    /** Upper bound for every blocking wait performed by this rule. */
    private static final Duration WAIT_TIMEOUT = Duration.ofSeconds(5L);

    private final LogStreamRule logStreamRule;
    private SynchronousLogStream logStream;
    private LogStreamBatchWriter logStreamWriter;

    /**
     * @param logStreamRule rule that provides the log stream to write to
     */
    public LogStreamWriterRule(LogStreamRule logStreamRule) {
        this.logStreamRule = logStreamRule;
    }

    /** Fetches the log stream from the wrapped rule and creates a new batch writer for it. */
    protected void before() {
        logStream = logStreamRule.getLogStream();
        logStreamWriter = logStream.newLogStreamBatchWriter();
    }

    /** Drops the references to the writer and to the log stream. */
    protected void after() {
        closeWriter();
        logStream = null;
    }

    /**
     * Drops the reference to the writer. Write helpers of this rule dereference the writer, so
     * they fail with a {@link NullPointerException} until {@link #before()} has run again.
     */
    public void closeWriter() {
        logStreamWriter = null;
    }

    /**
     * Writes events in batches of two and waits until the last batch is committed.
     *
     * <p>The number of batches is {@code ceil(count / 2)}, so an odd {@code count} results in
     * {@code count + 1} written events.
     *
     * @param count requested number of events
     * @param value value used for every event
     * @return the position returned for the last written batch, or {@code -1} if no batch was
     *     written
     */
    public long writeEvents(int count, DirectBuffer value) {
        final int batchCount = (int) Math.ceil(count / 2.0d);
        return writeEvents(batchCount, 2, value);
    }

    /**
     * Writes {@code batchCount} batches of {@code eventsPerBatch} events each and waits until the
     * last batch is committed. Keys are assigned consecutively, starting at 1.
     *
     * @param batchCount number of batches to write
     * @param eventsPerBatch number of events added to each batch
     * @param value value used for every event
     * @return the position returned for the last written batch, or {@code -1} if no batch was
     *     written
     */
    public long writeEvents(int batchCount, int eventsPerBatch, DirectBuffer value) {
        long lastPosition = -1;
        for (int batch = 0; batch < batchCount; batch++) {
            for (int offset = 1; offset <= eventsPerBatch; offset++) {
                logStreamWriter.event().key(offset + (batch * eventsPerBatch)).value(value).done();
            }
            // Retry until the writer reports a non-negative position for the batch.
            lastPosition = awaitInSameThread("until batch is written")
                    .until(logStreamWriter::tryWrite, position -> position >= 0);
        }
        waitForPositionToBeCommitted(lastPosition);
        return lastPosition;
    }

    /**
     * Writes a single event carrying the given value and waits until it is committed.
     *
     * @param value value of the event
     * @return the position returned by the writer
     */
    public long writeEvent(DirectBuffer value) {
        return writeEvent(builder -> builder.value(value));
    }

    /**
     * Writes a single event configured by the given consumer and waits until it is committed.
     *
     * @param consumer callback that configures the entry builder before it is completed
     * @return the position returned by the writer
     */
    public long writeEvent(Consumer<LogStreamBatchWriter.LogEntryBuilder> consumer) {
        final long position = writeEventInternal(consumer);
        waitForPositionToBeCommitted(position);
        return position;
    }

    /**
     * Repeats {@link #tryWrite(Consumer)} until it returns something other than {@code -1}.
     *
     * <p>The wait is bounded by {@link #WAIT_TIMEOUT}, like the other waits of this rule, so that a
     * writer which never accepts the event fails the test instead of spinning forever.
     */
    private long writeEventInternal(Consumer<LogStreamBatchWriter.LogEntryBuilder> consumer) {
        return awaitInSameThread("until event is written")
                .until(() -> tryWrite(consumer), position -> position != -1L);
    }

    /**
     * Adds one event configured by the given consumer to the writer and attempts a single write,
     * without retrying and without waiting for the commit.
     *
     * @param consumer callback that configures the entry builder before it is completed
     * @return the value returned by the writer's {@code tryWrite()}
     */
    public long tryWrite(Consumer<LogStreamBatchWriter.LogEntryBuilder> consumer) {
        final LogStreamBatchWriter.LogEntryBuilder builder = logStreamWriter.event();
        consumer.accept(builder);
        builder.done();
        return logStreamWriter.tryWrite();
    }

    /**
     * Blocks until the commit position of the log stream is at least the given position, or the
     * wait times out.
     *
     * @param position position which has to be reached by the commit position
     */
    public void waitForPositionToBeCommitted(long position) {
        awaitInSameThread("until position " + position + " is committed")
                .until(() -> logStream.getCommitPosition() >= position);
    }

    /** Creates the wait configuration shared by all helpers: bounded, no initial delay, same thread. */
    private static ConditionFactory awaitInSameThread(String alias) {
        return Awaitility.await(alias)
                .atMost(WAIT_TIMEOUT)
                .pollDelay(Duration.ofNanos(0L))
                .pollInSameThread();
    }
}
