package io.zeebe.broker.util;

import io.zeebe.broker.logstreams.processor.TypedEventStreamProcessorBuilder;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.state.DefaultZeebeDbFactory;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.transport.clientapi.BufferingServerOutput;
import io.zeebe.db.DbContext;
import io.zeebe.db.ZeebeDb;
import io.zeebe.db.ZeebeDbFactory;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorFactory;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.servicecontainer.testing.ServiceContainerRule;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.util.ZbLogger;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.clock.ControlledActorClock;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/util/StreamProcessorRule.class */
/**
 * JUnit rule that wires up everything needed to run a stream processor against a test log stream.
 *
 * <p>The rule is a {@link RuleChain} that applies, from outermost to innermost: a temporary
 * folder, an actor scheduler driven by a {@link ControlledActorClock}, a service container, an
 * {@link AutoCloseableRule}, a watcher that prints all records when a test fails, and finally the
 * set-up rule that creates the log stream named {@link #STREAM_NAME}.
 */
public class StreamProcessorRule implements TestRule {
    /** Partition id used by the no-argument constructor. */
    public static final int PARTITION_ID = 0;

    /** Name of the single log stream this rule creates and operates on. */
    public static final String STREAM_NAME = "stream";

    private static final Logger LOG = new ZbLogger("io.zeebe.broker.test");

    // Collaborating rules; their order inside the chain is fixed in the constructor.
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ControlledActorClock clock = new ControlledActorClock();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(clock);
    private final ServiceContainerRule serviceContainerRule =
            new ServiceContainerRule(actorSchedulerRule);

    private final ZeebeDbFactory zeebeDbFactory;
    private final SetupRule rule;
    private final RuleChain chain;

    // Created by SetupRule.before(); null until the rule chain has been entered.
    private BufferingServerOutput output;
    private TestStreams streams;
    private TypedStreamEnvironment streamEnvironment;

    // Created when the factory passed to initStreamProcessor(StreamProcessorTestFactory) is invoked.
    private ZeebeState zeebeState;

    /** Creates a rule for partition {@link #PARTITION_ID} using the default database factory. */
    public StreamProcessorRule() {
        this(PARTITION_ID);
    }

    /**
     * Creates a rule for the given partition using the default database factory.
     *
     * @param partitionId partition id the log stream is created with
     */
    public StreamProcessorRule(int partitionId) {
        this(partitionId, DefaultZeebeDbFactory.DEFAULT_DB_FACTORY);
    }

    /**
     * Creates a rule for the given partition and database factory.
     *
     * @param partitionId partition id the log stream is created with
     * @param zeebeDbFactory factory handed to the test streams when a stream processor is
     *     initialised
     */
    public StreamProcessorRule(int partitionId, ZeebeDbFactory zeebeDbFactory) {
        this.rule = new SetupRule(partitionId);
        this.zeebeDbFactory = zeebeDbFactory;

        // The record printer must wrap the set-up rule so that it observes set-up failures too.
        this.chain =
                RuleChain.outerRule(tempFolder)
                        .around(actorSchedulerRule)
                        .around(serviceContainerRule)
                        .around(closeables)
                        .around(new FailedTestRecordPrinter())
                        .around(rule);
    }

    /** Delegates to the internal rule chain. */
    @Override
    public Statement apply(Statement statement, Description description) {
        return chain.apply(statement, description);
    }

    /**
     * Initialises a stream processor from the given factory and starts it.
     *
     * @param streamProcessorFactory factory producing the stream processor
     * @return the control handle of the started stream processor
     */
    public StreamProcessorControl runStreamProcessor(StreamProcessorFactory streamProcessorFactory) {
        final StreamProcessorControl control = initStreamProcessor(streamProcessorFactory);
        control.start();
        return control;
    }

    /**
     * Initialises a stream processor from the given test factory and starts it.
     *
     * @param streamProcessorTestFactory factory producing the stream processor
     * @return the control handle of the started stream processor
     */
    public StreamProcessorControl runStreamProcessor(StreamProcessorTestFactory streamProcessorTestFactory) {
        final StreamProcessorControl control = initStreamProcessor(streamProcessorTestFactory);
        control.start();
        return control;
    }

    /**
     * Initialises, without starting, a stream processor on {@link #STREAM_NAME}.
     *
     * @param streamProcessorFactory factory producing the stream processor
     * @return the control handle of the initialised stream processor
     */
    public StreamProcessorControl initStreamProcessor(StreamProcessorFactory streamProcessorFactory) {
        // NOTE(review): the literal 0 is independent of the partition id given to the constructor;
        // presumably it is a stream processor id - confirm against TestStreams.
        return streams.initStreamProcessor(STREAM_NAME, 0, zeebeDbFactory, streamProcessorFactory);
    }

    /**
     * Initialises, without starting, a stream processor on {@link #STREAM_NAME}.
     *
     * <p>When the underlying factory is invoked, a new {@link ZeebeState} is created from the
     * supplied database and context, remembered for {@link #getZeebeState()}, and set on the
     * builder that is handed to {@code streamProcessorTestFactory}.
     *
     * @param streamProcessorTestFactory factory producing the stream processor
     * @return the control handle of the initialised stream processor
     */
    public StreamProcessorControl initStreamProcessor(StreamProcessorTestFactory streamProcessorTestFactory) {
        // NOTE(review): same literal 0 as in the other overload - confirm its meaning.
        return streams.initStreamProcessor(STREAM_NAME, 0, zeebeDbFactory, (zeebeDb, dbContext) -> {
            zeebeState = new ZeebeState(zeebeDb, dbContext);
            final TypedEventStreamProcessorBuilder builder =
                    streamEnvironment.newStreamProcessor().zeebeState(zeebeState);
            return streamProcessorTestFactory.build(builder, zeebeDb, dbContext);
        });
    }

    /** Returns the controllable clock that drives the actor scheduler of this rule. */
    public ControlledActorClock getClock() {
        return clock;
    }

    /** Returns the actor scheduler provided by the scheduler rule. */
    public ActorScheduler getActorScheduler() {
        return actorSchedulerRule.get();
    }

    /**
     * Returns the state created for the most recently built stream processor.
     *
     * @return the state, or {@code null} if no factory passed to {@link
     *     #initStreamProcessor(StreamProcessorTestFactory)} has been invoked yet
     */
    public ZeebeState getZeebeState() {
        return zeebeState;
    }

    /** Returns a {@link RecordStream} over the events of {@link #STREAM_NAME}. */
    public RecordStream events() {
        return new RecordStream(streams.events(STREAM_NAME));
    }

    /**
     * Writes a record of type {@link RecordType#EVENT} with an explicit key.
     *
     * @param key key of the record
     * @param intent intent of the record
     * @param value payload of the record
     * @return the value returned by the record writer (presumably the record position - confirm)
     */
    public long writeEvent(long key, Intent intent, UnpackedObject value) {
        return streams.newRecord(STREAM_NAME)
                .recordType(RecordType.EVENT)
                .key(key)
                .intent(intent)
                .event(value)
                .write();
    }

    /**
     * Writes a record of type {@link RecordType#EVENT} without setting a key.
     *
     * @param intent intent of the record
     * @param value payload of the record
     * @return the value returned by the record writer (presumably the record position - confirm)
     */
    public long writeEvent(Intent intent, UnpackedObject value) {
        return streams.newRecord(STREAM_NAME)
                .recordType(RecordType.EVENT)
                .intent(intent)
                .event(value)
                .write();
    }

    /**
     * Writes a record of type {@link RecordType#COMMAND} with an explicit key.
     *
     * @param key key of the record
     * @param intent intent of the record
     * @param value payload of the record
     * @return the value returned by the record writer (presumably the record position - confirm)
     */
    public long writeCommand(long key, Intent intent, UnpackedObject value) {
        return streams.newRecord(STREAM_NAME)
                .recordType(RecordType.COMMAND)
                .key(key)
                .intent(intent)
                .event(value)
                .write();
    }

    /**
     * Writes a record of type {@link RecordType#COMMAND} without setting a key.
     *
     * @param intent intent of the record
     * @param value payload of the record
     * @return the value returned by the record writer (presumably the record position - confirm)
     */
    public long writeCommand(Intent intent, UnpackedObject value) {
        return streams.newRecord(STREAM_NAME)
                .recordType(RecordType.COMMAND)
                .intent(intent)
                .event(value)
                .write();
    }

    /** Prints every record of {@link #STREAM_NAME} via {@link LogStreamPrinter}. */
    public void printAllRecords() {
        LogStreamPrinter.printRecords(streams.getLogStream(STREAM_NAME));
    }

    /** Factory used by tests to build a stream processor from a pre-configured builder. */
    @FunctionalInterface
    public interface StreamProcessorTestFactory {
        /**
         * Builds the stream processor under test.
         *
         * @param builder builder that already has the {@link ZeebeState} set
         * @param zeebeDb database the state was created from
         * @param dbContext database context the state was created from
         * @return the stream processor to run
         */
        StreamProcessor build(TypedEventStreamProcessorBuilder builder, ZeebeDb zeebeDb, DbContext dbContext);
    }

    /** Prints all records of the log stream whenever the wrapped statement fails. */
    private class FailedTestRecordPrinter extends TestWatcher {
        @Override
        protected void failed(Throwable th, Description description) {
            // This watcher wraps SetupRule, so it is also notified when set-up itself throws.
            // In that case the streams may not exist yet; dereferencing them would only add a
            // misleading NullPointerException to the reported failures.
            if (streams == null) {
                LOG.info("Test failed before the log stream was set up, no records to print.");
                return;
            }
            LOG.info("Test failed, following records where exported:");
            printAllRecords();
        }
    }

    /** Creates the per-test output buffer, log stream and stream environment. */
    private class SetupRule extends ExternalResource {
        private final int partitionId;

        SetupRule(int partitionId) {
            this.partitionId = partitionId;
        }

        @Override
        protected void before() {
            output = new BufferingServerOutput();
            streams = new TestStreams(
                    tempFolder,
                    closeables,
                    serviceContainerRule.get(),
                    actorSchedulerRule.get());
            streams.createLogStream(STREAM_NAME, partitionId);

            // An initial record with an empty payload is written right after stream creation.
            // NOTE(review): the reason is not visible here - presumably so that the stream is
            // not empty when a processor starts; confirm.
            streams.newRecord(STREAM_NAME).event(new UnpackedObject()).write();

            streamEnvironment = new TypedStreamEnvironment(streams.getLogStream(STREAM_NAME), output);
        }
    }
}
