/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.ListLogStorage;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.logstreams.impl.log.LogStorageAppender;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.util.SyncLogStream;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.CopiedRecord;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.Loggers;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.DirectBuffer;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public final class TestStreams {
    /** Name of the directory, resolved next to a log's runtime directory, that holds database snapshots. */
    private static final String SNAPSHOT_FOLDER = "snapshot";
    /** Lookup from record value class to its {@link ValueType}; filled once by the static initializer. */
    private static final Map<Class<?>, ValueType> VALUE_TYPES = new HashMap<>();
    /** Root folder under which per-log state directories are created. */
    private final TemporaryFolder dataDirectory;
    /** Collects everything this harness opens so it can be closed later. */
    private final AutoCloseableRule closeables;
    private final ActorScheduler actorScheduler;
    /** Mock handed to every stream processor built by this harness. */
    private final CommandResponseWriter mockCommandResponseWriter;
    /** Mock listener handed to every stream processor built by this harness. */
    private final StreamProcessorListener mockStreamProcessorListener;
    /** Log contexts keyed by log name. */
    private final Map<String, LogContext> logContextMap = new HashMap<>();
    /** Stream processor contexts keyed by log name. */
    private final Map<String, ProcessorContext> streamContextMap = new HashMap<>();
    /** Set by {@code snapshot}; makes later stream processors open their database from the snapshot directory. */
    private boolean snapshotWasTaken = false;
    private Function<MutableZeebeState, EventApplier> eventApplierFactory = EventAppliers::new;
    private StreamProcessorMode streamProcessorMode = StreamProcessorMode.PROCESSING;

    public TestStreams(TemporaryFolder dataDirectory, AutoCloseableRule closeables, ActorScheduler actorScheduler) {
        this.dataDirectory = dataDirectory;
        this.closeables = closeables;
        this.actorScheduler = actorScheduler;
        this.mockCommandResponseWriter = (CommandResponseWriter)Mockito.mock(CommandResponseWriter.class);
        Mockito.when((Object)this.mockCommandResponseWriter.intent((Intent)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.key(ArgumentMatchers.anyLong())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.partitionId(ArgumentMatchers.anyInt())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.recordType((RecordType)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.rejectionType((RejectionType)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.rejectionReason((DirectBuffer)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.valueType((ValueType)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.valueWriter((BufferWriter)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        this.mockStreamProcessorListener = (StreamProcessorListener)Mockito.mock(StreamProcessorListener.class);
    }

    /**
     * Replaces the factory used to create the event applier of stream processors built afterwards.
     *
     * @param eventApplierFactory factory that receives the mutable state and returns the applier
     */
    public void withEventApplierFactory(Function<MutableZeebeState, EventApplier> eventApplierFactory) {
        this.eventApplierFactory = eventApplierFactory;
    }

    /**
     * Sets the mode passed to stream processors built afterwards (the field defaults to {@code PROCESSING}).
     *
     * @param streamProcessorMode mode to hand to the stream processor builder
     */
    public void withStreamProcessorMode(StreamProcessorMode streamProcessorMode) {
        this.streamProcessorMode = streamProcessorMode;
    }

    /**
     * @return the mocked response writer shared by all stream processors of this harness
     */
    public CommandResponseWriter getMockedResponseWriter() {
        return this.mockCommandResponseWriter;
    }

    /**
     * @return the mocked listener shared by all stream processors of this harness
     */
    public StreamProcessorListener getMockStreamProcessorListener() {
        return this.mockStreamProcessorListener;
    }

    /**
     * Creates a log stream with the given name on partition 0.
     *
     * @param name log name, also used as the lookup key for the other methods of this class
     * @return the newly created log stream
     */
    public SynchronousLogStream createLogStream(String name) {
        return this.createLogStream(name, 0);
    }

    /**
     * Creates a log stream backed by its own, freshly created {@link ListLogStorage}.
     *
     * @param name log name, also used as the lookup key for the other methods of this class
     * @param partitionId partition id given to the log stream
     * @return the newly created log stream
     */
    public SynchronousLogStream createLogStream(String name, int partitionId) {
        // Same wiring as the shared-storage overload, just with a private storage instance.
        return this.createLogStream(name, partitionId, new ListLogStorage());
    }

    /**
     * Creates a log stream on top of an existing storage instance.
     *
     * <p>The storage's position listener is set to forward each position to the new stream's
     * last written position.
     *
     * @param name log name, also used as the lookup key for the other methods of this class
     * @param partitionId partition id given to the log stream
     * @param sharedStorage storage the log stream reads from and writes to
     * @return the newly created log stream
     */
    public SynchronousLogStream createLogStream(String name, int partitionId, ListLogStorage sharedStorage) {
        final Consumer<SyncLogStream> wirePositionListener =
            stream -> sharedStorage.setPositionListener(position -> stream.setLastWrittenPosition(position));
        return this.createLogStream(name, partitionId, sharedStorage, wirePositionListener);
    }

    /**
     * Builds the log stream, lets the caller post-configure it and registers it under its name.
     *
     * <p>Both the log context and the removal of its map entry are handed to {@code closeables}.
     *
     * @param name log name and registry key
     * @param partitionId partition id given to the log stream
     * @param logStorage storage backing the log stream
     * @param logStreamConsumer callback invoked with the built stream before it is registered
     * @return the newly created log stream
     */
    private SynchronousLogStream createLogStream(String name, int partitionId, LogStorage logStorage, Consumer<SyncLogStream> logStreamConsumer) {
        final SyncLogStream syncLogStream = SyncLogStream.builder()
            .withLogName(name)
            .withLogStorage(logStorage)
            .withPartitionId(partitionId)
            .withActorSchedulingService(this.actorScheduler)
            .build();
        logStreamConsumer.accept(syncLogStream);

        final LogContext context = LogContext.createLogContext(syncLogStream, logStorage);
        this.logContextMap.put(name, context);
        this.closeables.manage(context);
        this.closeables.manage(() -> this.logContextMap.remove(name));
        return syncLogStream;
    }

    /**
     * @param name name of a log created through this harness
     * @return the last written position reported by that log stream
     */
    public long getLastWrittenPosition(String name) {
        final SynchronousLogStream logStream = getLogStream(name);
        return logStream.getLastWrittenPosition();
    }

    /**
     * Looks up a log stream by name.
     *
     * @param name name of a log created through this harness; an unknown name fails with a
     *     {@link NullPointerException}
     * @return the registered log stream
     */
    public SynchronousLogStream getLogStream(String name) {
        final LogContext context = logContextMap.get(name);
        return context.getLogStream();
    }

    /**
     * Returns the record writer that was created together with the log context.
     *
     * @param name name of a log created through this harness
     * @return the writer held by that log's context (the same instance on every call)
     */
    public LogStreamRecordWriter getLogStreamRecordWriter(String name) {
        final LogContext context = logContextMap.get(name);
        return context.getLogStreamWriter();
    }

    /**
     * Asks the log stream for a new record writer.
     *
     * @param name name of a log created through this harness
     * @return the writer returned by the log stream for this call
     */
    public LogStreamRecordWriter newLogStreamRecordWriter(String name) {
        final LogContext context = logContextMap.get(name);
        return context.newLogStreamRecordWriter();
    }

    /**
     * Streams the events of a log, starting at its first event.
     *
     * <p>A new reader is opened per call and handed to {@code closeables}. The returned stream is
     * sequential and backed directly by that reader, so it can only be traversed once.
     *
     * @param logName name of a log created through this harness
     * @return a sequential stream over the reader's events
     */
    public Stream<LoggedEvent> events(String logName) {
        final LogStreamReader reader = this.getLogStream(logName).newLogStreamReader();
        this.closeables.manage(reader);
        reader.seekToFirstEvent();
        // Typed adapter: the reader itself is the iterator; a raw Iterable would produce a raw Stream.
        final Iterable<LoggedEvent> iterable = () -> reader;
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    /**
     * Starts a fluent record definition that will be written through the given writer.
     *
     * @param logStreamRecordWriter writer the record is eventually written with
     * @return a new fluent writer
     */
    public FluentLogWriter newRecord(LogStreamRecordWriter logStreamRecordWriter) {
        return new FluentLogWriter(logStreamRecordWriter);
    }

    /**
     * Starts a fluent record definition for the named log, using a newly requested record writer.
     *
     * @param logName name of a log created through this harness
     * @return a new fluent writer
     */
    public FluentLogWriter newRecord(String logName) {
        final LogStreamRecordWriter recordWriter = newLogStreamRecordWriter(logName);
        return new FluentLogWriter(recordWriter);
    }

    /**
     * Ensures {@code <dataDirectory>/<logName>/state} exists and returns the {@code runtime} path
     * below it. The {@code runtime} directory itself is not created here.
     *
     * @param stream log stream whose name selects the directory
     * @return path of the runtime directory for that log
     * @throws UncheckedIOException if the state directory cannot be created
     */
    public Path createRuntimeFolder(SynchronousLogStream stream) {
        final Path stateDirectory = this.dataDirectory.getRoot().toPath()
            .resolve(stream.getLogName())
            .resolve("state");
        try {
            Files.createDirectories(stateDirectory);
        } catch (FileAlreadyExistsException ignored) {
            // Deliberately ignored: an already existing path is treated as good enough here.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stateDirectory.resolve("runtime");
    }

    /**
     * Builds and opens a stream processor for the named log and waits for its recovery callback.
     *
     * @param log name of a log created through this harness
     * @param zeebeDbFactory factory used to open the database
     * @param typedRecordProcessorFactory factory for the record processors
     * @return the opened stream processor
     */
    public StreamProcessor startStreamProcessor(String log, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory) {
        return buildStreamProcessor(getLogStream(log), zeebeDbFactory, typedRecordProcessorFactory, true);
    }

    /**
     * Builds and opens a stream processor for the named log without waiting for its recovery
     * callback; the open future itself is still joined.
     *
     * @param log name of a log created through this harness
     * @param zeebeDbFactory factory used to open the database
     * @param typedRecordProcessorFactory factory for the record processors
     * @return the opened stream processor
     */
    public StreamProcessor startStreamProcessorNotAwaitOpening(String log, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory) {
        return buildStreamProcessor(getLogStream(log), zeebeDbFactory, typedRecordProcessorFactory, false);
    }

    /**
     * Builds a stream processor on the given log stream, opens it and registers it under the log name.
     *
     * <p>The database is opened from the snapshot directory once any snapshot has been taken through
     * this harness, otherwise from the runtime directory. The caller's processor factory is wrapped
     * so that a lifecycle listener can signal when recovery has finished.
     *
     * @param stream log stream to process
     * @param zeebeDbFactory factory used to open the database
     * @param factory factory for the record processors
     * @param awaitOpening whether to wait (up to 15 seconds) for the recovery callback
     * @return the opened stream processor
     */
    private StreamProcessor buildStreamProcessor(SynchronousLogStream stream, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory factory, boolean awaitOpening) {
        Path storage = this.createRuntimeFolder(stream);
        Path snapshot = storage.getParent().resolve(SNAPSHOT_FOLDER);
        final CountDownLatch recoveredLatch = new CountDownLatch(1);
        final StreamProcessorLifecycleAware recoveredAwaiter = new StreamProcessorLifecycleAware() {
            @Override
            public void onRecovered(ReadonlyProcessingContext context) {
                recoveredLatch.countDown();
            }
        };
        // Attach the recovery listener to whatever processors the caller's factory creates.
        TypedRecordProcessorFactory wrappedFactory = ctx -> factory.createProcessors(ctx).withListener(recoveredAwaiter);
        ZeebeDb zeebeDb = this.snapshotWasTaken ? zeebeDbFactory.createDb(snapshot.toFile()) : zeebeDbFactory.createDb(storage.toFile());
        String logName = stream.getLogName();
        StreamProcessor streamProcessor = StreamProcessor.builder()
            .logStream(stream.getAsyncLogStream())
            .zeebeDb(zeebeDb)
            .actorSchedulingService(this.actorScheduler)
            .commandResponseWriter(this.mockCommandResponseWriter)
            .listener(this.mockStreamProcessorListener)
            .streamProcessorFactory(wrappedFactory)
            .eventApplierFactory(this.eventApplierFactory)
            .streamProcessorMode(this.streamProcessorMode)
            .build();
        final ActorFuture<?> openFuture = streamProcessor.openAsync(false);
        if (awaitOpening) {
            try {
                // The result is intentionally not checked: a timeout falls through to the join below.
                recoveredLatch.await(15L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag; Thread.interrupted() would clear it and hide the interruption.
                Thread.currentThread().interrupt();
            }
        }
        openFuture.join(15L, TimeUnit.SECONDS);
        LogContext context = this.logContextMap.get(logName);
        ProcessorContext processorContext = ProcessorContext.createStreamContext(context, streamProcessor, zeebeDb, storage, snapshot);
        this.streamContextMap.put(logName, processorContext);
        this.closeables.manage(processorContext);
        return streamProcessor;
    }

    /**
     * Pauses the stream processor registered for the given stream.
     *
     * @param streamName name of a log with a started stream processor
     * @throws NoSuchElementException if no stream processor is registered under that name
     */
    public void pauseProcessing(String streamName) {
        // Reuse the checked lookup so an unknown name fails with a descriptive error instead of an NPE.
        this.getStreamProcessor(streamName).pauseProcessing();
        LogStorageAppender.LOG.info("Paused processing for stream {}", streamName);
    }

    /**
     * Resumes the stream processor registered for the given stream.
     *
     * @param streamName name of a log with a started stream processor
     * @throws NoSuchElementException if no stream processor is registered under that name
     */
    public void resumeProcessing(String streamName) {
        // Reuse the checked lookup so an unknown name fails with a descriptive error instead of an NPE.
        this.getStreamProcessor(streamName).resumeProcessing();
        LogStorageAppender.LOG.info("Resume processing for stream {}", streamName);
    }

    /**
     * Snapshots the database of the given stream into its snapshot directory.
     *
     * <p>Also flips the harness-wide flag that makes stream processors built afterwards open their
     * database from the snapshot directory.
     *
     * @param streamName name of a log with a started stream processor
     * @throws NoSuchElementException if no stream processor is registered under that name
     */
    public void snapshot(String streamName) {
        final ProcessorContext context = this.streamContextMap.get(streamName);
        if (context == null) {
            throw new NoSuchElementException("No stream processor found with name: " + streamName);
        }
        context.snapshot();
        this.snapshotWasTaken = true;
        LogStorageAppender.LOG.info("Snapshot database for stream {}", streamName);
    }

    /**
     * Unregisters and closes the stream processor context of the given stream.
     *
     * @param streamName name of a log with a started stream processor
     * @throws NoSuchElementException if no stream processor is registered under that name
     * @throws Exception if closing the context fails
     */
    public void closeProcessor(String streamName) throws Exception {
        final ProcessorContext context = this.streamContextMap.remove(streamName);
        if (context == null) {
            throw new NoSuchElementException("No stream processor found with name: " + streamName);
        }
        context.close();
        LogStorageAppender.LOG.info("Closed stream {}", streamName);
    }

    /**
     * Looks up the stream processor registered for the given stream.
     *
     * @param streamName name of a log with a started stream processor
     * @return the registered stream processor
     * @throws NoSuchElementException if nothing is registered under that name
     */
    public StreamProcessor getStreamProcessor(String streamName) {
        final ProcessorContext context = this.streamContextMap.get(streamName);
        // A context without a processor counts as missing, just like an absent context.
        if (context == null || context.streamProcessor == null) {
            throw new NoSuchElementException("No stream processor found with name: " + streamName);
        }
        return context.streamProcessor;
    }

    /**
     * Adds all given records to one batch and attempts to write it once.
     *
     * @param logName name of a log created through this harness
     * @param recordToWrites records to add, in order
     * @return whatever the single {@code tryWrite()} call returns; no retry is performed here
     */
    public long writeBatch(String logName, RecordToWrite[] recordToWrites) {
        final LogStreamBatchWriter batchWriter = this.getLogStream(logName).newLogStreamBatchWriter();
        for (final RecordToWrite record : recordToWrites) {
            batchWriter.event()
                .key(record.getKey())
                .sourceIndex(record.getSourceIndex())
                .metadataWriter(record.getRecordMetadata())
                .valueWriter(record.getUnifiedRecordValue())
                .done();
        }
        return batchWriter.tryWrite();
    }

    private static /* synthetic */ TypedRecordProcessors lambda$buildStreamProcessor$5(TypedRecordProcessorFactory factory, 1 recoveredAwaiter, ProcessingContext ctx) {
        return factory.createProcessors(ctx).withListener((StreamProcessorLifecycleAware)recoveredAwaiter);
    }

    static {
        // Invert the registry (value type -> record class) so a value type can be looked up by class.
        TypedEventRegistry.EVENT_REGISTRY.forEach(
            (valueType, recordClass) -> VALUE_TYPES.put((Class<?>) recordClass, (ValueType) valueType));
    }

    /**
     * Bundles a started stream processor with its database and directories so they can be
     * snapshotted and released together.
     */
    private static final class ProcessorContext
    implements AutoCloseable {
        private final LogContext logContext;
        private final ZeebeDb zeebeDb;
        private final StreamProcessor streamProcessor;
        private final Path runtimePath;
        private final Path snapshotPath;
        /** Guards against releasing the resources more than once. */
        private boolean closed = false;

        private ProcessorContext(LogContext logContext, StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path runtimePath, Path snapshotPath) {
            this.logContext = logContext;
            this.streamProcessor = streamProcessor;
            this.zeebeDb = zeebeDb;
            this.runtimePath = runtimePath;
            this.snapshotPath = snapshotPath;
        }

        /** Factory for a context; simply forwards to the private constructor. */
        public static ProcessorContext createStreamContext(LogContext logContext, StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path runtimePath, Path snapshotPath) {
            return new ProcessorContext(logContext, streamProcessor, zeebeDb, runtimePath, snapshotPath);
        }

        /** @return the log stream of the associated log context */
        public SynchronousLogStream getLogStream() {
            return this.logContext.getLogStream();
        }

        /** Writes a snapshot of the database into the snapshot directory. */
        public void snapshot() {
            this.zeebeDb.createSnapshot(this.snapshotPath.toFile());
        }

        /**
         * Closes the stream processor, then the database, then deletes the runtime folder if present.
         *
         * <p>Later steps run even when an earlier one throws, so a failing stream processor does not
         * leak the database or the folder. Calling this again after the first call is a no-op.
         *
         * @throws Exception if any of the release steps fails
         */
        @Override
        public void close() throws Exception {
            if (this.closed) {
                return;
            }
            // Mark closed up front so a failed close is never retried on half-released resources.
            this.closed = true;
            Loggers.IO_LOGGER.debug("Close stream processor");
            try {
                this.streamProcessor.closeAsync().join();
            } finally {
                try {
                    this.zeebeDb.close();
                } finally {
                    if (Files.exists(this.runtimePath)) {
                        FileUtil.deleteFolder(this.runtimePath);
                    }
                }
            }
        }
    }

    /**
     * Holds a log stream together with one record writer that is created up front.
     */
    private static final class LogContext
    implements AutoCloseable {
        private final SynchronousLogStream logStream;
        /** Writer requested once in the constructor and returned by {@link #getLogStreamWriter()}. */
        private final LogStreamRecordWriter logStreamWriter;

        // The storage argument is accepted for signature compatibility but is not used.
        private LogContext(SynchronousLogStream logStream, LogStorage logStorage) {
            this.logStream = logStream;
            this.logStreamWriter = logStream.newLogStreamRecordWriter();
        }

        /** Factory for a context; simply forwards to the private constructor. */
        public static LogContext createLogContext(SyncLogStream logStream, LogStorage logStorage) {
            final SynchronousLogStream stream = logStream;
            return new LogContext(stream, logStorage);
        }

        /** @return the wrapped log stream */
        public SynchronousLogStream getLogStream() {
            return logStream;
        }

        /** @return the writer created together with this context */
        public LogStreamRecordWriter getLogStreamWriter() {
            return logStreamWriter;
        }

        /** @return a writer newly requested from the log stream */
        public LogStreamRecordWriter newLogStreamRecordWriter() {
            return logStream.newLogStreamRecordWriter();
        }

        /** Closes the wrapped log stream. */
        @Override
        public void close() {
            logStream.close();
        }
    }

    /**
     * Fluent helper that collects key, metadata and value of one record and writes it through a
     * {@link LogStreamRecordWriter}.
     */
    public static class FluentLogWriter {
        protected final RecordMetadata metadata = new RecordMetadata();
        protected final LogStreamRecordWriter writer;
        protected UnpackedObject value;
        /** Negative means "no key": {@link #write()} then calls {@code keyNull()} on the writer. */
        protected long key = -1L;
        private long sourceRecordPosition = -1L;

        /**
         * @param logStreamRecordWriter writer used by {@link #write()}
         */
        public FluentLogWriter(LogStreamRecordWriter logStreamRecordWriter) {
            writer = logStreamRecordWriter;
            // NOTE(review): 3 is presumably an inlined protocol version constant - confirm.
            metadata.protocolVersion(3);
        }

        /**
         * Copies intent, key, source record position, record type and value from an existing record.
         *
         * @param record record to copy from
         * @return this writer
         */
        public FluentLogWriter record(CopiedRecord record) {
            intent(record.getIntent());
            key(record.getKey());
            sourceRecordPosition(record.getSourceRecordPosition());
            recordType(record.getRecordType());
            event((UnpackedObject) record.getValue());
            return this;
        }

        /** Sets the intent in the record metadata. */
        public FluentLogWriter intent(Intent intent) {
            metadata.intent(intent);
            return this;
        }

        /** Sets the request id in the record metadata. */
        public FluentLogWriter requestId(long requestId) {
            metadata.requestId(requestId);
            return this;
        }

        /** Sets the source record position passed to the writer on {@link #write()}. */
        public FluentLogWriter sourceRecordPosition(long sourceRecordPosition) {
            this.sourceRecordPosition = sourceRecordPosition;
            return this;
        }

        /** Sets the request stream id in the record metadata. */
        public FluentLogWriter requestStreamId(int requestStreamId) {
            metadata.requestStreamId(requestStreamId);
            return this;
        }

        /** Sets the record type in the record metadata. */
        public FluentLogWriter recordType(RecordType recordType) {
            metadata.recordType(recordType);
            return this;
        }

        /** Sets the record key; a negative key is written as a null key. */
        public FluentLogWriter key(long key) {
            this.key = key;
            return this;
        }

        /**
         * Sets the record value and derives the metadata value type from the value's class.
         *
         * @param event value to write
         * @return this writer
         * @throws RuntimeException if no value type is registered for the value's class
         */
        public FluentLogWriter event(UnpackedObject event) {
            final Class<?> eventClass = event.getClass();
            final ValueType eventType = VALUE_TYPES.get(eventClass);
            if (eventType != null) {
                metadata.valueType(eventType);
                value = event;
                return this;
            }
            throw new RuntimeException("No event type registered for getValue " + eventClass);
        }

        /**
         * Configures the writer from the collected state and retries {@code tryWrite()} until it
         * returns a non-negative result.
         *
         * @return the first non-negative result of {@code tryWrite()}
         */
        public long write() {
            writer.sourceRecordPosition(sourceRecordPosition);
            if (key < 0L) {
                writer.keyNull();
            } else {
                writer.key(key);
            }
            writer.metadataWriter(metadata);
            writer.valueWriter(value);
            return TestUtil.doRepeatedly(() -> writer.tryWrite()).until(position -> position >= 0L);
        }
    }
}

