/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.engine.util.client.CommandWriter;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorListener;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Callable;

public class StreamProcessingComposite
implements CommandWriter {
    private static final String STREAM_NAME = "stream-";
    private final TestStreams streams;
    private final int partitionId;
    private final ZeebeDbFactory<?> zeebeDbFactory;
    private MutableProcessingState processingState;
    private final WriteActor writeActor = new WriteActor();

    public StreamProcessingComposite(TestStreams streams, int partitionId, ZeebeDbFactory<?> zeebeDbFactory, ActorScheduler actorScheduler) {
        this.streams = streams;
        this.partitionId = partitionId;
        this.zeebeDbFactory = zeebeDbFactory;
        actorScheduler.submitActor((Actor)this.writeActor).join();
    }

    public SynchronousLogStream getLogStream(int partitionId) {
        return this.streams.getLogStream(StreamProcessingComposite.getLogName(partitionId));
    }

    public LogStreamWriter newLogStreamWriter(int partitionId) {
        String logName = StreamProcessingComposite.getLogName(partitionId);
        return this.streams.newLogStreamWriter(logName);
    }

    public StreamProcessor startTypedStreamProcessor(StreamProcessorTestFactory factory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        return this.startTypedStreamProcessor(processingContext -> this.createTypedRecordProcessors(factory, processingContext), streamProcessorListenerOpt);
    }

    private TypedRecordProcessors createTypedRecordProcessors(StreamProcessorTestFactory factory, TypedRecordProcessorContext typedRecordProcessorContext) {
        this.processingState = typedRecordProcessorContext.getProcessingState();
        return factory.build(TypedRecordProcessors.processors((KeyGenerator)this.processingState.getKeyGenerator(), (Writers)typedRecordProcessorContext.getWriters()), typedRecordProcessorContext);
    }

    public StreamProcessor startTypedStreamProcessor(TypedRecordProcessorFactory factory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        return this.startTypedStreamProcessor(this.partitionId, factory, streamProcessorListenerOpt);
    }

    public StreamProcessor startTypedStreamProcessor(int partitionId, TypedRecordProcessorFactory factory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        StreamProcessor result = this.streams.startStreamProcessor(StreamProcessingComposite.getLogName(partitionId), this.zeebeDbFactory, processingContext -> {
            this.processingState = processingContext.getProcessingState();
            return factory.createProcessors(processingContext);
        }, streamProcessorListenerOpt);
        return result;
    }

    public void pauseProcessing(int partitionId) {
        this.streams.pauseProcessing(StreamProcessingComposite.getLogName(partitionId));
    }

    public void resumeProcessing(int partitionId) {
        this.streams.resumeProcessing(StreamProcessingComposite.getLogName(partitionId));
    }

    public void snapshot(int partitionId) {
        this.streams.snapshot(StreamProcessingComposite.getLogName(partitionId));
    }

    public void closeStreamProcessor(int partitionId) {
        try {
            this.streams.closeProcessor(StreamProcessingComposite.getLogName(partitionId));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public StreamProcessor getStreamProcessor(int partitionId) {
        return this.streams.getStreamProcessor(StreamProcessingComposite.getLogName(partitionId));
    }

    public MutableProcessingState getProcessingState() {
        return this.processingState;
    }

    public RecordStream events() {
        return new RecordStream(this.streams.events(StreamProcessingComposite.getLogName(this.partitionId)));
    }

    public long writeBatch(RecordToWrite ... recordsToWrite) {
        LogStreamWriter writer = this.streams.newLogStreamWriter(StreamProcessingComposite.getLogName(this.partitionId));
        return (Long)this.writeActor.submit(() -> (Long)writer.tryWrite(Arrays.asList(recordsToWrite)).get()).join();
    }

    @Override
    public long writeCommand(Intent intent, UnifiedRecordValue value) {
        return this.writeCommand(intent, value, "<default>");
    }

    @Override
    public long writeCommand(Intent intent, UnifiedRecordValue value, String ... authorizedTenants) {
        TestStreams.FluentLogWriter writer = this.streams.newRecord(StreamProcessingComposite.getLogName(this.partitionId)).recordType(RecordType.COMMAND).intent(intent).authorizations(authorizedTenants).event(value);
        return (Long)this.writeActor.submit(writer::write).join();
    }

    @Override
    public long writeCommand(long key, Intent intent, UnifiedRecordValue value) {
        return this.writeCommand(key, intent, value, "<default>");
    }

    @Override
    public long writeCommand(long key, Intent intent, UnifiedRecordValue recordValue, String ... authorizedTenants) {
        TestStreams.FluentLogWriter writer = this.streams.newRecord(StreamProcessingComposite.getLogName(this.partitionId)).recordType(RecordType.COMMAND).key(key).intent(intent).authorizations(authorizedTenants).event(recordValue);
        return (Long)this.writeActor.submit(writer::write).join();
    }

    @Override
    public long writeCommand(int requestStreamId, long requestId, Intent intent, UnifiedRecordValue value) {
        TestStreams.FluentLogWriter writer = this.streams.newRecord(StreamProcessingComposite.getLogName(this.partitionId)).recordType(RecordType.COMMAND).requestId(requestId).requestStreamId(requestStreamId).intent(intent).authorizations("<default>").event(value);
        return (Long)this.writeActor.submit(writer::write).join();
    }

    @Override
    public long writeCommandOnPartition(int partition, Intent intent, UnifiedRecordValue value) {
        TestStreams.FluentLogWriter writer = this.streams.newRecord(StreamProcessingComposite.getLogName(partition)).recordType(RecordType.COMMAND).intent(intent).authorizations("<default>").event(value);
        return (Long)this.writeActor.submit(writer::write).join();
    }

    @Override
    public long writeCommandOnPartition(int partition, long key, Intent intent, UnifiedRecordValue value) {
        return this.writeCommandOnPartition(partition, key, intent, value, "<default>");
    }

    @Override
    public long writeCommandOnPartition(int partition, long key, Intent intent, UnifiedRecordValue value, String ... authorizedTenants) {
        TestStreams.FluentLogWriter writer = this.streams.newRecord(StreamProcessingComposite.getLogName(partition)).key(key).recordType(RecordType.COMMAND).intent(intent).authorizations(authorizedTenants).event(value);
        return (Long)this.writeActor.submit(writer::write).join();
    }

    public static String getLogName(int partitionId) {
        return STREAM_NAME + partitionId;
    }

    private static final class WriteActor
    extends Actor {
        private WriteActor() {
        }

        public ActorFuture<Long> submit(Callable<Long> write) {
            return this.actor.call(write);
        }
    }

    @FunctionalInterface
    public static interface StreamProcessorTestFactory {
        public TypedRecordProcessors build(TypedRecordProcessors var1, TypedRecordProcessorContext var2);
    }
}

