package io.zeebe.broker.logstreams.processor;

import io.zeebe.logstreams.processor.EventLifecycleContext;
import io.zeebe.logstreams.snapshot.BaseValueSnapshotSupport;
import io.zeebe.logstreams.snapshot.ComposedSnapshot;
import io.zeebe.logstreams.snapshot.ZbMapSnapshotSupport;
import io.zeebe.logstreams.spi.ComposableSnapshotSupport;
import io.zeebe.map.ZbMap;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.msgpack.value.BaseValue;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.Intent;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/* loaded from: input_file:io/zeebe/broker/logstreams/processor/TypedEventStreamProcessorBuilder.class */
/**
 * Fluent builder that collects record processors, snapshot-able state resources and lifecycle
 * listeners, and assembles them into a {@link TypedStreamProcessor} via {@link #build()}.
 *
 * <p>Every registration method returns {@code this} so calls can be chained.
 */
public class TypedEventStreamProcessorBuilder {
    protected final TypedStreamEnvironment environment;
    protected List<ComposableSnapshotSupport> stateResources = new ArrayList<>();
    protected RecordProcessorMap eventProcessors = new RecordProcessorMap();
    protected List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();

    /**
     * Adapts a plain {@link Consumer} of record values to the {@link TypedRecordProcessor}
     * interface: processing a record simply hands the record's value to the consumer.
     *
     * @param <T> type of the record value
     */
    public static class ConsumerProcessor<T extends UnpackedObject> implements TypedRecordProcessor<T> {
        private final Consumer<T> consumer;

        ConsumerProcessor(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void processRecord(TypedRecord<T> typedRecord) {
            consumer.accept(typedRecord.getValue());
        }
    }

    /**
     * Processor that chooses the actual processor per record by means of a dispatcher function.
     *
     * <p>The choice is made in {@code processRecord} and remembered in {@code selectedProcessor};
     * the remaining callbacks ({@code executeSideEffects}, {@code writeRecord},
     * {@code updateState}) are forwarded to that remembered processor. When the dispatcher
     * returned {@code null}, all callbacks are no-ops.
     *
     * @param <T> type of the record value
     */
    public static class DelegatingEventProcessor<T extends UnpackedObject> implements TypedRecordProcessor<T> {
        private final Function<TypedRecord<T>, TypedRecordProcessor<T>> dispatcher;
        // Processor chosen by the most recent processRecord call; null when none was selected.
        private TypedRecordProcessor<T> selectedProcessor;

        DelegatingEventProcessor(Function<TypedRecord<T>, TypedRecordProcessor<T>> function) {
            this.dispatcher = function;
        }

        @Override
        public void processRecord(TypedRecord<T> typedRecord) {
            selectedProcessor = dispatcher.apply(typedRecord);
            if (selectedProcessor != null) {
                selectedProcessor.processRecord(typedRecord);
            }
        }

        @Override
        public void processRecord(TypedRecord<T> typedRecord, EventLifecycleContext eventLifecycleContext) {
            selectedProcessor = dispatcher.apply(typedRecord);
            if (selectedProcessor != null) {
                selectedProcessor.processRecord(typedRecord, eventLifecycleContext);
            }
        }

        /**
         * @return the selected processor's result, or {@code true} when no processor was selected
         */
        @Override
        public boolean executeSideEffects(TypedRecord<T> typedRecord, TypedResponseWriter typedResponseWriter) {
            if (selectedProcessor == null) {
                return true;
            }
            return selectedProcessor.executeSideEffects(typedRecord, typedResponseWriter);
        }

        /**
         * @return the selected processor's result, or {@code 0} when no processor was selected
         */
        @Override
        public long writeRecord(TypedRecord<T> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (selectedProcessor == null) {
                return 0L;
            }
            return selectedProcessor.writeRecord(typedRecord, typedStreamWriter);
        }

        @Override
        public void updateState(TypedRecord<T> typedRecord) {
            if (selectedProcessor != null) {
                selectedProcessor.updateState(typedRecord);
            }
        }
    }

    /**
     * @param typedStreamEnvironment environment that supplies the output and event registry used
     *     when the processor is built
     */
    public TypedEventStreamProcessorBuilder(TypedStreamEnvironment typedStreamEnvironment) {
        this.environment = typedStreamEnvironment;
    }

    /**
     * Registers a processor for {@link RecordType#EVENT} records of the given value type and intent.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder onEvent(ValueType valueType, Intent intent, TypedRecordProcessor<?> typedRecordProcessor) {
        return onRecord(RecordType.EVENT, valueType, intent, typedRecordProcessor);
    }

    /**
     * Registers an event processor that is only applied to records whose value satisfies the
     * predicate; other records are ignored.
     *
     * @param predicate tested against the record value
     * @param typedRecordProcessor processor used when the predicate matches
     * @return this builder
     */
    public <T extends UnpackedObject> TypedEventStreamProcessorBuilder onEvent(ValueType valueType, Intent intent, Predicate<T> predicate, TypedRecordProcessor<T> typedRecordProcessor) {
        // Explicit <T> keeps the lambda parameter typed as TypedRecord<T>; a raw instantiation
        // would type it as Object and getValue() would not resolve.
        return onEvent(valueType, intent, new DelegatingEventProcessor<T>(
                typedRecord -> predicate.test(typedRecord.getValue()) ? typedRecordProcessor : null));
    }

    /**
     * Registers an event processor that is chosen per record by applying the given function to
     * the record value. A {@code null} result means the record is ignored.
     *
     * @return this builder
     */
    public <T extends UnpackedObject> TypedEventStreamProcessorBuilder onEvent(ValueType valueType, Intent intent, Function<T, TypedRecordProcessor<T>> function) {
        return onEvent(valueType, intent, new DelegatingEventProcessor<T>(
                typedRecord -> function.apply(typedRecord.getValue())));
    }

    /**
     * Registers a consumer that receives the value of every matching event record.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder onEvent(ValueType valueType, Intent intent, Consumer<? extends UnpackedObject> consumer) {
        // Assigning to a local pins the overload and lets the diamond capture the wildcard.
        final TypedRecordProcessor<?> processor = new ConsumerProcessor<>(consumer);
        return onEvent(valueType, intent, processor);
    }

    /** Stores the processor in the processor map under (record type, value type, intent value). */
    private TypedEventStreamProcessorBuilder onRecord(RecordType recordType, ValueType valueType, Intent intent, TypedRecordProcessor<?> typedRecordProcessor) {
        this.eventProcessors.put(recordType, valueType, intent.value(), typedRecordProcessor);
        return this;
    }

    /**
     * Registers a processor for {@link RecordType#COMMAND} records of the given value type and
     * intent.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder onCommand(ValueType valueType, Intent intent, TypedRecordProcessor<?> typedRecordProcessor) {
        return onRecord(RecordType.COMMAND, valueType, intent, typedRecordProcessor);
    }

    /**
     * Registers a command processor that is only applied to records whose value satisfies the
     * predicate; other records are ignored.
     *
     * @param predicate tested against the record value
     * @param typedRecordProcessor processor used when the predicate matches
     * @return this builder
     */
    public <T extends UnpackedObject> TypedEventStreamProcessorBuilder onCommand(ValueType valueType, Intent intent, Predicate<T> predicate, TypedRecordProcessor<T> typedRecordProcessor) {
        return onCommand(valueType, intent, new DelegatingEventProcessor<T>(
                typedRecord -> predicate.test(typedRecord.getValue()) ? typedRecordProcessor : null));
    }

    /**
     * Registers a processor for {@link RecordType#COMMAND_REJECTION} records of the given value
     * type and intent.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder onRejection(ValueType valueType, Intent intent, TypedRecordProcessor<?> typedRecordProcessor) {
        return onRecord(RecordType.COMMAND_REJECTION, valueType, intent, typedRecordProcessor);
    }

    /**
     * Adds a lifecycle listener that is passed on to the built stream processor.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder withListener(StreamProcessorLifecycleAware streamProcessorLifecycleAware) {
        this.lifecycleListeners.add(streamProcessorLifecycleAware);
        return this;
    }

    /**
     * Adds a map as snapshot-able state and registers a listener that closes the map in its
     * {@code onClose} callback.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder withStateResource(final ZbMap<?, ?> zbMap) {
        this.stateResources.add(new ZbMapSnapshotSupport(zbMap));
        withListener(new StreamProcessorLifecycleAware() {
            @Override
            public void onClose() {
                zbMap.close();
            }
        });
        return this;
    }

    /**
     * Adds a msgpack value as snapshot-able state.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder withStateResource(BaseValue baseValue) {
        this.stateResources.add(new BaseValueSnapshotSupport(baseValue));
        return this;
    }

    /**
     * Adds an arbitrary composable snapshot resource as state.
     *
     * @return this builder
     */
    public TypedEventStreamProcessorBuilder withStateResource(ComposableSnapshotSupport composableSnapshotSupport) {
        this.stateResources.add(composableSnapshotSupport);
        return this;
    }

    /**
     * Creates the stream processor from everything registered so far.
     *
     * <p>All registered state resources are combined into one {@link ComposedSnapshot}; when none
     * were registered a {@link NoopSnapshotSupport} is used instead.
     *
     * @return the configured stream processor
     */
    public TypedStreamProcessor build() {
        return new TypedStreamProcessor(
                !this.stateResources.isEmpty()
                        ? new ComposedSnapshot(this.stateResources.toArray(new ComposableSnapshotSupport[this.stateResources.size()]))
                        : new NoopSnapshotSupport(),
                this.environment.getOutput(),
                this.eventProcessors,
                this.lifecycleListeners,
                this.environment.getEventRegistry(),
                this.environment);
    }
}
