package io.zeebe.broker.logstreams.processor;

import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.impl.service.StreamProcessorService;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventFilter;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.spi.SnapshotController;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.future.ActorFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/* loaded from: input_file:io/zeebe/broker/logstreams/processor/StreamProcessorServiceFactory.class */
public class StreamProcessorServiceFactory implements Service<StreamProcessorServiceFactory> {
    private final ServiceContainer serviceContainer;
    private final Duration snapshotPeriod;
    private ActorScheduler actorScheduler;

    /* loaded from: input_file:io/zeebe/broker/logstreams/processor/StreamProcessorServiceFactory$Builder.class */
    public class Builder {
        private final LogStream logStream;
        private final SnapshotStorage snapshotStorage;
        private SnapshotController snapshotController;
        private String processorName;
        private StreamProcessor streamProcessor;
        protected MetadataFilter customEventFilter;
        private int processorId = -1;
        private final List<ServiceName<?>> additionalDependencies = new ArrayList();
        protected boolean readOnly = false;

        public Builder(Partition partition, ServiceName<Partition> serviceName) {
            this.logStream = partition.getLogStream();
            this.snapshotStorage = partition.getSnapshotStorage();
            this.additionalDependencies.add(serviceName);
        }

        public Builder processorId(int i) {
            this.processorId = i;
            return this;
        }

        public Builder processorName(String str) {
            this.processorName = str;
            return this;
        }

        public Builder processor(StreamProcessor streamProcessor) {
            this.streamProcessor = streamProcessor;
            return this;
        }

        public Builder processor(TypedStreamProcessor typedStreamProcessor) {
            this.streamProcessor = typedStreamProcessor;
            this.customEventFilter = typedStreamProcessor.buildTypeFilter();
            return this;
        }

        public Builder eventFilter(MetadataFilter metadataFilter) {
            this.customEventFilter = metadataFilter;
            return this;
        }

        public Builder snapshotController(SnapshotController snapshotController) {
            this.snapshotController = snapshotController;
            return this;
        }

        public Builder readOnly(boolean z) {
            this.readOnly = z;
            return this;
        }

        public Builder additionalDependencies(ServiceName<?>... serviceNameArr) {
            for (ServiceName<?> serviceName : serviceNameArr) {
                this.additionalDependencies.add(serviceName);
            }
            return this;
        }

        public ActorFuture<StreamProcessorService> build() {
            EnsureUtil.ensureNotNull("stream processor", this.streamProcessor);
            EnsureUtil.ensureNotNullOrEmpty("processor name", this.processorName);
            EnsureUtil.ensureGreaterThan("process id", this.processorId, -1L);
            VersionFilter versionFilter = new VersionFilter();
            if (this.customEventFilter != null) {
                versionFilter = versionFilter.and(this.customEventFilter);
            }
            return LogStreams.createStreamProcessor(this.processorName, this.processorId, this.streamProcessor).actorScheduler(StreamProcessorServiceFactory.this.actorScheduler).serviceContainer(StreamProcessorServiceFactory.this.serviceContainer).snapshotController(this.snapshotController).snapshotStorage(this.snapshotStorage).snapshotPeriod(StreamProcessorServiceFactory.this.snapshotPeriod).logStream(this.logStream).eventFilter(new MetadataEventFilter(versionFilter)).readOnly(this.readOnly).additionalDependencies(this.additionalDependencies).build();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/logstreams/processor/StreamProcessorServiceFactory$MetadataEventFilter.class */
    private static class MetadataEventFilter implements EventFilter {
        protected final RecordMetadata metadata = new RecordMetadata();
        protected final MetadataFilter metadataFilter;

        MetadataEventFilter(MetadataFilter metadataFilter) {
            this.metadataFilter = metadataFilter;
        }

        public boolean applies(LoggedEvent loggedEvent) {
            loggedEvent.readMetadata(this.metadata);
            return this.metadataFilter.applies(this.metadata);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/logstreams/processor/StreamProcessorServiceFactory$VersionFilter.class */
    private final class VersionFilter implements MetadataFilter {
        private VersionFilter() {
        }

        @Override // io.zeebe.broker.logstreams.processor.MetadataFilter
        public boolean applies(RecordMetadata recordMetadata) {
            if (recordMetadata.getProtocolVersion() > 1) {
                throw new RuntimeException(String.format("Cannot handle event with version newer than what is implemented by broker (%d > %d)", Integer.valueOf(recordMetadata.getProtocolVersion()), 1));
            }
            return true;
        }
    }

    public StreamProcessorServiceFactory(ServiceContainer serviceContainer, Duration duration) {
        this.serviceContainer = serviceContainer;
        this.snapshotPeriod = duration;
    }

    public void start(ServiceStartContext serviceStartContext) {
        this.actorScheduler = serviceStartContext.getScheduler();
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public StreamProcessorServiceFactory m57get() {
        return this;
    }

    public Builder createService(Partition partition, ServiceName<Partition> serviceName) {
        return new Builder(partition, serviceName);
    }
}
