package io.zeebe.containers.engine;

import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.protocol.record.Record;
import io.zeebe.containers.ZeebeBrokerNode;
import io.zeebe.containers.exporter.DebugReceiver;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import net.jcip.annotations.ThreadSafe;
import org.agrona.collections.MutableInteger;
import org.agrona.collections.MutableLong;
import org.apiguardian.api.API;

@API(status = API.Status.INTERNAL)
@ThreadSafe
/* loaded from: input_file:io/zeebe/containers/engine/DebugReceiverStream.class */
final class DebugReceiverStream implements RecordStreamSource, AutoCloseable {
    private final InfiniteList<Record<?>> records;
    private final DebugReceiver receiver;
    private final Duration idlePeriod;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: 'this' call moved to the top of the method (can break code semantics) */
    public DebugReceiverStream(InfiniteList<Record<?>> infiniteList) {
        this(infiniteList, new DebugReceiver((v1) -> {
            r4.add(v1);
        }));
        Objects.requireNonNull(infiniteList);
    }

    /* JADX WARN: 'this' call moved to the top of the method (can break code semantics) */
    DebugReceiverStream(InfiniteList<Record<?>> infiniteList, Duration duration) {
        this(infiniteList, new DebugReceiver((v1) -> {
            r4.add(v1);
        }), duration);
        Objects.requireNonNull(infiniteList);
    }

    DebugReceiverStream(InfiniteList<Record<?>> infiniteList, DebugReceiver debugReceiver) {
        this(infiniteList, debugReceiver, Duration.ofSeconds(1L));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DebugReceiverStream(InfiniteList<Record<?>> infiniteList, DebugReceiver debugReceiver, Duration duration) {
        this.records = infiniteList;
        this.receiver = debugReceiver;
        this.idlePeriod = duration;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start(Collection<? extends ZeebeBrokerNode<?>> collection) {
        this.receiver.start();
        int port = this.receiver.serverAddress().getPort();
        collection.forEach(zeebeBrokerNode -> {
            zeebeBrokerNode.withDebugExporter(port);
        });
    }

    void stop() {
        this.receiver.stop();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void acknowledge(int i, long j) {
        this.receiver.acknowledge(i, j);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        stop();
    }

    public Iterable<Record<?>> getRecords() {
        return this.records;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void waitForIdleState(Duration duration) throws InterruptedException, TimeoutException {
        MutableInteger mutableInteger = new MutableInteger(this.records.size());
        MutableLong mutableLong = new MutableLong(System.nanoTime());
        MutableLong mutableLong2 = new MutableLong(0L);
        Duration ofMillis = Duration.ofMillis(100L);
        long nanos = this.idlePeriod.toNanos();
        awaitConditionHolds(duration, ofMillis, "until no records are exported for 1 second", () -> {
            int i = mutableInteger.get();
            mutableInteger.set(this.records.size());
            long nanoTime = System.nanoTime();
            long j = nanoTime - mutableLong.get();
            mutableLong.set(nanoTime);
            if (i == mutableInteger.get()) {
                mutableLong2.set(mutableLong2.get() + j);
            } else {
                mutableLong2.set(0L);
            }
            return mutableLong2.get() >= nanos;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void waitForBusyState(Duration duration) throws InterruptedException, TimeoutException {
        MutableInteger mutableInteger = new MutableInteger(this.records.size());
        awaitConditionHolds(duration, Duration.ofMillis(100L), "until a record is exported", () -> {
            int i = mutableInteger.get();
            mutableInteger.set(this.records.size());
            return i != mutableInteger.get();
        });
    }

    private void awaitConditionHolds(Duration duration, Duration duration2, String str, BooleanSupplier booleanSupplier) throws TimeoutException, InterruptedException {
        Thread currentThread = Thread.currentThread();
        long nanoTime = System.nanoTime() + duration.toNanos();
        while (!currentThread.isInterrupted() && System.nanoTime() < nanoTime && !booleanSupplier.getAsBoolean()) {
            Thread.sleep(duration2.toMillis());
        }
        if (System.nanoTime() >= nanoTime) {
            throw new TimeoutException("Timed out waiting " + str);
        }
    }
}
