/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.DbKey;
import io.camunda.zeebe.db.DbValue;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.ProcessingDbState;
import io.camunda.zeebe.engine.state.immutable.ProcessingState;
import io.camunda.zeebe.engine.util.ProcessingExporterTransistor;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.engine.util.TestInterPartitionCommandSender;
import io.camunda.zeebe.engine.util.client.DecisionEvaluationClient;
import io.camunda.zeebe.engine.util.client.DeploymentClient;
import io.camunda.zeebe.engine.util.client.IncidentClient;
import io.camunda.zeebe.engine.util.client.JobActivationClient;
import io.camunda.zeebe.engine.util.client.JobClient;
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.engine.util.client.ResourceDeletionClient;
import io.camunda.zeebe.engine.util.client.SignalClient;
import io.camunda.zeebe.engine.util.client.UserTaskClient;
import io.camunda.zeebe.engine.util.client.VariableClient;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorListener;
import io.camunda.zeebe.stream.impl.StreamProcessorMode;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/**
 * JUnit rule that starts one engine stream processor per partition on top of a
 * {@link StreamProcessorRule} and hands out test clients (deployment, process instance, job, ...)
 * that operate against that environment.
 *
 * <p>Instances are created through {@link #singlePartition()}, {@link #multiplePartition(int)} or
 * {@link #withSharedStorage(ListLogStorage)}; the {@code with...} methods configure the rule and
 * return {@code this} so they can be chained.
 */
public final class EngineRule
extends ExternalResource {
    /** Id of the first partition; partition ids are numbered consecutively starting here. */
    private static final int PARTITION_ID = 1;
    private final StreamProcessorRule environmentRule;
    private final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private final int partitionCount;
    private Consumer<TypedRecord> onProcessedCallback = record -> {};
    private Consumer<LoggedEvent> onSkippedCallback = record -> {};
    /**
     * Position of the record most recently reported as processed or skipped by any partition's
     * listener (the same field is shared by all partitions).
     *
     * <p>Volatile because it is assigned from the stream processor listener callbacks and polled
     * by {@link #awaitProcessingOf(Record)}; without it the polling thread has no visibility
     * guarantee and a plain {@code long} write is not guaranteed to be atomic.
     */
    private volatile long lastProcessedPosition = -1L;
    private JobStreamer jobStreamer = JobStreamer.noop();
    private FeatureFlags featureFlags = FeatureFlags.createDefaultForTests();
    /** One sender per partition; {@code null} until the processors were started once. */
    private List<TestInterPartitionCommandSender> interPartitionCommandSenders;

    private EngineRule(int partitionCount) {
        this(partitionCount, null);
    }

    private EngineRule(int partitionCount, ListLogStorage sharedStorage) {
        this.partitionCount = partitionCount;
        this.environmentRule = new StreamProcessorRule(PARTITION_ID, partitionCount, DefaultZeebeDbFactory.defaultFactory(), sharedStorage);
    }

    /** Creates a rule with exactly one partition. */
    public static EngineRule singlePartition() {
        return new EngineRule(1);
    }

    /**
     * Creates a rule with the given number of partitions.
     *
     * @param partitionCount number of partitions to start a processor for
     */
    public static EngineRule multiplePartition(int partitionCount) {
        return new EngineRule(partitionCount);
    }

    /**
     * Creates a single-partition rule that passes the given log storage to the underlying
     * {@link StreamProcessorRule}.
     *
     * @param listLogStorage storage handed to the environment rule
     */
    public static EngineRule withSharedStorage(ListLogStorage listLogStorage) {
        return new EngineRule(1, listLogStorage);
    }

    /**
     * Wraps {@code base} first with the recording exporter watcher, then with this rule's own
     * {@link #before()} handling, and finally with the environment rule as the outermost statement.
     */
    @Override
    public Statement apply(Statement base, Description description) {
        Statement statement = this.recordingExporterTestWatcher.apply(base, description);
        statement = super.apply(statement, description);
        return this.environmentRule.apply(statement, description);
    }

    /** Starts the stream processors of all partitions before the test runs. */
    @Override
    protected void before() {
        this.startProcessors();
    }

    /** Starts the stream processors of all partitions. */
    public void start() {
        this.startProcessors();
    }

    /** Closes the stream processor of every partition. */
    public void stop() {
        this.forEachPartition(this.environmentRule::closeStreamProcessor);
    }

    /**
     * Sets the job streamer passed to the engine processors on the next start.
     *
     * @return this rule, for chaining
     */
    public EngineRule withJobStreamer(JobStreamer jobStreamer) {
        this.jobStreamer = jobStreamer;
        return this;
    }

    /**
     * Sets the feature flags passed to the engine processors on the next start.
     *
     * @return this rule, for chaining
     */
    public EngineRule withFeatureFlags(FeatureFlags featureFlags) {
        this.featureFlags = featureFlags;
        return this;
    }

    /**
     * Appends a callback that is invoked, after all previously registered ones, for every record
     * reported as processed.
     *
     * @return this rule, for chaining
     */
    public EngineRule withOnProcessedCallback(Consumer<TypedRecord> onProcessedCallback) {
        this.onProcessedCallback = this.onProcessedCallback.andThen(onProcessedCallback);
        return this;
    }

    /**
     * Appends a callback that is invoked, after all previously registered ones, for every record
     * reported as skipped.
     *
     * @return this rule, for chaining
     */
    public EngineRule withOnSkippedCallback(Consumer<LoggedEvent> onSkippedCallback) {
        this.onSkippedCallback = this.onSkippedCallback.andThen(onSkippedCallback);
        return this;
    }

    /**
     * Forwards the stream processor mode to the environment rule.
     *
     * @return this rule, for chaining
     */
    public EngineRule withStreamProcessorMode(StreamProcessorMode streamProcessorMode) {
        this.environmentRule.withStreamProcessorMode(streamProcessorMode);
        return this;
    }

    /**
     * Creates a fresh inter-partition command sender and starts a typed stream processor for each
     * partition, then initializes the writers of every sender once all processors were started.
     */
    private void startProcessors() {
        this.interPartitionCommandSenders = new ArrayList<>();
        this.forEachPartition(partitionId -> {
            TestInterPartitionCommandSender interPartitionCommandSender =
                new TestInterPartitionCommandSender(this.environmentRule::newLogStreamWriter);
            this.interPartitionCommandSenders.add(interPartitionCommandSender);
            this.environmentRule.startTypedStreamProcessor(
                partitionId,
                recordProcessorContext ->
                    EngineProcessors.createEngineProcessors(
                            recordProcessorContext,
                            this.partitionCount,
                            new SubscriptionCommandSender(partitionId, interPartitionCommandSender),
                            interPartitionCommandSender,
                            this.featureFlags,
                            this.jobStreamer)
                        .withListener(new ProcessingExporterTransistor(this.environmentRule.getLogStream(partitionId))),
                Optional.of(new StreamProcessorListener(){

                    @Override
                    public void onProcessed(TypedRecord<?> processedCommand) {
                        // Record the position before notifying user callbacks.
                        EngineRule.this.lastProcessedPosition = processedCommand.getPosition();
                        EngineRule.this.onProcessedCallback.accept(processedCommand);
                    }

                    @Override
                    public void onSkipped(LoggedEvent skippedRecord) {
                        // Skipped records advance the last processed position as well.
                        EngineRule.this.lastProcessedPosition = skippedRecord.getPosition();
                        EngineRule.this.onSkippedCallback.accept(skippedRecord);
                    }
                }));
        });
        this.interPartitionCommandSenders.forEach(sender -> sender.initializeWriters(this.partitionCount));
    }

    /**
     * Invokes the consumer once per partition with the ids
     * {@code PARTITION_ID .. PARTITION_ID + partitionCount - 1}, in ascending order.
     */
    public void forEachPartition(Consumer<Integer> partitionIdConsumer) {
        int partitionId = PARTITION_ID;
        for (int i = 0; i < this.partitionCount; ++i) {
            partitionIdConsumer.accept(partitionId++);
        }
    }

    /**
     * Advances the controlled clock by {@code duration}. If the first partition's stream
     * processor is currently in the {@code PROCESSING} phase, this first waits until it reports
     * that processing has reached the end of the log.
     */
    public void increaseTime(Duration duration) {
        StreamProcessor streamProcessor = this.environmentRule.getStreamProcessor(PARTITION_ID);
        if (streamProcessor.getCurrentPhase().join() == StreamProcessor.Phase.PROCESSING) {
            Awaitility.await("Expect that engine reaches the end of the log before increasing the time").until(this::hasReachedEnd);
        }
        this.environmentRule.getClock().addTime(duration);
    }

    /**
     * Closes all stream processors, resets the {@link RecordingExporter}, restarts the processors
     * and waits until at least as many records were exported again as had been exported before.
     */
    public void reprocess() {
        this.forEachPartition(partitionId -> {
            try {
                this.environmentRule.closeStreamProcessor(partitionId);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        int lastSize = RecordingExporter.getRecords().size();
        RecordingExporter.reset();
        this.startProcessors();
        // NOTE(review): the message arguments are evaluated here, before waiting starts, so the
        // "re-exported" count in a failure message is the size at this point, not at timeout.
        TestUtil.waitUntil(() -> RecordingExporter.getRecords().size() >= lastSize, "Failed to reprocess all events, only re-exported %d but expected %d", new Object[]{RecordingExporter.getRecords().size(), lastSize});
    }

    /** Returns the ids of all partitions in ascending order. */
    public List<Integer> getPartitionIds() {
        return IntStream.range(PARTITION_ID, PARTITION_ID + this.partitionCount).boxed().collect(Collectors.toList());
    }

    /** Returns the controlled clock of the environment rule. */
    public ControlledActorClock getClock() {
        return this.environmentRule.getClock();
    }

    /** Returns the processing state exposed by the environment rule. */
    public ProcessingState getProcessingState() {
        return this.environmentRule.getProcessingState();
    }

    /** Returns the stream processor of the given partition. */
    public StreamProcessor getStreamProcessor(int partitionId) {
        return this.environmentRule.getStreamProcessor(partitionId);
    }

    /**
     * Returns the position of the record most recently reported as processed or skipped, or
     * {@code -1} if none was reported yet.
     */
    public long getLastProcessedPosition() {
        return this.lastProcessedPosition;
    }

    /** Creates a new deployment client bound to this rule's environment. */
    public DeploymentClient deployment() {
        return new DeploymentClient(this.environmentRule, this::forEachPartition, this.partitionCount);
    }

    /** Creates a new process instance client bound to this rule's environment. */
    public ProcessInstanceClient processInstance() {
        return new ProcessInstanceClient(this.environmentRule);
    }

    /** Creates a new decision evaluation client bound to this rule's environment. */
    public DecisionEvaluationClient decision() {
        return new DecisionEvaluationClient(this.environmentRule);
    }

    /** Creates a new message publishing client bound to this rule's environment. */
    public PublishMessageClient message() {
        return new PublishMessageClient(this.environmentRule, this.partitionCount);
    }

    /** Creates a new variable client bound to this rule's environment. */
    public VariableClient variables() {
        return new VariableClient(this.environmentRule);
    }

    /** Creates a new job activation client bound to this rule's environment. */
    public JobActivationClient jobs() {
        return new JobActivationClient(this.environmentRule);
    }

    /** Creates a new job client bound to this rule's environment. */
    public JobClient job() {
        return new JobClient(this.environmentRule);
    }

    /** Creates a new incident client bound to this rule's environment. */
    public IncidentClient incident() {
        return new IncidentClient(this.environmentRule);
    }

    /** Creates a new resource deletion client bound to this rule's environment. */
    public ResourceDeletionClient resourceDeletion() {
        return new ResourceDeletionClient(this.environmentRule);
    }

    /** Creates a new signal client bound to this rule's environment. */
    public SignalClient signal() {
        return new SignalClient(this.environmentRule);
    }

    /** Creates a new user task client bound to this rule's environment. */
    public UserTaskClient userTask() {
        return new UserTaskClient(this.environmentRule);
    }

    /**
     * Same as {@link #createJob(String, String, Map)} with no variables.
     */
    public Record<JobRecordValue> createJob(String type, String processId) {
        return this.createJob(type, processId, Collections.emptyMap());
    }

    /**
     * Same as {@link #createJob(String, String, Map, String)} using the tenant id
     * {@code "<default>"}.
     */
    public Record<JobRecordValue> createJob(String type, String processId, Map<String, Object> variables) {
        return this.createJob(type, processId, variables, "<default>");
    }

    /**
     * Deploys a process {@code start -> service task -> end} whose task has the given job type,
     * creates an instance of it and returns the first {@code CREATED} job record of that type and
     * tenant that belongs to the new instance.
     *
     * @param type job type of the service task
     * @param processId BPMN process id; also used as the resource name ({@code processId + ".bpmn"})
     * @param variables variables the process instance is created with
     * @param tenantId tenant used for the deployment, the instance and the record lookup
     * @return the matching job record
     */
    public Record<JobRecordValue> createJob(String type, String processId, Map<String, Object> variables, String tenantId) {
        this.deployment()
            .withXmlResource(
                processId + ".bpmn",
                Bpmn.createExecutableProcess(processId)
                    .startEvent("start")
                    .serviceTask("task", b -> b.zeebeJobType(type).done())
                    .endEvent("end")
                    .done())
            .withTenantId(tenantId)
            .deploy();
        long instanceKey = this.processInstance().ofBpmnProcessId(processId).withVariables(variables).withTenantId(tenantId).create();
        return RecordingExporter.jobRecords(JobIntent.CREATED)
            .withType(type)
            .withTenantId(tenantId)
            .filter(r -> r.getValue().getProcessInstanceKey() == instanceKey)
            .getFirst();
    }

    /** Writes the given records as one batch through the environment rule. */
    public void writeRecords(RecordToWrite ... records) {
        this.environmentRule.writeBatch(records);
    }

    /** Returns the command response writer of the environment rule. */
    public CommandResponseWriter getCommandResponseWriter() {
        return this.environmentRule.getCommandResponseWriter();
    }

    /** Asks the environment rule to pause processing on the given partition. */
    public void pauseProcessing(int partitionId) {
        this.environmentRule.pauseProcessing(partitionId);
    }

    /** Asks the environment rule to resume processing on the given partition. */
    public void resumeProcessing(int partitionId) {
        this.environmentRule.resumeProcessing(partitionId);
    }

    /**
     * Dumps the processing state: for every column family, a map from the key bytes (rendered
     * with {@link Arrays#toString(byte[])}) to the value converted from MsgPack to JSON.
     */
    public Map<ZbColumnFamilies, Map<Object, Object>> collectState() {
        VersatileBlob keyInstance = new VersatileBlob();
        VersatileBlob valueInstance = new VersatileBlob();
        return Arrays.stream(ZbColumnFamilies.values()).collect(Collectors.toMap(Function.identity(), columnFamily -> {
            Map<Object, Object> entries = new HashMap<>();
            // The blobs are passed with their concrete type so the callback can call
            // getDirectBuffer(), which is declared on VersatileBlob.
            ((ProcessingDbState)this.getProcessingState()).forEach(
                columnFamily,
                keyInstance,
                valueInstance,
                (key, value) ->
                    entries.put(
                        // Clone the key buffer before reading its backing array.
                        Arrays.toString(BufferUtil.cloneBuffer(key.getDirectBuffer()).byteArray()),
                        MsgPackConverter.convertToJson(value.getDirectBuffer())));
            return entries;
        }));
    }

    /**
     * Blocks until the last processed position is greater than or equal to the position of the
     * given record.
     */
    public void awaitProcessingOf(Record<?> record) {
        long recordPosition = record.getPosition();
        Awaitility.await(String.format("Await the %s.%s to be processed at position %d", record.getValueType(), record.getIntent(), recordPosition))
            .untilAsserted(() ->
                Assertions.assertThat(this.getLastProcessedPosition())
                    .describedAs("Last process position should be greater or equal to " + recordPosition)
                    .isGreaterThanOrEqualTo(recordPosition));
    }

    /** Returns whether the first partition's stream processor reports having reached the log end. */
    public boolean hasReachedEnd() {
        return this.getStreamProcessor(PARTITION_ID).hasProcessingReachedTheEnd().join();
    }

    /**
     * Forwards the maximum number of commands per batch to the environment rule.
     *
     * @return this rule, for chaining
     */
    public EngineRule maxCommandsInBatch(int maxCommandsInBatch) {
        this.environmentRule.maxCommandsInBatch(maxCommandsInBatch);
        return this;
    }

    /**
     * Registers the interceptor on the inter-partition command sender of every partition.
     *
     * @throws IllegalStateException if the processors have not been started yet
     */
    public void interceptInterPartitionCommands(TestInterPartitionCommandSender.CommandInterceptor interceptor) {
        if (this.interPartitionCommandSenders == null) {
            throw new IllegalStateException("Cannot intercept inter-partition commands before the engine is started");
        }
        this.interPartitionCommandSenders.forEach(sender -> sender.intercept(interceptor));
    }

    /**
     * Key/value placeholder that simply wraps whatever bytes it is given, so that entries of any
     * column family can be read without knowing their concrete key or value type.
     */
    private static final class VersatileBlob
    implements DbKey,
    DbValue {
        private final DirectBuffer genericBuffer = new UnsafeBuffer(0L, 0);

        private VersatileBlob() {
        }

        @Override
        public void wrap(DirectBuffer buffer, int offset, int length) {
            this.genericBuffer.wrap(buffer, offset, length);
        }

        @Override
        public int getLength() {
            return this.genericBuffer.capacity();
        }

        @Override
        public void write(MutableDirectBuffer buffer, int offset) {
            buffer.putBytes(offset, this.genericBuffer, 0, this.genericBuffer.capacity());
        }

        /** Returns the buffer view over the most recently wrapped bytes. */
        public DirectBuffer getDirectBuffer() {
            return this.genericBuffer;
        }
    }
}

