package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.DbKey;
import io.camunda.zeebe.db.DbValue;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.bpmn.multiinstance.MultiInstanceSubProcessTest;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.immutable.ProcessingState;
import io.camunda.zeebe.engine.util.client.DecisionEvaluationClient;
import io.camunda.zeebe.engine.util.client.DeploymentClient;
import io.camunda.zeebe.engine.util.client.IncidentClient;
import io.camunda.zeebe.engine.util.client.JobActivationClient;
import io.camunda.zeebe.engine.util.client.JobClient;
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.engine.util.client.ResourceDeletionClient;
import io.camunda.zeebe.engine.util.client.SignalClient;
import io.camunda.zeebe.engine.util.client.VariableClient;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorListener;
import io.camunda.zeebe.stream.impl.StreamProcessorMode;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/* loaded from: input_file:io/camunda/zeebe/engine/util/EngineRule.class */
public final class EngineRule extends ExternalResource {
    private static final int PARTITION_ID = 1;
    private final StreamProcessorRule environmentRule;
    private final RecordingExporterTestWatcher recordingExporterTestWatcher;
    private final int partitionCount;
    private Consumer<TypedRecord> onProcessedCallback;
    private Consumer<LoggedEvent> onSkippedCallback;
    private long lastProcessedPosition;
    private JobStreamer jobStreamer;
    private FeatureFlags featureFlags;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/engine/util/EngineRule$VersatileBlob.class */
    public static final class VersatileBlob implements DbKey, DbValue {
        private final DirectBuffer genericBuffer = new UnsafeBuffer(0, 0);

        private VersatileBlob() {
        }

        public void wrap(DirectBuffer directBuffer, int i, int i2) {
            this.genericBuffer.wrap(directBuffer, i, i2);
        }

        public int getLength() {
            return this.genericBuffer.capacity();
        }

        public void write(MutableDirectBuffer mutableDirectBuffer, int i) {
            mutableDirectBuffer.putBytes(i, this.genericBuffer, 0, this.genericBuffer.capacity());
        }

        public DirectBuffer getDirectBuffer() {
            return this.genericBuffer;
        }
    }

    private EngineRule(int i) {
        this(i, null);
    }

    private EngineRule(int i, ListLogStorage listLogStorage) {
        this.recordingExporterTestWatcher = new RecordingExporterTestWatcher();
        this.onProcessedCallback = typedRecord -> {
        };
        this.onSkippedCallback = loggedEvent -> {
        };
        this.lastProcessedPosition = -1L;
        this.jobStreamer = JobStreamer.noop();
        this.featureFlags = FeatureFlags.createDefaultForTests();
        this.partitionCount = i;
        this.environmentRule = new StreamProcessorRule(1, i, DefaultZeebeDbFactory.defaultFactory(), listLogStorage);
    }

    public static EngineRule singlePartition() {
        return new EngineRule(1);
    }

    public static EngineRule multiplePartition(int i) {
        return new EngineRule(i);
    }

    public static EngineRule withSharedStorage(ListLogStorage listLogStorage) {
        return new EngineRule(1, listLogStorage);
    }

    public Statement apply(Statement statement, Description description) {
        return this.environmentRule.apply(super.apply(this.recordingExporterTestWatcher.apply(statement, description), description), description);
    }

    protected void before() {
        startProcessors();
    }

    public void start() {
        startProcessors();
    }

    public void stop() {
        StreamProcessorRule streamProcessorRule = this.environmentRule;
        Objects.requireNonNull(streamProcessorRule);
        forEachPartition((v1) -> {
            r1.closeStreamProcessor(v1);
        });
    }

    public EngineRule withJobStreamer(JobStreamer jobStreamer) {
        this.jobStreamer = jobStreamer;
        return this;
    }

    public EngineRule withFeatureFlags(FeatureFlags featureFlags) {
        this.featureFlags = featureFlags;
        return this;
    }

    public EngineRule withOnProcessedCallback(Consumer<TypedRecord> consumer) {
        this.onProcessedCallback = this.onProcessedCallback.andThen(consumer);
        return this;
    }

    public EngineRule withOnSkippedCallback(Consumer<LoggedEvent> consumer) {
        this.onSkippedCallback = this.onSkippedCallback.andThen(consumer);
        return this;
    }

    public EngineRule withStreamProcessorMode(StreamProcessorMode streamProcessorMode) {
        this.environmentRule.withStreamProcessorMode(streamProcessorMode);
        return this;
    }

    private void startProcessors() {
        ArrayList arrayList = new ArrayList();
        forEachPartition(num -> {
            StreamProcessorRule streamProcessorRule = this.environmentRule;
            Objects.requireNonNull(streamProcessorRule);
            TestInterPartitionCommandSender testInterPartitionCommandSender = new TestInterPartitionCommandSender((v1) -> {
                return r2.newLogStreamWriter(v1);
            });
            arrayList.add(testInterPartitionCommandSender);
            this.environmentRule.startTypedStreamProcessor(num.intValue(), typedRecordProcessorContext -> {
                return EngineProcessors.createEngineProcessors(typedRecordProcessorContext, this.partitionCount, new SubscriptionCommandSender(num.intValue(), testInterPartitionCommandSender), testInterPartitionCommandSender, this.featureFlags, this.jobStreamer).withListener(new ProcessingExporterTransistor(this.environmentRule.getLogStream(num.intValue())));
            }, Optional.of(new StreamProcessorListener() { // from class: io.camunda.zeebe.engine.util.EngineRule.1
                public void onProcessed(TypedRecord<?> typedRecord) {
                    EngineRule.this.lastProcessedPosition = typedRecord.getPosition();
                    EngineRule.this.onProcessedCallback.accept(typedRecord);
                }

                public void onSkipped(LoggedEvent loggedEvent) {
                    EngineRule.this.lastProcessedPosition = loggedEvent.getPosition();
                    EngineRule.this.onSkippedCallback.accept(loggedEvent);
                }
            }));
        });
        arrayList.forEach(testInterPartitionCommandSender -> {
            testInterPartitionCommandSender.initializeWriters(this.partitionCount);
        });
    }

    public void forEachPartition(Consumer<Integer> consumer) {
        int i = 1;
        for (int i2 = 0; i2 < this.partitionCount; i2++) {
            int i3 = i;
            i++;
            consumer.accept(Integer.valueOf(i3));
        }
    }

    public void increaseTime(Duration duration) {
        if (this.environmentRule.getStreamProcessor(1).getCurrentPhase().join() == StreamProcessor.Phase.PROCESSING) {
            Awaitility.await("Expect that engine reaches the end of the log before increasing the time").until(this::hasReachedEnd);
        }
        this.environmentRule.getClock().addTime(duration);
    }

    public void reprocess() {
        forEachPartition(num -> {
            try {
                this.environmentRule.closeStreamProcessor(num.intValue());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        int size = RecordingExporter.getRecords().size();
        RecordingExporter.reset();
        startProcessors();
        TestUtil.waitUntil(() -> {
            return RecordingExporter.getRecords().size() >= size;
        }, "Failed to reprocess all events, only re-exported %d but expected %d", new Object[]{Integer.valueOf(RecordingExporter.getRecords().size()), Integer.valueOf(size)});
    }

    public List<Integer> getPartitionIds() {
        return (List) IntStream.range(1, 1 + this.partitionCount).boxed().collect(Collectors.toList());
    }

    public ControlledActorClock getClock() {
        return this.environmentRule.getClock();
    }

    public ProcessingState getProcessingState() {
        return this.environmentRule.getProcessingState();
    }

    public StreamProcessor getStreamProcessor(int i) {
        return this.environmentRule.getStreamProcessor(i);
    }

    public long getLastProcessedPosition() {
        return this.lastProcessedPosition;
    }

    public DeploymentClient deployment() {
        return new DeploymentClient(this.environmentRule, this::forEachPartition, this.partitionCount);
    }

    public ProcessInstanceClient processInstance() {
        return new ProcessInstanceClient(this.environmentRule);
    }

    public DecisionEvaluationClient decision() {
        return new DecisionEvaluationClient(this.environmentRule);
    }

    public PublishMessageClient message() {
        return new PublishMessageClient(this.environmentRule, this.partitionCount);
    }

    public VariableClient variables() {
        return new VariableClient(this.environmentRule);
    }

    public JobActivationClient jobs() {
        return new JobActivationClient(this.environmentRule);
    }

    public JobClient job() {
        return new JobClient(this.environmentRule);
    }

    public IncidentClient incident() {
        return new IncidentClient(this.environmentRule);
    }

    public ResourceDeletionClient resourceDeletion() {
        return new ResourceDeletionClient(this.environmentRule);
    }

    public SignalClient signal() {
        return new SignalClient(this.environmentRule);
    }

    public Record<JobRecordValue> createJob(String str, String str2) {
        return createJob(str, str2, Collections.EMPTY_MAP);
    }

    public Record<JobRecordValue> createJob(String str, String str2, Map<String, Object> map) {
        deployment().withXmlResource(str2 + ".bpmn", Bpmn.createExecutableProcess(str2).startEvent("start").serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType(str).done();
        }).endEvent("end").done()).deploy();
        long create = processInstance().ofBpmnProcessId(str2).withVariables(map).create();
        return (Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(str).filter(record -> {
            return record.getValue().getProcessInstanceKey() == create;
        }).getFirst();
    }

    public void writeRecords(RecordToWrite... recordToWriteArr) {
        this.environmentRule.writeBatch(recordToWriteArr);
    }

    public CommandResponseWriter getCommandResponseWriter() {
        return this.environmentRule.getCommandResponseWriter();
    }

    public void pauseProcessing(int i) {
        this.environmentRule.pauseProcessing(i);
    }

    public void resumeProcessing(int i) {
        this.environmentRule.resumeProcessing(i);
    }

    public Map<ZbColumnFamilies, Map<Object, Object>> collectState() {
        VersatileBlob versatileBlob = new VersatileBlob();
        VersatileBlob versatileBlob2 = new VersatileBlob();
        return (Map) Arrays.stream(ZbColumnFamilies.values()).collect(Collectors.toMap(Function.identity(), zbColumnFamilies -> {
            HashMap hashMap = new HashMap();
            getProcessingState().forEach(zbColumnFamilies, versatileBlob, versatileBlob2, (versatileBlob3, versatileBlob4) -> {
                hashMap.put(Arrays.toString(BufferUtil.cloneBuffer(versatileBlob3.getDirectBuffer()).byteArray()), MsgPackConverter.convertToJson(versatileBlob4.getDirectBuffer()));
            });
            return hashMap;
        }));
    }

    public void awaitProcessingOf(Record<?> record) {
        long position = record.getPosition();
        Awaitility.await(String.format("Await the %s.%s to be processed at position %d", record.getValueType(), record.getIntent(), Long.valueOf(position))).untilAsserted(() -> {
            ((AbstractLongAssert) Assertions.assertThat(getLastProcessedPosition()).describedAs("Last process position should be greater or equal to " + position, new Object[0])).isGreaterThanOrEqualTo(position);
        });
    }

    public boolean hasReachedEnd() {
        return ((Boolean) getStreamProcessor(1).hasProcessingReachedTheEnd().join()).booleanValue();
    }

    public EngineRule maxCommandsInBatch(int i) {
        this.environmentRule.maxCommandsInBatch(i);
        return this;
    }
}
