/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.DbKey;
import io.camunda.zeebe.db.DbValue;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributor;
import io.camunda.zeebe.engine.processing.message.command.PartitionCommandSender;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandMessageHandler;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.ListLogStorage;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.engine.util.client.DeploymentClient;
import io.camunda.zeebe.engine.util.client.IncidentClient;
import io.camunda.zeebe.engine.util.client.JobActivationClient;
import io.camunda.zeebe.engine.util.client.JobClient;
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.engine.util.client.VariableClient;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.util.buffer.BufferReader;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public final class EngineRule
extends ExternalResource {
    private static final int PARTITION_ID = 1;
    private static final int REPROCESSING_TIMEOUT_SEC = 30;
    private static final RecordingExporter RECORDING_EXPORTER = new RecordingExporter();
    private final StreamProcessorRule environmentRule;
    private final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private final int partitionCount;
    private Consumer<String> jobsAvailableCallback = type -> {};
    private Consumer<TypedRecord> onProcessedCallback = record -> {};
    private Consumer<LoggedEvent> onSkippedCallback = record -> {};
    private DeploymentDistributor deploymentDistributor = new DeploymentDistributionImpl();
    private final Int2ObjectHashMap<SubscriptionCommandMessageHandler> subscriptionHandlers = new Int2ObjectHashMap();
    private ExecutorService subscriptionHandlerExecutor;
    private final Map<Integer, ReprocessingCompletedListener> partitionReprocessingCompleteListeners = new Int2ObjectHashMap();
    private long lastProcessedPosition = -1L;

    private EngineRule(int partitionCount) {
        this(partitionCount, null);
    }

    private EngineRule(int partitionCount, ListLogStorage sharedStorage) {
        this.partitionCount = partitionCount;
        this.environmentRule = new StreamProcessorRule(1, partitionCount, DefaultZeebeDbFactory.defaultFactory(), sharedStorage);
    }

    public static EngineRule singlePartition() {
        return new EngineRule(1);
    }

    public static EngineRule multiplePartition(int partitionCount) {
        return new EngineRule(partitionCount);
    }

    public static EngineRule withSharedStorage(ListLogStorage listLogStorage) {
        return new EngineRule(1, listLogStorage);
    }

    public Statement apply(Statement base, Description description) {
        Statement statement = this.recordingExporterTestWatcher.apply(base, description);
        statement = super.apply(statement, description);
        return this.environmentRule.apply(statement, description);
    }

    protected void before() {
        this.subscriptionHandlerExecutor = Executors.newSingleThreadExecutor();
        this.startProcessors();
    }

    protected void after() {
        this.subscriptionHandlerExecutor.shutdown();
        this.subscriptionHandlers.clear();
        this.partitionReprocessingCompleteListeners.clear();
    }

    public void start() {
        this.startProcessors();
    }

    public void stop() {
        this.forEachPartition(this.environmentRule::closeStreamProcessor);
    }

    public EngineRule withJobsAvailableCallback(Consumer<String> callback) {
        this.jobsAvailableCallback = callback;
        return this;
    }

    public EngineRule withDeploymentDistributor(DeploymentDistributor deploymentDistributor) {
        this.deploymentDistributor = deploymentDistributor;
        return this;
    }

    public EngineRule withOnProcessedCallback(Consumer<TypedRecord> onProcessedCallback) {
        this.onProcessedCallback = this.onProcessedCallback.andThen(onProcessedCallback);
        return this;
    }

    public EngineRule withOnSkippedCallback(Consumer<LoggedEvent> onSkippedCallback) {
        this.onSkippedCallback = this.onSkippedCallback.andThen(onSkippedCallback);
        return this;
    }

    public EngineRule withStreamProcessorMode(StreamProcessorMode streamProcessorMode) {
        this.environmentRule.withStreamProcessorMode(streamProcessorMode);
        return this;
    }

    private void startProcessors() {
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        UnsafeBuffer deploymentBuffer = new UnsafeBuffer(new byte[deploymentRecord.getLength()]);
        deploymentRecord.write((MutableDirectBuffer)deploymentBuffer, 0);
        this.forEachPartition(partitionId -> {
            ReprocessingCompletedListener reprocessingCompletedListener = new ReprocessingCompletedListener();
            this.partitionReprocessingCompleteListeners.put((Integer)partitionId, reprocessingCompletedListener);
            this.environmentRule.startTypedStreamProcessor((int)partitionId, processingContext -> EngineProcessors.createEngineProcessors((ProcessingContext)processingContext.listener(new StreamProcessorListener(){

                public void onProcessed(TypedRecord<?> processedCommand) {
                    EngineRule.this.lastProcessedPosition = processedCommand.getPosition();
                    EngineRule.this.onProcessedCallback.accept(processedCommand);
                }

                public void onSkipped(LoggedEvent skippedRecord) {
                    EngineRule.this.lastProcessedPosition = skippedRecord.getPosition();
                    EngineRule.this.onSkippedCallback.accept(skippedRecord);
                }
            }), (int)this.partitionCount, (SubscriptionCommandSender)new SubscriptionCommandSender(partitionId.intValue(), (PartitionCommandSender)new PartitionCommandSenderImpl()), (DeploymentDistributor)this.deploymentDistributor, (key, partition) -> {}, this.jobsAvailableCallback).withListener((StreamProcessorLifecycleAware)new ProcessingExporterTransistor()).withListener((StreamProcessorLifecycleAware)reprocessingCompletedListener));
            this.subscriptionHandlers.put(partitionId, (Object)new SubscriptionCommandMessageHandler(this.subscriptionHandlerExecutor::submit, this.environmentRule::getLogStreamRecordWriter));
        });
    }

    public void awaitReprocessingCompleted() {
        this.partitionReprocessingCompleteListeners.values().forEach(ReprocessingCompletedListener::awaitReprocessingComplete);
    }

    public void forEachPartition(Consumer<Integer> partitionIdConsumer) {
        int partitionId = 1;
        for (int i = 0; i < this.partitionCount; ++i) {
            partitionIdConsumer.accept(partitionId++);
        }
    }

    public void increaseTime(Duration duration) {
        this.environmentRule.getClock().addTime(duration);
    }

    public void reprocess() {
        this.forEachPartition(partitionId -> {
            try {
                this.environmentRule.closeStreamProcessor((int)partitionId);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        int lastSize = RecordingExporter.getRecords().size();
        RecordingExporter.reset();
        this.startProcessors();
        TestUtil.waitUntil(() -> RecordingExporter.getRecords().size() >= lastSize, (String)"Failed to reprocess all events, only re-exported %d but expected %d", (Object[])new Object[]{RecordingExporter.getRecords().size(), lastSize});
    }

    public List<Integer> getPartitionIds() {
        return IntStream.range(1, 1 + this.partitionCount).boxed().collect(Collectors.toList());
    }

    public ControlledActorClock getClock() {
        return this.environmentRule.getClock();
    }

    public MutableZeebeState getZeebeState() {
        return this.environmentRule.getZeebeState();
    }

    public StreamProcessor getStreamProcessor(int partitionId) {
        return this.environmentRule.getStreamProcessor(partitionId);
    }

    public long getLastWrittenPosition(int partitionId) {
        return this.environmentRule.getLastWrittenPosition(partitionId);
    }

    public long getLastProcessedPosition() {
        return this.lastProcessedPosition;
    }

    public DeploymentClient deployment() {
        return new DeploymentClient(this.environmentRule, this::forEachPartition);
    }

    public ProcessInstanceClient processInstance() {
        return new ProcessInstanceClient(this.environmentRule);
    }

    public PublishMessageClient message() {
        return new PublishMessageClient(this.environmentRule, this.partitionCount);
    }

    public VariableClient variables() {
        return new VariableClient(this.environmentRule);
    }

    public JobActivationClient jobs() {
        return new JobActivationClient(this.environmentRule);
    }

    public JobClient job() {
        return new JobClient(this.environmentRule);
    }

    public IncidentClient incident() {
        return new IncidentClient(this.environmentRule);
    }

    public Record<JobRecordValue> createJob(String type, String processId) {
        this.deployment().withXmlResource(processId, Bpmn.createExecutableProcess((String)processId).startEvent("start").serviceTask("task", b -> ((ServiceTaskBuilder)b.zeebeJobType(type)).done()).endEvent("end").done()).deploy();
        long instanceKey = this.processInstance().ofBpmnProcessId(processId).create();
        return (Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(type).filter(r -> ((JobRecordValue)r.getValue()).getProcessInstanceKey() == instanceKey)).getFirst();
    }

    public void writeRecords(RecordToWrite ... records) {
        this.environmentRule.writeBatch(records);
    }

    public CommandResponseWriter getCommandResponseWriter() {
        return this.environmentRule.getCommandResponseWriter();
    }

    public void pauseProcessing(int partitionId) {
        this.environmentRule.pauseProcessing(partitionId);
    }

    public void resumeProcessing(int partitionId) {
        this.environmentRule.resumeProcessing(partitionId);
    }

    public Map<ZbColumnFamilies, Map<Object, Object>> collectState() {
        VersatileBlob keyInstance = new VersatileBlob();
        VersatileBlob valueInstance = new VersatileBlob();
        return Arrays.stream(ZbColumnFamilies.values()).collect(Collectors.toMap(Function.identity(), columnFamily -> {
            HashMap entries = new HashMap();
            ((ZeebeDbState)this.getZeebeState()).forEach(columnFamily, (DbKey)keyInstance, (DbValue)valueInstance, (key, value) -> entries.put(Arrays.toString(BufferUtil.cloneBuffer((DirectBuffer)key.getDirectBuffer()).byteArray()), MsgPackConverter.convertToJson((DirectBuffer)value.getDirectBuffer())));
            return entries;
        }));
    }

    public void awaitProcessingOf(Record<?> record) {
        long recordPosition = record.getPosition();
        Awaitility.await((String)String.format("Await the %s.%s to be processed at position %d", record.getValueType(), record.getIntent(), recordPosition)).untilAsserted(() -> ((AbstractLongAssert)Assertions.assertThat((long)this.getLastProcessedPosition()).describedAs("Last process position should be greater or equal to " + recordPosition, new Object[0])).isGreaterThanOrEqualTo(recordPosition));
    }

    private class PartitionCommandSenderImpl
    implements PartitionCommandSender {
        private PartitionCommandSenderImpl() {
        }

        public boolean sendCommand(int receiverPartitionId, BufferWriter command) {
            byte[] bytes = new byte[command.getLength()];
            UnsafeBuffer commandBuffer = new UnsafeBuffer(bytes);
            command.write((MutableDirectBuffer)commandBuffer, 0);
            ((SubscriptionCommandMessageHandler)EngineRule.this.subscriptionHandlers.get(receiverPartitionId)).apply(bytes);
            return true;
        }
    }

    private final class DeploymentDistributionImpl
    implements DeploymentDistributor {
        private DeploymentDistributionImpl() {
        }

        public ActorFuture<Void> pushDeploymentToPartition(long key, int partitionId, DirectBuffer deploymentBuffer) {
            DeploymentRecord deploymentRecord = new DeploymentRecord();
            deploymentRecord.wrap(deploymentBuffer);
            new Thread(() -> EngineRule.this.environmentRule.writeCommandOnPartition(partitionId, key, (Intent)DeploymentIntent.DISTRIBUTE, (UnpackedObject)deploymentRecord)).start();
            return CompletableActorFuture.completed(null);
        }
    }

    private final class ReprocessingCompletedListener
    implements StreamProcessorLifecycleAware {
        private final ActorFuture<Void> reprocessingComplete = new CompletableActorFuture();

        private ReprocessingCompletedListener() {
        }

        public void onRecovered(ReadonlyProcessingContext context) {
            this.reprocessingComplete.complete(null);
        }

        public void awaitReprocessingComplete() {
            this.reprocessingComplete.join(30L, TimeUnit.SECONDS);
        }
    }

    private static class ProcessingExporterTransistor
    implements StreamProcessorLifecycleAware {
        private final RecordValues recordValues = new RecordValues();
        private final RecordMetadata metadata = new RecordMetadata();
        private LogStreamReader logStreamReader;
        private TypedEventImpl typedEvent;

        private ProcessingExporterTransistor() {
        }

        public void onRecovered(ReadonlyProcessingContext context) {
            int partitionId = context.getLogStream().getPartitionId();
            this.typedEvent = new TypedEventImpl(partitionId);
            ActorControl actor = context.getActor();
            LogStream logStream = context.getLogStream();
            logStream.registerRecordAvailableListener(() -> actor.run(this::onNewEventCommitted));
            logStream.newLogStreamReader().onComplete((reader, throwable) -> {
                if (throwable == null) {
                    this.logStreamReader = reader;
                    this.onNewEventCommitted();
                }
            });
        }

        private void onNewEventCommitted() {
            if (this.logStreamReader == null) {
                return;
            }
            while (this.logStreamReader.hasNext()) {
                LoggedEvent rawEvent = (LoggedEvent)this.logStreamReader.next();
                this.metadata.reset();
                rawEvent.readMetadata((BufferReader)this.metadata);
                UnifiedRecordValue recordValue = this.recordValues.readRecordValue(rawEvent, this.metadata.getValueType());
                this.typedEvent.wrap(rawEvent, this.metadata, recordValue);
                RECORDING_EXPORTER.export((Record)this.typedEvent);
            }
        }
    }

    private static final class VersatileBlob
    implements DbKey,
    DbValue {
        private final DirectBuffer genericBuffer = new UnsafeBuffer(0L, 0);

        private VersatileBlob() {
        }

        public void wrap(DirectBuffer buffer, int offset, int length) {
            this.genericBuffer.wrap(buffer, offset, length);
        }

        public int getLength() {
            return this.genericBuffer.capacity();
        }

        public void write(MutableDirectBuffer buffer, int offset) {
            buffer.putBytes(offset, this.genericBuffer, 0, this.genericBuffer.capacity());
        }

        public DirectBuffer getDirectBuffer() {
            return this.genericBuffer;
        }
    }
}

