package io.zeebe.broker.util;

import io.zeebe.broker.exporter.stream.ExporterRecord;
import io.zeebe.broker.incident.data.IncidentRecord;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.state.StateStorageFactory;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.topic.Records;
import io.zeebe.broker.topic.StreamProcessorControl;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.impl.service.StreamProcessorService;
import io.zeebe.logstreams.impl.snapshot.fs.FsSnapshotController;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.logstreams.spi.SnapshotController;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.logstreams.state.StateController;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.state.StateStorage;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.raft.event.RaftConfigurationEvent;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.LangUtil;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.ActorScheduler;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/* loaded from: input_file:io/zeebe/broker/util/TestStreams.class */
public class TestStreams {
    protected static final Map<Class<?>, ValueType> VALUE_TYPES = new HashMap();
    protected final File storageDirectory;
    protected final AutoCloseableRule closeables;
    private final ServiceContainer serviceContainer;
    protected Map<String, LogStream> managedLogs = new HashMap();
    protected ActorScheduler actorScheduler;
    protected StateStorageFactory stateStorageFactory;
    protected SnapshotStorage snapshotStorage;
    protected StateStorage stateStorage;

    /* loaded from: input_file:io/zeebe/broker/util/TestStreams$FluentLogWriter.class */
    public static class FluentLogWriter {
        protected UnpackedObject value;
        protected LogStream logStream;
        protected RecordMetadata metadata = new RecordMetadata();
        protected long key = -1;

        public FluentLogWriter(LogStream logStream) {
            this.logStream = logStream;
            this.metadata.protocolVersion(1);
        }

        public FluentLogWriter metadata(Consumer<RecordMetadata> consumer) {
            consumer.accept(this.metadata);
            return this;
        }

        public FluentLogWriter intent(Intent intent) {
            this.metadata.intent(intent);
            return this;
        }

        public FluentLogWriter recordType(RecordType recordType) {
            this.metadata.recordType(recordType);
            return this;
        }

        public FluentLogWriter key(long j) {
            this.key = j;
            return this;
        }

        public FluentLogWriter event(UnpackedObject unpackedObject) {
            ValueType valueType = TestStreams.VALUE_TYPES.get(unpackedObject.getClass());
            if (valueType == null) {
                throw new RuntimeException("No event type registered for value " + unpackedObject.getClass());
            }
            this.metadata.valueType(valueType);
            this.value = unpackedObject;
            return this;
        }

        public long write() {
            LogStreamWriterImpl logStreamWriterImpl = new LogStreamWriterImpl(this.logStream);
            if (this.key >= 0) {
                logStreamWriterImpl.key(this.key);
            } else {
                logStreamWriterImpl.positionAsKey();
            }
            logStreamWriterImpl.metadataWriter(this.metadata);
            logStreamWriterImpl.valueWriter(this.value);
            return ((Long) TestUtil.doRepeatedly(() -> {
                return Long.valueOf(logStreamWriterImpl.tryWrite());
            }).until(l -> {
                return Boolean.valueOf(l.longValue() >= 0);
            })).longValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/broker/util/TestStreams$StreamProcessorControlImpl.class */
    public class StreamProcessorControlImpl implements StreamProcessorControl, AutoCloseable {
        private final Supplier<StreamProcessor> factory;
        private final int streamProcessorId;
        private final LogStream stream;
        protected SuspendableStreamProcessor currentStreamProcessor;
        protected StreamProcessorController currentController;
        protected StreamProcessorService currentStreamProcessorService;
        protected SnapshotController currentSnapshotController;

        public StreamProcessorControlImpl(LogStream logStream, Supplier<StreamProcessor> supplier, int i) {
            this.stream = logStream;
            this.factory = supplier;
            this.streamProcessorId = i;
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void purgeSnapshot() {
            try {
                this.currentSnapshotController.purgeAll();
            } catch (Exception e) {
                LangUtil.rethrowUnchecked(e);
            }
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void unblock() {
            this.currentStreamProcessor.resume();
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public boolean isBlocked() {
            return this.currentController.isSuspended();
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterEvent(Predicate<LoggedEvent> predicate) {
            ensureStreamProcessorBuilt();
            this.currentStreamProcessor.blockAfterEvent(predicate);
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterJobEvent(Predicate<TypedRecord<JobRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isJobRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, JobRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterDeploymentEvent(Predicate<TypedRecord<DeploymentRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isDeploymentRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, DeploymentRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterWorkflowInstanceRecord(Predicate<TypedRecord<WorkflowInstanceRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isWorkflowInstanceRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, WorkflowInstanceRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterIncidentEvent(Predicate<TypedRecord<IncidentRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isIncidentRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, IncidentRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterMessageEvent(Predicate<TypedRecord<MessageRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isMessageRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, MessageRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterMessageSubscriptionEvent(Predicate<TypedRecord<MessageSubscriptionRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isMessageSubscriptionRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, MessageSubscriptionRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void blockAfterWorkflowInstanceSubscriptionEvent(Predicate<TypedRecord<WorkflowInstanceSubscriptionRecord>> predicate) {
            blockAfterEvent(loggedEvent -> {
                return Records.isWorkflowInstanceSubscriptionRecord(loggedEvent) && predicate.test(CopiedTypedEvent.toTypedEvent(loggedEvent, WorkflowInstanceSubscriptionRecord.class));
            });
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl, java.lang.AutoCloseable
        public void close() {
            if (this.currentController != null && this.currentController.isOpened()) {
                this.currentStreamProcessorService.close();
            }
            this.currentStreamProcessorService = null;
            this.currentController = null;
            this.currentStreamProcessor = null;
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void start() {
            this.currentStreamProcessorService = buildStreamProcessorController();
            this.currentController = this.currentStreamProcessorService.getController();
        }

        @Override // io.zeebe.broker.topic.StreamProcessorControl
        public void restart() {
            close();
            start();
        }

        private void ensureStreamProcessorBuilt() {
            if (this.currentStreamProcessor == null) {
                this.currentStreamProcessor = new SuspendableStreamProcessor(this.factory.get());
            }
        }

        private StreamProcessorService buildStreamProcessorController() {
            ensureStreamProcessorBuilt();
            String simpleName = this.currentStreamProcessor.wrappedProcessor.getClass().getSimpleName();
            if (this.currentStreamProcessor.getStateController() != null) {
                this.currentSnapshotController = new StateSnapshotController(this.currentStreamProcessor.getStateController(), TestStreams.this.getStateStorageFactory().create(this.streamProcessorId, simpleName));
            } else {
                this.currentSnapshotController = new FsSnapshotController(TestStreams.this.getSnapshotStorage(), simpleName, this.currentStreamProcessor.getStateResource());
            }
            return (StreamProcessorService) LogStreams.createStreamProcessor(simpleName, this.streamProcessorId, this.currentStreamProcessor).logStream(this.stream).snapshotController(this.currentSnapshotController).actorScheduler(TestStreams.this.actorScheduler).serviceContainer(TestStreams.this.serviceContainer).build().join();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/util/TestStreams$SuspendableStreamProcessor.class */
    public static class SuspendableStreamProcessor implements StreamProcessor {
        protected final StreamProcessor wrappedProcessor;
        protected AtomicReference<Predicate<LoggedEvent>> blockAfterCondition = new AtomicReference<>(null);
        protected boolean blockAfterCurrentEvent;
        private StreamProcessorContext context;

        public SuspendableStreamProcessor(StreamProcessor streamProcessor) {
            this.wrappedProcessor = streamProcessor;
        }

        public SnapshotSupport getStateResource() {
            return this.wrappedProcessor.getStateResource();
        }

        public StateController getStateController() {
            return this.wrappedProcessor.getStateController();
        }

        public void resume() {
            this.context.getActorControl().call(() -> {
                this.context.resumeController();
            });
        }

        public void blockAfterEvent(Predicate<LoggedEvent> predicate) {
            this.blockAfterCondition.set(predicate);
        }

        public EventProcessor onEvent(LoggedEvent loggedEvent) {
            Predicate<LoggedEvent> predicate = this.blockAfterCondition.get();
            this.blockAfterCurrentEvent = predicate != null && predicate.test(loggedEvent);
            final EventProcessor onEvent = this.wrappedProcessor.onEvent(loggedEvent);
            return new EventProcessor() { // from class: io.zeebe.broker.util.TestStreams.SuspendableStreamProcessor.1
                public void processEvent() {
                    if (onEvent != null) {
                        onEvent.processEvent();
                    }
                }

                public boolean executeSideEffects() {
                    if (onEvent != null) {
                        return onEvent.executeSideEffects();
                    }
                    return true;
                }

                public long writeEvent(LogStreamRecordWriter logStreamRecordWriter) {
                    if (onEvent != null) {
                        return onEvent.writeEvent(logStreamRecordWriter);
                    }
                    return 0L;
                }

                public void updateState() {
                    if (onEvent != null) {
                        onEvent.updateState();
                    }
                    if (SuspendableStreamProcessor.this.blockAfterCurrentEvent) {
                        SuspendableStreamProcessor.this.blockAfterCurrentEvent = false;
                        SuspendableStreamProcessor.this.context.suspendController();
                    }
                }
            };
        }

        public void onOpen(StreamProcessorContext streamProcessorContext) {
            this.context = streamProcessorContext;
            this.wrappedProcessor.onOpen(this.context);
        }

        public void onRecovered() {
            this.wrappedProcessor.onRecovered();
        }

        public void onClose() {
            this.wrappedProcessor.onClose();
        }
    }

    public TestStreams(File file, AutoCloseableRule autoCloseableRule, ServiceContainer serviceContainer, ActorScheduler actorScheduler) {
        this.storageDirectory = file;
        this.closeables = autoCloseableRule;
        this.serviceContainer = serviceContainer;
        this.actorScheduler = actorScheduler;
    }

    public LogStream createLogStream(String str) {
        return createLogStream(str, 0);
    }

    public LogStream createLogStream(String str, int i) {
        final LogStream logStream = (LogStream) LogStreams.createFsLogStream(i).logRootPath(this.storageDirectory.getAbsolutePath()).serviceContainer(this.serviceContainer).logName(str).deleteOnClose(true).build().join();
        this.actorScheduler.submitActor(new Actor() { // from class: io.zeebe.broker.util.TestStreams.1
            protected void onActorStarting() {
                ActorControl actorControl = this.actor;
                LogStream logStream2 = logStream;
                logStream.registerOnAppendCondition(actorControl.onCondition("on-append", () -> {
                    logStream2.setCommitPosition(Long.MAX_VALUE);
                }));
            }
        }).join();
        logStream.openAppender().join();
        this.managedLogs.put(str, logStream);
        this.closeables.manage(logStream);
        return logStream;
    }

    public LogStream getLogStream(String str) {
        return this.managedLogs.get(str);
    }

    public void truncate(String str, long j) {
        LogStream logStream = getLogStream(str);
        try {
            BufferedLogStreamReader bufferedLogStreamReader = new BufferedLogStreamReader(logStream);
            Throwable th = null;
            try {
                try {
                    logStream.closeAppender().get();
                    bufferedLogStreamReader.seek(j + 1);
                    logStream.setCommitPosition(j);
                    if (bufferedLogStreamReader.hasNext()) {
                        logStream.truncate(((LoggedEvent) bufferedLogStreamReader.next()).getPosition());
                    }
                    logStream.setCommitPosition(Long.MAX_VALUE);
                    logStream.openAppender().get();
                    if (bufferedLogStreamReader != null) {
                        if (0 != 0) {
                            try {
                                bufferedLogStreamReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            bufferedLogStreamReader.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not truncate log stream " + str, e);
        }
    }

    public Stream<LoggedEvent> events(String str) {
        BufferedLogStreamReader bufferedLogStreamReader = new BufferedLogStreamReader(this.managedLogs.get(str));
        this.closeables.manage(bufferedLogStreamReader);
        bufferedLogStreamReader.seekToFirstEvent();
        Iterable iterable = () -> {
            return bufferedLogStreamReader;
        };
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    public FluentLogWriter newRecord(String str) {
        return new FluentLogWriter(getLogStream(str));
    }

    protected SnapshotStorage getSnapshotStorage() {
        if (this.snapshotStorage == null) {
            this.snapshotStorage = LogStreams.createFsSnapshotStore(this.storageDirectory.getAbsolutePath()).build();
        }
        return this.snapshotStorage;
    }

    protected StateStorageFactory getStateStorageFactory() {
        if (this.stateStorageFactory == null) {
            File file = new File(this.storageDirectory, "state");
            if (!file.exists()) {
                file.mkdir();
            }
            this.stateStorageFactory = new StateStorageFactory(file);
        }
        return this.stateStorageFactory;
    }

    public StreamProcessorControl initStreamProcessor(String str, StreamProcessor streamProcessor) {
        return initStreamProcessor(str, 0, () -> {
            return streamProcessor;
        });
    }

    public StreamProcessorControl initStreamProcessor(String str, int i, Supplier<StreamProcessor> supplier) {
        StreamProcessorControlImpl streamProcessorControlImpl = new StreamProcessorControlImpl(getLogStream(str), supplier, i);
        this.closeables.manage(streamProcessorControlImpl);
        return streamProcessorControlImpl;
    }

    static {
        VALUE_TYPES.put(DeploymentRecord.class, ValueType.DEPLOYMENT);
        VALUE_TYPES.put(IncidentRecord.class, ValueType.INCIDENT);
        VALUE_TYPES.put(JobRecord.class, ValueType.JOB);
        VALUE_TYPES.put(WorkflowInstanceRecord.class, ValueType.WORKFLOW_INSTANCE);
        VALUE_TYPES.put(MessageRecord.class, ValueType.MESSAGE);
        VALUE_TYPES.put(MessageSubscriptionRecord.class, ValueType.MESSAGE_SUBSCRIPTION);
        VALUE_TYPES.put(WorkflowInstanceSubscriptionRecord.class, ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION);
        VALUE_TYPES.put(ExporterRecord.class, ValueType.EXPORTER);
        VALUE_TYPES.put(RaftConfigurationEvent.class, ValueType.RAFT);
        VALUE_TYPES.put(JobBatchRecord.class, ValueType.JOB_BATCH);
        VALUE_TYPES.put(UnpackedObject.class, ValueType.NOOP);
    }
}
