/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.util.AuthorizationUtil;
import io.camunda.zeebe.engine.util.StreamProcessorListenerRelay;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SyncLogStream;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.impl.record.CopiedRecord;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorBuilder;
import io.camunda.zeebe.stream.impl.StreamProcessorListener;
import io.camunda.zeebe.stream.impl.StreamProcessorMode;
import io.camunda.zeebe.stream.impl.TypedEventRegistry;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.DirectBuffer;
import org.awaitility.Awaitility;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

public final class TestStreams {
    /** Name of the snapshot folder; it is resolved as a sibling of the runtime folder. */
    private static final String SNAPSHOT_FOLDER = "snapshot";
    /** Record value class to value type lookup, filled by the static initializer of this class. */
    private static final Map<Class<?>, ValueType> VALUE_TYPES = new HashMap<>();
    private static final Logger LOG = Loggers.STREAM_PROCESSING;
    /** Root under which the per-log state directories are created. */
    private final TemporaryFolder dataDirectory;
    /** Collects every resource opened by this harness so it can be closed later. */
    private final AutoCloseableRule closeables;
    private final ActorScheduler actorScheduler;
    /** Mockito mock whose stubbed fluent methods return the mock itself (set up in the constructor). */
    private final CommandResponseWriter mockCommandResponseWriter;
    /** Log contexts keyed by log name. */
    private final Map<String, LogContext> logContextMap = new HashMap<String, LogContext>();
    /** Processor contexts keyed by log name. */
    private final Map<String, ProcessorContext> streamContextMap = new HashMap<String, ProcessorContext>();
    /** Once set, newly built stream processors open their database from the snapshot folder. */
    private boolean snapshotWasTaken = false;
    private StreamProcessorMode streamProcessorMode = StreamProcessorMode.PROCESSING;
    private int maxCommandsInBatch = 100;
    /** Storage created by the most recent {@code createLogStream(name, partitionId)} call; null until then. */
    private ListLogStorage listLogStorage;

    public TestStreams(TemporaryFolder dataDirectory, AutoCloseableRule closeables, ActorScheduler actorScheduler) {
        this.dataDirectory = dataDirectory;
        this.closeables = closeables;
        this.actorScheduler = actorScheduler;
        this.mockCommandResponseWriter = (CommandResponseWriter)Mockito.mock(CommandResponseWriter.class);
        Mockito.when((Object)this.mockCommandResponseWriter.intent((Intent)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.key(ArgumentMatchers.anyLong())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.partitionId(ArgumentMatchers.anyInt())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.recordType((RecordType)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.rejectionType((RejectionType)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.rejectionReason((DirectBuffer)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.valueType((ValueType)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
        Mockito.when((Object)this.mockCommandResponseWriter.valueWriter((BufferWriter)ArgumentMatchers.any())).thenReturn((Object)this.mockCommandResponseWriter);
    }

    /**
     * Sets the mode passed to the stream processor builder by later {@code buildStreamProcessor} calls.
     *
     * @param streamProcessorMode mode to use from now on
     */
    public void withStreamProcessorMode(StreamProcessorMode streamProcessorMode) {
        this.streamProcessorMode = streamProcessorMode;
    }

    /**
     * @return the mocked response writer that every stream processor built by this harness receives
     */
    public CommandResponseWriter getMockedResponseWriter() {
        return mockCommandResponseWriter;
    }

    /**
     * Creates a log stream with the given name on partition 0, backed by a fresh storage.
     *
     * @param name log name, also the key under which the stream is registered
     * @return the created log stream
     */
    public SynchronousLogStream createLogStream(String name) {
        final int defaultPartitionId = 0;
        return createLogStream(name, defaultPartitionId);
    }

    /**
     * Creates a log stream backed by a new {@link ListLogStorage}.
     *
     * <p>The new storage replaces any storage remembered from an earlier call, so
     * {@link #resetLog()} only affects the most recently created one.
     *
     * @param name log name, also the key under which the stream is registered
     * @param partitionId partition id given to the log stream
     * @return the created log stream
     */
    public SynchronousLogStream createLogStream(String name, int partitionId) {
        ListLogStorage freshStorage = new ListLogStorage();
        this.listLogStorage = freshStorage;
        return createLogStream(
            name,
            partitionId,
            freshStorage,
            stream -> freshStorage.setPositionListener(position -> stream.setLastWrittenPosition(position)));
    }

    /**
     * Creates a log stream on top of a caller-supplied storage.
     *
     * <p>The created stream is installed as the storage's position listener. The storage is not
     * remembered by this harness, so {@link #resetLog()} does not touch it.
     *
     * @param name log name, also the key under which the stream is registered
     * @param partitionId partition id given to the log stream
     * @param sharedStorage storage the stream reads from and writes to
     * @return the created log stream
     */
    public SynchronousLogStream createLogStream(String name, int partitionId, ListLogStorage sharedStorage) {
        Consumer<SyncLogStream> installPositionListener =
            stream -> sharedStorage.setPositionListener(position -> stream.setLastWrittenPosition(position));
        return createLogStream(name, partitionId, sharedStorage, installPositionListener);
    }

    /**
     * Builds the log stream, lets the caller hook it up to its storage, and registers it.
     *
     * <p>The stream is stored under {@code name} and handed to the closeables rule, together with
     * a second managed action that removes the map entry again.
     *
     * @param name log name and registration key
     * @param partitionId partition id given to the log stream
     * @param logStorage storage backing the stream
     * @param logStreamConsumer callback invoked with the freshly built stream before registration
     * @return the created log stream
     */
    private SynchronousLogStream createLogStream(String name, int partitionId, LogStorage logStorage, Consumer<SyncLogStream> logStreamConsumer) {
        SyncLogStream syncLogStream =
            SyncLogStream.builder()
                .withLogName(name)
                .withLogStorage(logStorage)
                .withPartitionId(partitionId)
                .withActorSchedulingService(actorScheduler)
                .build();
        logStreamConsumer.accept(syncLogStream);

        LogContext context = LogContext.createLogContext(syncLogStream);
        logContextMap.put(name, context);
        closeables.manage(context);
        closeables.manage(() -> logContextMap.remove(name));
        return syncLogStream;
    }

    /**
     * Looks up a previously created log stream.
     *
     * @param name name the stream was created with
     * @return the registered log stream
     * @throws NullPointerException if no stream is registered under {@code name}
     */
    public SynchronousLogStream getLogStream(String name) {
        LogContext context = logContextMap.get(name);
        return context.getLogStream();
    }

    /**
     * Opens a new writer on a previously created log stream.
     *
     * @param name name the stream was created with
     * @return a new writer for that stream
     * @throws NullPointerException if no stream is registered under {@code name}
     */
    public LogStreamWriter newLogStreamWriter(String name) {
        LogContext context = logContextMap.get(name);
        return context.newLogStreamWriter();
    }

    /**
     * Streams the events of a log, starting from its first event.
     *
     * <p>A new reader is opened per call and handed to the closeables rule. The returned stream is
     * sequential and iterates that single reader, so it can only be traversed once.
     *
     * @param logName name the stream was created with
     * @return a sequential stream over the log's events
     * @throws NullPointerException if no stream is registered under {@code logName}
     */
    public Stream<LoggedEvent> events(String logName) {
        SynchronousLogStream logStream = this.getLogStream(logName);
        LogStreamReader reader = logStream.newLogStreamReader();
        this.closeables.manage(reader);
        reader.seekToFirstEvent();
        // Typed instead of raw so the resulting Stream<LoggedEvent> needs no unchecked conversion.
        Iterable<LoggedEvent> iterable = () -> reader;
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    /**
     * Starts a fluent record builder that writes to the named log.
     *
     * @param logName name the stream was created with
     * @return a new fluent writer bound to a fresh log stream writer
     */
    public FluentLogWriter newRecord(String logName) {
        LogStreamWriter logStreamWriter = newLogStreamWriter(logName);
        return new FluentLogWriter(logStreamWriter);
    }

    /**
     * Ensures the {@code <root>/<logName>/state} directory exists and returns the path of its
     * {@code runtime} child.
     *
     * <p>Only the {@code state} directory is created here; the returned {@code runtime} path itself
     * is not.
     *
     * @param stream log stream whose name selects the directory
     * @return path to the runtime folder for that stream
     * @throws UncheckedIOException if the state directory cannot be created
     */
    public Path createRuntimeFolder(SynchronousLogStream stream) {
        Path stateDirectory = dataDirectory.getRoot().toPath().resolve(stream.getLogName()).resolve("state");
        try {
            Files.createDirectories(stateDirectory);
        } catch (FileAlreadyExistsException ignored) {
            // intentionally ignored: an already existing path is not treated as an error here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stateDirectory.resolve("runtime");
    }

    /**
     * Starts a stream processor on the named log without an extra listener.
     *
     * @param log name the stream was created with
     * @param zeebeDbFactory factory used to open the processor's database
     * @param typedRecordProcessorFactory factory producing the record processors
     * @return the opened stream processor
     */
    public StreamProcessor startStreamProcessor(String log, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory) {
        Optional<StreamProcessorListener> noListener = Optional.empty();
        return startStreamProcessor(log, zeebeDbFactory, typedRecordProcessorFactory, noListener);
    }

    /**
     * Starts a stream processor on the named log and waits for it to open.
     *
     * @param log name the stream was created with
     * @param zeebeDbFactory factory used to open the processor's database
     * @param typedRecordProcessorFactory factory producing the record processors
     * @param streamProcessorListenerOpt optional listener relayed to the stream processor
     * @return the opened stream processor
     */
    public StreamProcessor startStreamProcessor(String log, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        final boolean awaitOpening = true;
        return buildStreamProcessor(
            getLogStream(log), zeebeDbFactory, typedRecordProcessorFactory, awaitOpening, streamProcessorListenerOpt);
    }

    /**
     * Builds and opens a stream processor for the given log stream and registers it under the
     * stream's log name.
     *
     * <p>The database is opened from the snapshot folder once {@link #snapshot(String)} has been
     * called on this harness, otherwise from the runtime folder. The supplied factory is wrapped so
     * that a lifecycle listener counting down a latch on recovery is attached to its processors.
     *
     * @param stream log stream to process
     * @param zeebeDbFactory factory used to open the database
     * @param factory factory producing the record processors
     * @param awaitOpening whether to additionally wait (up to 15 seconds) for the recovered callback
     * @param streamProcessorListenerOpt optional listener relayed to the stream processor
     * @return the opened stream processor
     */
    public StreamProcessor buildStreamProcessor(SynchronousLogStream stream, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory factory, boolean awaitOpening, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        Path storage = this.createRuntimeFolder(stream);
        Path snapshot = storage.getParent().resolve(SNAPSHOT_FOLDER);

        final CountDownLatch recoveredLatch = new CountDownLatch(1);
        StreamProcessorLifecycleAware recoveredAwaiter = new StreamProcessorLifecycleAware() {
            @Override
            public void onRecovered(ReadonlyStreamProcessorContext context) {
                recoveredLatch.countDown();
            }
        };
        TypedRecordProcessorFactory wrappedFactory =
            ctx -> factory.createProcessors(ctx).withListener(recoveredAwaiter);

        ZeebeDb zeebeDb = this.snapshotWasTaken ? zeebeDbFactory.createDb(snapshot.toFile()) : zeebeDbFactory.createDb(storage.toFile());
        String logName = stream.getLogName();

        ArrayList<StreamProcessorListener> streamProcessorListeners = new ArrayList<StreamProcessorListener>();
        streamProcessorListenerOpt.ifPresent(streamProcessorListeners::add);

        StreamProcessorBuilder builder =
            StreamProcessor.builder()
                .logStream(stream.getAsyncLogStream())
                .zeebeDb(zeebeDb)
                .actorSchedulingService(this.actorScheduler)
                .commandResponseWriter(this.mockCommandResponseWriter)
                .listener(new StreamProcessorListenerRelay(streamProcessorListeners))
                .recordProcessors(List.of(new Engine(wrappedFactory, new EngineConfiguration())))
                .streamProcessorMode(this.streamProcessorMode)
                .maxCommandsInBatch(this.maxCommandsInBatch)
                .partitionCommandSender(Mockito.mock(InterPartitionCommandSender.class));
        StreamProcessor streamProcessor = builder.build();

        ActorFuture<?> openFuture = streamProcessor.openAsync(false);
        if (awaitOpening) {
            try {
                recoveredLatch.await(15L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Catching InterruptedException leaves the interrupt flag cleared; restore it so the
                // interruption stays visible to the calls below and to the caller.
                Thread.currentThread().interrupt();
            }
        }
        openFuture.join(15L, TimeUnit.SECONDS);

        ProcessorContext processorContext = ProcessorContext.createStreamContext(streamProcessor, zeebeDb, storage, snapshot);
        this.streamContextMap.put(logName, processorContext);
        this.closeables.manage(processorContext);
        return streamProcessor;
    }

    /**
     * Pauses the stream processor registered for the given stream and waits for the pause to complete.
     *
     * @param streamName log name the processor was registered under
     * @throws NullPointerException if no processor is registered under {@code streamName}
     */
    public void pauseProcessing(String streamName) {
        ProcessorContext context = streamContextMap.get(streamName);
        context.streamProcessor.pauseProcessing().join();
        LOG.info("Paused processing for stream {}", streamName);
    }

    /**
     * Requests the stream processor registered for the given stream to resume; does not wait for
     * the request to complete.
     *
     * @param streamName log name the processor was registered under
     * @throws NullPointerException if no processor is registered under {@code streamName}
     */
    public void resumeProcessing(String streamName) {
        ProcessorContext context = streamContextMap.get(streamName);
        context.streamProcessor.resumeProcessing();
        LOG.info("Resume processing for stream {}", streamName);
    }

    /**
     * Resets the storage created by the most recent {@code createLogStream(name, partitionId)} call.
     *
     * @throws NullPointerException if no such storage has been created yet
     */
    public void resetLog() {
        listLogStorage.reset();
    }

    /**
     * Snapshots the database of the named stream's processor and remembers that a snapshot exists,
     * which makes later {@code buildStreamProcessor} calls open the database from the snapshot folder.
     *
     * @param streamName log name the processor was registered under
     * @throws NullPointerException if no processor is registered under {@code streamName}
     */
    public void snapshot(String streamName) {
        ProcessorContext context = streamContextMap.get(streamName);
        context.snapshot();
        snapshotWasTaken = true;
        LOG.info("Snapshot database for stream {}", streamName);
    }

    /**
     * Unregisters and closes the processor context of the named stream.
     *
     * @param streamName log name the processor was registered under
     * @throws NullPointerException if no processor is registered under {@code streamName}
     * @throws Exception if closing the context fails
     */
    public void closeProcessor(String streamName) throws Exception {
        ProcessorContext removed = streamContextMap.remove(streamName);
        removed.close();
        LOG.info("Closed stream {}", streamName);
    }

    /**
     * Returns the stream processor registered for the given stream.
     *
     * @param streamName log name the processor was registered under
     * @return the registered stream processor
     * @throws NoSuchElementException if none is registered under {@code streamName}
     */
    public StreamProcessor getStreamProcessor(String streamName) {
        ProcessorContext context = streamContextMap.get(streamName);
        if (context == null || context.streamProcessor == null) {
            throw new NoSuchElementException("No stream processor found with name: " + streamName);
        }
        return context.streamProcessor;
    }

    /**
     * Sets the batch limit passed to the stream processor builder by later
     * {@code buildStreamProcessor} calls.
     *
     * @param maxCommandsInBatch value forwarded to the builder's {@code maxCommandsInBatch}
     */
    public void maxCommandsInBatch(int maxCommandsInBatch) {
        this.maxCommandsInBatch = maxCommandsInBatch;
    }

    private static /* synthetic */ TypedRecordProcessors lambda$buildStreamProcessor$5(TypedRecordProcessorFactory factory, 1 recoveredAwaiter, TypedRecordProcessorContext ctx) {
        return factory.createProcessors(ctx).withListener((StreamProcessorLifecycleAware)recoveredAwaiter);
    }

    // Invert the event registry (value type -> class) into a class -> value type lookup.
    static {
        TypedEventRegistry.EVENT_REGISTRY.forEach(
            (valueType, valueClass) -> VALUE_TYPES.put(valueClass, valueType));
    }

    /** Closeable holder for a single log stream registered with the harness. */
    private static final class LogContext implements AutoCloseable {
        private final SynchronousLogStream logStream;

        private LogContext(SynchronousLogStream logStream) {
            this.logStream = logStream;
        }

        /** Wraps the given log stream in a new context. */
        public static LogContext createLogContext(SyncLogStream logStream) {
            return new LogContext(logStream);
        }

        /** @return the wrapped log stream */
        public SynchronousLogStream getLogStream() {
            return logStream;
        }

        /** @return a new writer obtained from the wrapped log stream */
        public LogStreamWriter newLogStreamWriter() {
            return logStream.newLogStreamWriter();
        }

        /** Closes the wrapped log stream. */
        @Override
        public void close() {
            logStream.close();
        }
    }

    /**
     * Fluent builder that collects record metadata, key and value and appends them to a log stream
     * through {@link #write()}.
     */
    public static class FluentLogWriter {
        protected final RecordMetadata metadata = new RecordMetadata();
        protected final LogStreamWriter writer;
        protected UnifiedRecordValue value;
        /** Record key; a negative value makes {@link #write()} append without an explicit key. */
        protected long key = -1L;
        private long sourceRecordPosition = -1L;

        /**
         * @param logStreamWriter writer that {@link #write()} appends to
         */
        public FluentLogWriter(LogStreamWriter logStreamWriter) {
            writer = logStreamWriter;
            metadata.protocolVersion(4);
        }

        /**
         * Copies intent, key, source record position, record type and value from an existing record.
         *
         * @param record record to copy from
         * @return this writer
         */
        public FluentLogWriter record(CopiedRecord record) {
            intent(record.getIntent());
            key(record.getKey());
            sourceRecordPosition(record.getSourceRecordPosition());
            recordType(record.getRecordType());
            event(record.getValue());
            return this;
        }

        /** Sets the intent on the metadata. */
        public FluentLogWriter intent(Intent intent) {
            metadata.intent(intent);
            return this;
        }

        /** Sets the request id on the metadata. */
        public FluentLogWriter requestId(long requestId) {
            metadata.requestId(requestId);
            return this;
        }

        /** Sets the metadata authorization to the auth info built from the given tenant ids. */
        public FluentLogWriter authorizations(String ... tenantIds) {
            metadata.authorization(AuthorizationUtil.getAuthInfo(tenantIds));
            return this;
        }

        /** Sets the source record position passed along when the entry is written. */
        public FluentLogWriter sourceRecordPosition(long sourceRecordPosition) {
            this.sourceRecordPosition = sourceRecordPosition;
            return this;
        }

        /** Sets the request stream id on the metadata. */
        public FluentLogWriter requestStreamId(int requestStreamId) {
            metadata.requestStreamId(requestStreamId);
            return this;
        }

        /** Sets the record type on the metadata. */
        public FluentLogWriter recordType(RecordType recordType) {
            metadata.recordType(recordType);
            return this;
        }

        /** Sets the record key; negative keys are treated as "no key" by {@link #write()}. */
        public FluentLogWriter key(long key) {
            this.key = key;
            return this;
        }

        /**
         * Sets the record value and derives the metadata value type from the value's class.
         *
         * @param event record value to write
         * @return this writer
         * @throws RuntimeException if the value's class has no registered value type
         */
        public FluentLogWriter event(UnifiedRecordValue event) {
            Class<?> eventClass = event.getClass();
            ValueType eventType = VALUE_TYPES.get(eventClass);
            if (eventType == null) {
                throw new RuntimeException("No event type registered for getValue " + eventClass);
            }
            metadata.valueType(eventType);
            value = event;
            return this;
        }

        /**
         * Appends the configured record, retrying every 50 ms on the calling thread until the
         * write attempt yields a right-hand result.
         *
         * @return the value carried by the successful write result
         */
        public long write() {
            final LogAppendEntry entry;
            if (key >= 0L) {
                entry = LogAppendEntry.of(key, metadata, value);
            } else {
                entry = LogAppendEntry.of(metadata, value);
            }

            return Awaitility.await("until entry is written")
                .pollInSameThread()
                .pollDelay(Duration.ZERO)
                .pollInterval(Duration.ofMillis(50L))
                .until(() -> writer.tryWrite(entry, sourceRecordPosition), Either::isRight)
                .get();
        }
    }

    /**
     * Closeable bundle of a stream processor, its database and the folders it uses.
     */
    private static final class ProcessorContext implements AutoCloseable {
        private final ZeebeDb zeebeDb;
        private final StreamProcessor streamProcessor;
        private final Path runtimePath;
        private final Path snapshotPath;
        /** Guards {@link #close()} so a second call is a no-op. */
        private boolean closed = false;

        private ProcessorContext(StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path runtimePath, Path snapshotPath) {
            this.streamProcessor = streamProcessor;
            this.zeebeDb = zeebeDb;
            this.runtimePath = runtimePath;
            this.snapshotPath = snapshotPath;
        }

        /** Creates a context for the given processor, database and folders. */
        public static ProcessorContext createStreamContext(StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path runtimePath, Path snapshotPath) {
            return new ProcessorContext(streamProcessor, zeebeDb, runtimePath, snapshotPath);
        }

        /** Writes a snapshot of the database into the snapshot folder. */
        public void snapshot() {
            zeebeDb.createSnapshot(snapshotPath.toFile());
        }

        /**
         * Closes the stream processor first, then the database, and finally deletes the runtime
         * folder if it exists. The context is only marked closed after all steps succeeded.
         *
         * @throws Exception if any of the steps fails
         */
        @Override
        public void close() throws Exception {
            if (!closed) {
                LOG.debug("Close stream processor");
                streamProcessor.closeAsync().join();
                zeebeDb.close();
                if (runtimePath.toFile().exists()) {
                    FileUtil.deleteFolder(runtimePath);
                }
                closed = true;
            }
        }
    }
}

